1 /* 2 * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package sun.nio.ch; 27 28 import java.nio.channels.*; 29 import java.nio.ByteBuffer; 30 import java.nio.BufferOverflowException; 31 import java.net.*; 32 import java.util.concurrent.*; 33 import java.io.IOException; 34 import java.security.AccessController; 35 import java.security.PrivilegedActionException; 36 import java.security.PrivilegedExceptionAction; 37 import jdk.internal.misc.Unsafe; 38 39 /** 40 * Windows implementation of AsynchronousSocketChannel using overlapped I/O. 41 */ 42 43 class WindowsAsynchronousSocketChannelImpl 44 extends AsynchronousSocketChannelImpl implements Iocp.OverlappedChannel 45 { 46 private static final Unsafe unsafe = Unsafe.getUnsafe(); 47 private static int addressSize = unsafe.addressSize(); 48 49 private static int dependsArch(int value32, int value64) { 50 return (addressSize == 4) ? value32 : value64; 51 } 52 53 /* 54 * typedef struct _WSABUF { 55 * u_long len; 56 * char FAR * buf; 57 * } WSABUF; 58 */ 59 private static final int SIZEOF_WSABUF = dependsArch(8, 16); 60 private static final int OFFSETOF_LEN = 0; 61 private static final int OFFSETOF_BUF = dependsArch(4, 8); 62 63 // maximum vector size for scatter/gather I/O 64 private static final int MAX_WSABUF = 16; 65 66 private static final int SIZEOF_WSABUFARRAY = MAX_WSABUF * SIZEOF_WSABUF; 67 68 69 // socket handle. Use begin()/end() around each usage of this handle. 70 final long handle; 71 72 // I/O completion port that the socket is associated with 73 private final Iocp iocp; 74 75 // completion key to identify channel when I/O completes 76 private final int completionKey; 77 78 // Pending I/O operations are tied to an OVERLAPPED structure that can only 79 // be released when the I/O completion event is posted to the completion 80 // port. Where I/O operations complete immediately then it is possible 81 // there may be more than two OVERLAPPED structures in use. 82 private final PendingIoCache ioCache; 83 84 // per-channel arrays of WSABUF structures 85 private final long readBufferArray; 86 private final long writeBufferArray; 87 88 89 WindowsAsynchronousSocketChannelImpl(Iocp iocp, boolean failIfGroupShutdown) 90 throws IOException 91 { 92 super(iocp); 93 94 // associate socket with default completion port 95 long h = IOUtil.fdVal(fd); 96 int key = 0; 97 try { 98 key = iocp.associate(this, h); 99 } catch (ShutdownChannelGroupException x) { 100 if (failIfGroupShutdown) { 101 closesocket0(h); 102 throw x; 103 } 104 } catch (IOException x) { 105 closesocket0(h); 106 throw x; 107 } 108 109 this.handle = h; 110 this.iocp = iocp; 111 this.completionKey = key; 112 this.ioCache = new PendingIoCache(); 113 114 // allocate WSABUF arrays 115 this.readBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY); 116 this.writeBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY); 117 } 118 119 WindowsAsynchronousSocketChannelImpl(Iocp iocp) throws IOException { 120 this(iocp, true); 121 } 122 123 @Override 124 public AsynchronousChannelGroupImpl group() { 125 return iocp; 126 } 127 128 /** 129 * Invoked by Iocp when an I/O operation competes. 130 */ 131 @Override 132 public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) { 133 return ioCache.remove(overlapped); 134 } 135 136 // invoked by WindowsAsynchronousServerSocketChannelImpl 137 long handle() { 138 return handle; 139 } 140 141 // invoked by WindowsAsynchronousServerSocketChannelImpl when new connection 142 // accept 143 void setConnected(InetSocketAddress localAddress, 144 InetSocketAddress remoteAddress) 145 { 146 synchronized (stateLock) { 147 state = ST_CONNECTED; 148 this.localAddress = localAddress; 149 this.remoteAddress = remoteAddress; 150 } 151 } 152 153 @Override 154 void implClose() throws IOException { 155 // close socket (may cause outstanding async I/O operations to fail). 156 closesocket0(handle); 157 158 // waits until all I/O operations have completed 159 ioCache.close(); 160 161 // release arrays of WSABUF structures 162 unsafe.freeMemory(readBufferArray); 163 unsafe.freeMemory(writeBufferArray); 164 165 // finally disassociate from the completion port (key can be 0 if 166 // channel created when group is shutdown) 167 if (completionKey != 0) 168 iocp.disassociate(completionKey); 169 } 170 171 @Override 172 public void onCancel(PendingFuture<?,?> task) { 173 if (task.getContext() instanceof ConnectTask) 174 killConnect(); 175 if (task.getContext() instanceof ReadTask) 176 killReading(); 177 if (task.getContext() instanceof WriteTask) 178 killWriting(); 179 } 180 181 /** 182 * Implements the task to initiate a connection and the handler to 183 * consume the result when the connection is established (or fails). 184 */ 185 private class ConnectTask<A> implements Runnable, Iocp.ResultHandler { 186 private final InetSocketAddress remote; 187 private final PendingFuture<Void,A> result; 188 189 ConnectTask(InetSocketAddress remote, PendingFuture<Void,A> result) { 190 this.remote = remote; 191 this.result = result; 192 } 193 194 private void closeChannel() { 195 try { 196 close(); 197 } catch (IOException ignore) { } 198 } 199 200 private IOException toIOException(Throwable x) { 201 if (x instanceof IOException) { 202 if (x instanceof ClosedChannelException) 203 x = new AsynchronousCloseException(); 204 return (IOException)x; 205 } 206 return new IOException(x); 207 } 208 209 /** 210 * Invoke after a connection is successfully established. 211 */ 212 private void afterConnect() throws IOException { 213 updateConnectContext(handle); 214 synchronized (stateLock) { 215 state = ST_CONNECTED; 216 remoteAddress = remote; 217 } 218 } 219 220 /** 221 * Task to initiate a connection. 222 */ 223 @Override 224 public void run() { 225 long overlapped = 0L; 226 Throwable exc = null; 227 try { 228 begin(); 229 230 // synchronize on result to allow this thread handle the case 231 // where the connection is established immediately. 232 synchronized (result) { 233 overlapped = ioCache.add(result); 234 // initiate the connection 235 int n = connect0(handle, Net.isIPv6Available(), remote.getAddress(), 236 remote.getPort(), overlapped); 237 if (n == IOStatus.UNAVAILABLE) { 238 // connection is pending 239 return; 240 } 241 242 // connection established immediately 243 afterConnect(); 244 result.setResult(null); 245 } 246 } catch (Throwable x) { 247 if (overlapped != 0L) 248 ioCache.remove(overlapped); 249 exc = x; 250 } finally { 251 end(); 252 } 253 254 if (exc != null) { 255 closeChannel(); 256 result.setFailure(toIOException(exc)); 257 } 258 Invoker.invoke(result); 259 } 260 261 /** 262 * Invoked by handler thread when connection established. 263 */ 264 @Override 265 public void completed(int bytesTransferred, boolean canInvokeDirect) { 266 Throwable exc = null; 267 try { 268 begin(); 269 afterConnect(); 270 result.setResult(null); 271 } catch (Throwable x) { 272 // channel is closed or unable to finish connect 273 exc = x; 274 } finally { 275 end(); 276 } 277 278 // can't close channel while in begin/end block 279 if (exc != null) { 280 closeChannel(); 281 result.setFailure(toIOException(exc)); 282 } 283 284 if (canInvokeDirect) { 285 Invoker.invokeUnchecked(result); 286 } else { 287 Invoker.invoke(result); 288 } 289 } 290 291 /** 292 * Invoked by handler thread when failed to establish connection. 293 */ 294 @Override 295 public void failed(int error, IOException x) { 296 if (isOpen()) { 297 closeChannel(); 298 result.setFailure(x); 299 } else { 300 result.setFailure(new AsynchronousCloseException()); 301 } 302 Invoker.invoke(result); 303 } 304 } 305 306 private void doPrivilegedBind(final SocketAddress sa) throws IOException { 307 try { 308 AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() { 309 public Void run() throws IOException { 310 bind(sa); 311 return null; 312 } 313 }); 314 } catch (PrivilegedActionException e) { 315 throw (IOException) e.getException(); 316 } 317 } 318 319 @Override 320 <A> Future<Void> implConnect(SocketAddress remote, 321 A attachment, 322 CompletionHandler<Void,? super A> handler) 323 { 324 if (!isOpen()) { 325 Throwable exc = new ClosedChannelException(); 326 if (handler == null) 327 return CompletedFuture.withFailure(exc); 328 Invoker.invoke(this, handler, attachment, null, exc); 329 return null; 330 } 331 332 InetSocketAddress isa = Net.checkAddress(remote); 333 334 // permission check 335 SecurityManager sm = System.getSecurityManager(); 336 if (sm != null) 337 sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort()); 338 339 // check and update state 340 // ConnectEx requires the socket to be bound to a local address 341 IOException bindException = null; 342 synchronized (stateLock) { 343 if (state == ST_CONNECTED) 344 throw new AlreadyConnectedException(); 345 if (state == ST_PENDING) 346 throw new ConnectionPendingException(); 347 if (localAddress == null) { 348 try { 349 SocketAddress any = new InetSocketAddress(0); 350 if (sm == null) { 351 bind(any); 352 } else { 353 doPrivilegedBind(any); 354 } 355 } catch (IOException x) { 356 bindException = x; 357 } 358 } 359 if (bindException == null) 360 state = ST_PENDING; 361 } 362 363 // handle bind failure 364 if (bindException != null) { 365 try { 366 close(); 367 } catch (IOException ignore) { } 368 if (handler == null) 369 return CompletedFuture.withFailure(bindException); 370 Invoker.invoke(this, handler, attachment, null, bindException); 371 return null; 372 } 373 374 // setup task 375 PendingFuture<Void,A> result = 376 new PendingFuture<Void,A>(this, handler, attachment); 377 ConnectTask<A> task = new ConnectTask<A>(isa, result); 378 result.setContext(task); 379 380 // initiate I/O 381 if (Iocp.supportsThreadAgnosticIo()) { 382 task.run(); 383 } else { 384 Invoker.invokeOnThreadInThreadPool(this, task); 385 } 386 return result; 387 } 388 389 /** 390 * Implements the task to initiate a read and the handler to consume the 391 * result when the read completes. 392 */ 393 private class ReadTask<V,A> implements Runnable, Iocp.ResultHandler { 394 private final ByteBuffer[] bufs; 395 private final int numBufs; 396 private final boolean scatteringRead; 397 private final PendingFuture<V,A> result; 398 399 // set by run method 400 private ByteBuffer[] shadow; 401 402 ReadTask(ByteBuffer[] bufs, 403 boolean scatteringRead, 404 PendingFuture<V,A> result) 405 { 406 this.bufs = bufs; 407 this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length; 408 this.scatteringRead = scatteringRead; 409 this.result = result; 410 } 411 412 /** 413 * Invoked prior to read to prepare the WSABUF array. Where necessary, 414 * it substitutes non-direct buffers with direct buffers. 415 */ 416 void prepareBuffers() { 417 shadow = new ByteBuffer[numBufs]; 418 long address = readBufferArray; 419 for (int i=0; i<numBufs; i++) { 420 ByteBuffer dst = bufs[i]; 421 int pos = dst.position(); 422 int lim = dst.limit(); 423 assert (pos <= lim); 424 int rem = (pos <= lim ? lim - pos : 0); 425 long a; 426 if (!(dst instanceof DirectBuffer)) { 427 // substitute with direct buffer 428 ByteBuffer bb = Util.getTemporaryDirectBuffer(rem); 429 shadow[i] = bb; 430 a = ((DirectBuffer)bb).address(); 431 } else { 432 shadow[i] = dst; 433 a = ((DirectBuffer)dst).address() + pos; 434 } 435 unsafe.putAddress(address + OFFSETOF_BUF, a); 436 unsafe.putInt(address + OFFSETOF_LEN, rem); 437 address += SIZEOF_WSABUF; 438 } 439 } 440 441 /** 442 * Invoked after a read has completed to update the buffer positions 443 * and release any substituted buffers. 444 */ 445 void updateBuffers(int bytesRead) { 446 for (int i=0; i<numBufs; i++) { 447 ByteBuffer nextBuffer = shadow[i]; 448 int pos = nextBuffer.position(); 449 int len = nextBuffer.remaining(); 450 if (bytesRead >= len) { 451 bytesRead -= len; 452 int newPosition = pos + len; 453 try { 454 nextBuffer.position(newPosition); 455 } catch (IllegalArgumentException x) { 456 // position changed by another 457 } 458 } else { // Buffers not completely filled 459 if (bytesRead > 0) { 460 assert(pos + bytesRead < (long)Integer.MAX_VALUE); 461 int newPosition = pos + bytesRead; 462 try { 463 nextBuffer.position(newPosition); 464 } catch (IllegalArgumentException x) { 465 // position changed by another 466 } 467 } 468 break; 469 } 470 } 471 472 // Put results from shadow into the slow buffers 473 for (int i=0; i<numBufs; i++) { 474 if (!(bufs[i] instanceof DirectBuffer)) { 475 shadow[i].flip(); 476 try { 477 bufs[i].put(shadow[i]); 478 } catch (BufferOverflowException x) { 479 // position changed by another 480 } 481 } 482 } 483 } 484 485 void releaseBuffers() { 486 for (int i=0; i<numBufs; i++) { 487 if (!(bufs[i] instanceof DirectBuffer)) { 488 Util.releaseTemporaryDirectBuffer(shadow[i]); 489 } 490 } 491 } 492 493 @Override 494 @SuppressWarnings("unchecked") 495 public void run() { 496 long overlapped = 0L; 497 boolean prepared = false; 498 boolean pending = false; 499 500 try { 501 begin(); 502 503 // substitute non-direct buffers 504 prepareBuffers(); 505 prepared = true; 506 507 // get an OVERLAPPED structure (from the cache or allocate) 508 overlapped = ioCache.add(result); 509 510 // initiate read 511 int n = read0(handle, numBufs, readBufferArray, overlapped); 512 if (n == IOStatus.UNAVAILABLE) { 513 // I/O is pending 514 pending = true; 515 return; 516 } 517 if (n == IOStatus.EOF) { 518 // input shutdown 519 enableReading(); 520 if (scatteringRead) { 521 result.setResult((V)Long.valueOf(-1L)); 522 } else { 523 result.setResult((V)Integer.valueOf(-1)); 524 } 525 } else { 526 throw new InternalError("Read completed immediately"); 527 } 528 } catch (Throwable x) { 529 // failed to initiate read 530 // reset read flag before releasing waiters 531 enableReading(); 532 if (x instanceof ClosedChannelException) 533 x = new AsynchronousCloseException(); 534 if (!(x instanceof IOException)) 535 x = new IOException(x); 536 result.setFailure(x); 537 } finally { 538 // release resources if I/O not pending 539 if (!pending) { 540 if (overlapped != 0L) 541 ioCache.remove(overlapped); 542 if (prepared) 543 releaseBuffers(); 544 } 545 end(); 546 } 547 548 // invoke completion handler 549 Invoker.invoke(result); 550 } 551 552 /** 553 * Executed when the I/O has completed 554 */ 555 @Override 556 @SuppressWarnings("unchecked") 557 public void completed(int bytesTransferred, boolean canInvokeDirect) { 558 if (bytesTransferred == 0) { 559 bytesTransferred = -1; // EOF 560 } else { 561 updateBuffers(bytesTransferred); 562 } 563 564 // return direct buffer to cache if substituted 565 releaseBuffers(); 566 567 // release waiters if not already released by timeout 568 synchronized (result) { 569 if (result.isDone()) 570 return; 571 enableReading(); 572 if (scatteringRead) { 573 result.setResult((V)Long.valueOf(bytesTransferred)); 574 } else { 575 result.setResult((V)Integer.valueOf(bytesTransferred)); 576 } 577 } 578 if (canInvokeDirect) { 579 Invoker.invokeUnchecked(result); 580 } else { 581 Invoker.invoke(result); 582 } 583 } 584 585 @Override 586 public void failed(int error, IOException x) { 587 // return direct buffer to cache if substituted 588 releaseBuffers(); 589 590 // release waiters if not already released by timeout 591 if (!isOpen()) 592 x = new AsynchronousCloseException(); 593 594 synchronized (result) { 595 if (result.isDone()) 596 return; 597 enableReading(); 598 result.setFailure(x); 599 } 600 Invoker.invoke(result); 601 } 602 603 /** 604 * Invoked if timeout expires before it is cancelled 605 */ 606 void timeout() { 607 // synchronize on result as the I/O could complete/fail 608 synchronized (result) { 609 if (result.isDone()) 610 return; 611 612 // kill further reading before releasing waiters 613 enableReading(true); 614 result.setFailure(new InterruptedByTimeoutException()); 615 } 616 617 // invoke handler without any locks 618 Invoker.invoke(result); 619 } 620 } 621 622 @Override 623 <V extends Number,A> Future<V> implRead(boolean isScatteringRead, 624 ByteBuffer dst, 625 ByteBuffer[] dsts, 626 long timeout, 627 TimeUnit unit, 628 A attachment, 629 CompletionHandler<V,? super A> handler) 630 { 631 // setup task 632 PendingFuture<V,A> result = 633 new PendingFuture<V,A>(this, handler, attachment); 634 ByteBuffer[] bufs; 635 if (isScatteringRead) { 636 bufs = dsts; 637 } else { 638 bufs = new ByteBuffer[1]; 639 bufs[0] = dst; 640 } 641 final ReadTask<V,A> readTask = 642 new ReadTask<V,A>(bufs, isScatteringRead, result); 643 result.setContext(readTask); 644 645 // schedule timeout 646 if (timeout > 0L) { 647 Future<?> timeoutTask = iocp.schedule(new Runnable() { 648 public void run() { 649 readTask.timeout(); 650 } 651 }, timeout, unit); 652 result.setTimeoutTask(timeoutTask); 653 } 654 655 // initiate I/O 656 if (Iocp.supportsThreadAgnosticIo()) { 657 readTask.run(); 658 } else { 659 Invoker.invokeOnThreadInThreadPool(this, readTask); 660 } 661 return result; 662 } 663 664 /** 665 * Implements the task to initiate a write and the handler to consume the 666 * result when the write completes. 667 */ 668 private class WriteTask<V,A> implements Runnable, Iocp.ResultHandler { 669 private final ByteBuffer[] bufs; 670 private final int numBufs; 671 private final boolean gatheringWrite; 672 private final PendingFuture<V,A> result; 673 674 // set by run method 675 private ByteBuffer[] shadow; 676 677 WriteTask(ByteBuffer[] bufs, 678 boolean gatheringWrite, 679 PendingFuture<V,A> result) 680 { 681 this.bufs = bufs; 682 this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length; 683 this.gatheringWrite = gatheringWrite; 684 this.result = result; 685 } 686 687 /** 688 * Invoked prior to write to prepare the WSABUF array. Where necessary, 689 * it substitutes non-direct buffers with direct buffers. 690 */ 691 void prepareBuffers() { 692 shadow = new ByteBuffer[numBufs]; 693 long address = writeBufferArray; 694 for (int i=0; i<numBufs; i++) { 695 ByteBuffer src = bufs[i]; 696 int pos = src.position(); 697 int lim = src.limit(); 698 assert (pos <= lim); 699 int rem = (pos <= lim ? lim - pos : 0); 700 long a; 701 if (!(src instanceof DirectBuffer)) { 702 // substitute with direct buffer 703 ByteBuffer bb = Util.getTemporaryDirectBuffer(rem); 704 bb.put(src); 705 bb.flip(); 706 src.position(pos); // leave heap buffer untouched for now 707 shadow[i] = bb; 708 a = ((DirectBuffer)bb).address(); 709 } else { 710 shadow[i] = src; 711 a = ((DirectBuffer)src).address() + pos; 712 } 713 unsafe.putAddress(address + OFFSETOF_BUF, a); 714 unsafe.putInt(address + OFFSETOF_LEN, rem); 715 address += SIZEOF_WSABUF; 716 } 717 } 718 719 /** 720 * Invoked after a write has completed to update the buffer positions 721 * and release any substituted buffers. 722 */ 723 void updateBuffers(int bytesWritten) { 724 // Notify the buffers how many bytes were taken 725 for (int i=0; i<numBufs; i++) { 726 ByteBuffer nextBuffer = bufs[i]; 727 int pos = nextBuffer.position(); 728 int lim = nextBuffer.limit(); 729 int len = (pos <= lim ? lim - pos : lim); 730 if (bytesWritten >= len) { 731 bytesWritten -= len; 732 int newPosition = pos + len; 733 try { 734 nextBuffer.position(newPosition); 735 } catch (IllegalArgumentException x) { 736 // position changed by someone else 737 } 738 } else { // Buffers not completely filled 739 if (bytesWritten > 0) { 740 assert(pos + bytesWritten < (long)Integer.MAX_VALUE); 741 int newPosition = pos + bytesWritten; 742 try { 743 nextBuffer.position(newPosition); 744 } catch (IllegalArgumentException x) { 745 // position changed by someone else 746 } 747 } 748 break; 749 } 750 } 751 } 752 753 void releaseBuffers() { 754 for (int i=0; i<numBufs; i++) { 755 if (!(bufs[i] instanceof DirectBuffer)) { 756 Util.releaseTemporaryDirectBuffer(shadow[i]); 757 } 758 } 759 } 760 761 @Override 762 //@SuppressWarnings("unchecked") 763 public void run() { 764 long overlapped = 0L; 765 boolean prepared = false; 766 boolean pending = false; 767 boolean shutdown = false; 768 769 try { 770 begin(); 771 772 // substitute non-direct buffers 773 prepareBuffers(); 774 prepared = true; 775 776 // get an OVERLAPPED structure (from the cache or allocate) 777 overlapped = ioCache.add(result); 778 int n = write0(handle, numBufs, writeBufferArray, overlapped); 779 if (n == IOStatus.UNAVAILABLE) { 780 // I/O is pending 781 pending = true; 782 return; 783 } 784 if (n == IOStatus.EOF) { 785 // special case for shutdown output 786 shutdown = true; 787 throw new ClosedChannelException(); 788 } 789 // write completed immediately 790 throw new InternalError("Write completed immediately"); 791 } catch (Throwable x) { 792 // write failed. Enable writing before releasing waiters. 793 enableWriting(); 794 if (!shutdown && (x instanceof ClosedChannelException)) 795 x = new AsynchronousCloseException(); 796 if (!(x instanceof IOException)) 797 x = new IOException(x); 798 result.setFailure(x); 799 } finally { 800 // release resources if I/O not pending 801 if (!pending) { 802 if (overlapped != 0L) 803 ioCache.remove(overlapped); 804 if (prepared) 805 releaseBuffers(); 806 } 807 end(); 808 } 809 810 // invoke completion handler 811 Invoker.invoke(result); 812 } 813 814 /** 815 * Executed when the I/O has completed 816 */ 817 @Override 818 @SuppressWarnings("unchecked") 819 public void completed(int bytesTransferred, boolean canInvokeDirect) { 820 updateBuffers(bytesTransferred); 821 822 // return direct buffer to cache if substituted 823 releaseBuffers(); 824 825 // release waiters if not already released by timeout 826 synchronized (result) { 827 if (result.isDone()) 828 return; 829 enableWriting(); 830 if (gatheringWrite) { 831 result.setResult((V)Long.valueOf(bytesTransferred)); 832 } else { 833 result.setResult((V)Integer.valueOf(bytesTransferred)); 834 } 835 } 836 if (canInvokeDirect) { 837 Invoker.invokeUnchecked(result); 838 } else { 839 Invoker.invoke(result); 840 } 841 } 842 843 @Override 844 public void failed(int error, IOException x) { 845 // return direct buffer to cache if substituted 846 releaseBuffers(); 847 848 // release waiters if not already released by timeout 849 if (!isOpen()) 850 x = new AsynchronousCloseException(); 851 852 synchronized (result) { 853 if (result.isDone()) 854 return; 855 enableWriting(); 856 result.setFailure(x); 857 } 858 Invoker.invoke(result); 859 } 860 861 /** 862 * Invoked if timeout expires before it is cancelled 863 */ 864 void timeout() { 865 // synchronize on result as the I/O could complete/fail 866 synchronized (result) { 867 if (result.isDone()) 868 return; 869 870 // kill further writing before releasing waiters 871 enableWriting(true); 872 result.setFailure(new InterruptedByTimeoutException()); 873 } 874 875 // invoke handler without any locks 876 Invoker.invoke(result); 877 } 878 } 879 880 @Override 881 <V extends Number,A> Future<V> implWrite(boolean gatheringWrite, 882 ByteBuffer src, 883 ByteBuffer[] srcs, 884 long timeout, 885 TimeUnit unit, 886 A attachment, 887 CompletionHandler<V,? super A> handler) 888 { 889 // setup task 890 PendingFuture<V,A> result = 891 new PendingFuture<V,A>(this, handler, attachment); 892 ByteBuffer[] bufs; 893 if (gatheringWrite) { 894 bufs = srcs; 895 } else { 896 bufs = new ByteBuffer[1]; 897 bufs[0] = src; 898 } 899 final WriteTask<V,A> writeTask = 900 new WriteTask<V,A>(bufs, gatheringWrite, result); 901 result.setContext(writeTask); 902 903 // schedule timeout 904 if (timeout > 0L) { 905 Future<?> timeoutTask = iocp.schedule(new Runnable() { 906 public void run() { 907 writeTask.timeout(); 908 } 909 }, timeout, unit); 910 result.setTimeoutTask(timeoutTask); 911 } 912 913 // initiate I/O (can only be done from thread in thread pool) 914 // initiate I/O 915 if (Iocp.supportsThreadAgnosticIo()) { 916 writeTask.run(); 917 } else { 918 Invoker.invokeOnThreadInThreadPool(this, writeTask); 919 } 920 return result; 921 } 922 923 // -- Native methods -- 924 925 private static native void initIDs(); 926 927 private static native int connect0(long socket, boolean preferIPv6, 928 InetAddress remote, int remotePort, long overlapped) throws IOException; 929 930 private static native void updateConnectContext(long socket) throws IOException; 931 932 private static native int read0(long socket, int count, long addres, long overlapped) 933 throws IOException; 934 935 private static native int write0(long socket, int count, long address, 936 long overlapped) throws IOException; 937 938 private static native void shutdown0(long socket, int how) throws IOException; 939 940 private static native void closesocket0(long socket) throws IOException; 941 942 static { 943 IOUtil.load(); 944 initIDs(); 945 } 946 }