1 /* 2 * Copyright (c) 2008, 2011, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package sun.nio.ch; 27 28 import java.nio.channels.*; 29 import java.nio.ByteBuffer; 30 import java.nio.BufferOverflowException; 31 import java.net.*; 32 import java.util.concurrent.*; 33 import java.io.IOException; 34 import sun.misc.Unsafe; 35 36 /** 37 * Windows implementation of AsynchronousSocketChannel using overlapped I/O. 38 */ 39 40 class WindowsAsynchronousSocketChannelImpl 41 extends AsynchronousSocketChannelImpl implements Iocp.OverlappedChannel 42 { 43 private static final Unsafe unsafe = Unsafe.getUnsafe(); 44 private static int addressSize = unsafe.addressSize(); 45 46 private static int dependsArch(int value32, int value64) { 47 return (addressSize == 4) ? value32 : value64; 48 } 49 50 /* 51 * typedef struct _WSABUF { 52 * u_long len; 53 * char FAR * buf; 54 * } WSABUF; 55 */ 56 private static final int SIZEOF_WSABUF = dependsArch(8, 16); 57 private static final int OFFSETOF_LEN = 0; 58 private static final int OFFSETOF_BUF = dependsArch(4, 8); 59 60 // maximum vector size for scatter/gather I/O 61 private static final int MAX_WSABUF = 16; 62 63 private static final int SIZEOF_WSABUFARRAY = MAX_WSABUF * SIZEOF_WSABUF; 64 65 66 // socket handle. Use begin()/end() around each usage of this handle. 67 final long handle; 68 69 // I/O completion port that the socket is associated with 70 private final Iocp iocp; 71 72 // completion key to identify channel when I/O completes 73 private final int completionKey; 74 75 // Pending I/O operations are tied to an OVERLAPPED structure that can only 76 // be released when the I/O completion event is posted to the completion 77 // port. Where I/O operations complete immediately then it is possible 78 // there may be more than two OVERLAPPED structures in use. 79 private final PendingIoCache ioCache; 80 81 // per-channel arrays of WSABUF structures 82 private final long readBufferArray; 83 private final long writeBufferArray; 84 85 86 WindowsAsynchronousSocketChannelImpl(Iocp iocp, boolean failIfGroupShutdown) 87 throws IOException 88 { 89 super(iocp); 90 91 // associate socket with default completion port 92 long h = IOUtil.fdVal(fd); 93 int key = 0; 94 try { 95 key = iocp.associate(this, h); 96 } catch (ShutdownChannelGroupException x) { 97 if (failIfGroupShutdown) { 98 closesocket0(h); 99 throw x; 100 } 101 } catch (IOException x) { 102 closesocket0(h); 103 throw x; 104 } 105 106 this.handle = h; 107 this.iocp = iocp; 108 this.completionKey = key; 109 this.ioCache = new PendingIoCache(); 110 111 // allocate WSABUF arrays 112 this.readBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY); 113 this.writeBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY); 114 } 115 116 WindowsAsynchronousSocketChannelImpl(Iocp iocp) throws IOException { 117 this(iocp, true); 118 } 119 120 @Override 121 public AsynchronousChannelGroupImpl group() { 122 return iocp; 123 } 124 125 /** 126 * Invoked by Iocp when an I/O operation competes. 127 */ 128 @Override 129 public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) { 130 return ioCache.remove(overlapped); 131 } 132 133 // invoked by WindowsAsynchronousServerSocketChannelImpl 134 long handle() { 135 return handle; 136 } 137 138 // invoked by WindowsAsynchronousServerSocketChannelImpl when new connection 139 // accept 140 void setConnected(SocketAddress localAddress, SocketAddress remoteAddress) { 141 synchronized (stateLock) { 142 state = ST_CONNECTED; 143 this.localAddress = localAddress; 144 this.remoteAddress = remoteAddress; 145 } 146 } 147 148 @Override 149 void implClose() throws IOException { 150 // close socket (may cause outstanding async I/O operations to fail). 151 closesocket0(handle); 152 153 // waits until all I/O operations have completed 154 ioCache.close(); 155 156 // release arrays of WSABUF structures 157 unsafe.freeMemory(readBufferArray); 158 unsafe.freeMemory(writeBufferArray); 159 160 // finally disassociate from the completion port (key can be 0 if 161 // channel created when group is shutdown) 162 if (completionKey != 0) 163 iocp.disassociate(completionKey); 164 } 165 166 @Override 167 public void onCancel(PendingFuture<?,?> task) { 168 if (task.getContext() instanceof ConnectTask) 169 killConnect(); 170 if (task.getContext() instanceof ReadTask) 171 killReading(); 172 if (task.getContext() instanceof WriteTask) 173 killWriting(); 174 } 175 176 /** 177 * Implements the task to initiate a connection and the handler to 178 * consume the result when the connection is established (or fails). 179 */ 180 private class ConnectTask<A> implements Runnable, Iocp.ResultHandler { 181 private final InetSocketAddress remote; 182 private final PendingFuture<Void,A> result; 183 184 ConnectTask(InetSocketAddress remote, PendingFuture<Void,A> result) { 185 this.remote = remote; 186 this.result = result; 187 } 188 189 private void closeChannel() { 190 try { 191 close(); 192 } catch (IOException ignore) { } 193 } 194 195 private IOException toIOException(Throwable x) { 196 if (x instanceof IOException) { 197 if (x instanceof ClosedChannelException) 198 x = new AsynchronousCloseException(); 199 return (IOException)x; 200 } 201 return new IOException(x); 202 } 203 204 /** 205 * Invoke after a connection is successfully established. 206 */ 207 private void afterConnect() throws IOException { 208 updateConnectContext(handle); 209 synchronized (stateLock) { 210 state = ST_CONNECTED; 211 remoteAddress = remote; 212 } 213 } 214 215 /** 216 * Task to initiate a connection. 217 */ 218 @Override 219 public void run() { 220 long overlapped = 0L; 221 Throwable exc = null; 222 try { 223 begin(); 224 225 // synchronize on result to allow this thread handle the case 226 // where the connection is established immediately. 227 synchronized (result) { 228 overlapped = ioCache.add(result); 229 // initiate the connection 230 int n = connect0(handle, Net.isIPv6Available(), remote.getAddress(), 231 remote.getPort(), overlapped); 232 if (n == IOStatus.UNAVAILABLE) { 233 // connection is pending 234 return; 235 } 236 237 // connection established immediately 238 afterConnect(); 239 result.setResult(null); 240 } 241 } catch (Throwable x) { 242 if (overlapped != 0L) 243 ioCache.remove(overlapped); 244 exc = x; 245 } finally { 246 end(); 247 } 248 249 if (exc != null) { 250 closeChannel(); 251 result.setFailure(toIOException(exc)); 252 } 253 Invoker.invoke(result); 254 } 255 256 /** 257 * Invoked by handler thread when connection established. 258 */ 259 @Override 260 public void completed(int bytesTransferred, boolean canInvokeDirect) { 261 Throwable exc = null; 262 try { 263 begin(); 264 afterConnect(); 265 result.setResult(null); 266 } catch (Throwable x) { 267 // channel is closed or unable to finish connect 268 exc = x; 269 } finally { 270 end(); 271 } 272 273 // can't close channel while in begin/end block 274 if (exc != null) { 275 closeChannel(); 276 result.setFailure(toIOException(exc)); 277 } 278 279 if (canInvokeDirect) { 280 Invoker.invokeUnchecked(result); 281 } else { 282 Invoker.invoke(result); 283 } 284 } 285 286 /** 287 * Invoked by handler thread when failed to establish connection. 288 */ 289 @Override 290 public void failed(int error, IOException x) { 291 if (isOpen()) { 292 closeChannel(); 293 result.setFailure(x); 294 } else { 295 result.setFailure(new AsynchronousCloseException()); 296 } 297 Invoker.invoke(result); 298 } 299 } 300 301 @Override 302 <A> Future<Void> implConnect(SocketAddress remote, 303 A attachment, 304 CompletionHandler<Void,? super A> handler) 305 { 306 if (!isOpen()) { 307 Throwable exc = new ClosedChannelException(); 308 if (handler == null) 309 return CompletedFuture.withFailure(exc); 310 Invoker.invoke(this, handler, attachment, null, exc); 311 return null; 312 } 313 314 InetSocketAddress isa = Net.checkAddress(remote); 315 316 // permission check 317 SecurityManager sm = System.getSecurityManager(); 318 if (sm != null) 319 sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort()); 320 321 // check and update state 322 // ConnectEx requires the socket to be bound to a local address 323 IOException bindException = null; 324 synchronized (stateLock) { 325 if (state == ST_CONNECTED) 326 throw new AlreadyConnectedException(); 327 if (state == ST_PENDING) 328 throw new ConnectionPendingException(); 329 if (localAddress == null) { 330 try { 331 bind(new InetSocketAddress(0)); 332 } catch (IOException x) { 333 bindException = x; 334 } 335 } 336 if (bindException == null) 337 state = ST_PENDING; 338 } 339 340 // handle bind failure 341 if (bindException != null) { 342 try { 343 close(); 344 } catch (IOException ignore) { } 345 if (handler == null) 346 return CompletedFuture.withFailure(bindException); 347 Invoker.invoke(this, handler, attachment, null, bindException); 348 return null; 349 } 350 351 // setup task 352 PendingFuture<Void,A> result = 353 new PendingFuture<Void,A>(this, handler, attachment); 354 ConnectTask task = new ConnectTask<A>(isa, result); 355 result.setContext(task); 356 357 // initiate I/O 358 if (Iocp.supportsThreadAgnosticIo()) { 359 task.run(); 360 } else { 361 Invoker.invokeOnThreadInThreadPool(this, task); 362 } 363 return result; 364 } 365 366 /** 367 * Implements the task to initiate a read and the handler to consume the 368 * result when the read completes. 369 */ 370 private class ReadTask<V,A> implements Runnable, Iocp.ResultHandler { 371 private final ByteBuffer[] bufs; 372 private final int numBufs; 373 private final boolean scatteringRead; 374 private final PendingFuture<V,A> result; 375 376 // set by run method 377 private ByteBuffer[] shadow; 378 379 ReadTask(ByteBuffer[] bufs, 380 boolean scatteringRead, 381 PendingFuture<V,A> result) 382 { 383 this.bufs = bufs; 384 this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length; 385 this.scatteringRead = scatteringRead; 386 this.result = result; 387 } 388 389 /** 390 * Invoked prior to read to prepare the WSABUF array. Where necessary, 391 * it substitutes non-direct buffers with direct buffers. 392 */ 393 void prepareBuffers() { 394 shadow = new ByteBuffer[numBufs]; 395 long address = readBufferArray; 396 for (int i=0; i<numBufs; i++) { 397 ByteBuffer dst = bufs[i]; 398 int pos = dst.position(); 399 int lim = dst.limit(); 400 assert (pos <= lim); 401 int rem = (pos <= lim ? lim - pos : 0); 402 long a; 403 if (!(dst instanceof DirectBuffer)) { 404 // substitute with direct buffer 405 ByteBuffer bb = Util.getTemporaryDirectBuffer(rem); 406 shadow[i] = bb; 407 a = ((DirectBuffer)bb).address(); 408 } else { 409 shadow[i] = dst; 410 a = ((DirectBuffer)dst).address() + pos; 411 } 412 unsafe.putAddress(address + OFFSETOF_BUF, a); 413 unsafe.putInt(address + OFFSETOF_LEN, rem); 414 address += SIZEOF_WSABUF; 415 } 416 } 417 418 /** 419 * Invoked after a read has completed to update the buffer positions 420 * and release any substituted buffers. 421 */ 422 void updateBuffers(int bytesRead) { 423 for (int i=0; i<numBufs; i++) { 424 ByteBuffer nextBuffer = shadow[i]; 425 int pos = nextBuffer.position(); 426 int len = nextBuffer.remaining(); 427 if (bytesRead >= len) { 428 bytesRead -= len; 429 int newPosition = pos + len; 430 try { 431 nextBuffer.position(newPosition); 432 } catch (IllegalArgumentException x) { 433 // position changed by another 434 } 435 } else { // Buffers not completely filled 436 if (bytesRead > 0) { 437 assert(pos + bytesRead < (long)Integer.MAX_VALUE); 438 int newPosition = pos + bytesRead; 439 try { 440 nextBuffer.position(newPosition); 441 } catch (IllegalArgumentException x) { 442 // position changed by another 443 } 444 } 445 break; 446 } 447 } 448 449 // Put results from shadow into the slow buffers 450 for (int i=0; i<numBufs; i++) { 451 if (!(bufs[i] instanceof DirectBuffer)) { 452 shadow[i].flip(); 453 try { 454 bufs[i].put(shadow[i]); 455 } catch (BufferOverflowException x) { 456 // position changed by another 457 } 458 } 459 } 460 } 461 462 void releaseBuffers() { 463 for (int i=0; i<numBufs; i++) { 464 if (!(bufs[i] instanceof DirectBuffer)) { 465 Util.releaseTemporaryDirectBuffer(shadow[i]); 466 } 467 } 468 } 469 470 @Override 471 @SuppressWarnings("unchecked") 472 public void run() { 473 long overlapped = 0L; 474 boolean prepared = false; 475 boolean pending = false; 476 477 try { 478 begin(); 479 480 // substitute non-direct buffers 481 prepareBuffers(); 482 prepared = true; 483 484 // get an OVERLAPPED structure (from the cache or allocate) 485 overlapped = ioCache.add(result); 486 487 // initiate read 488 int n = read0(handle, numBufs, readBufferArray, overlapped); 489 if (n == IOStatus.UNAVAILABLE) { 490 // I/O is pending 491 pending = true; 492 return; 493 } 494 if (n == IOStatus.EOF) { 495 // input shutdown 496 enableReading(); 497 if (scatteringRead) { 498 result.setResult((V)Long.valueOf(-1L)); 499 } else { 500 result.setResult((V)Integer.valueOf(-1)); 501 } 502 } else { 503 throw new InternalError("Read completed immediately"); 504 } 505 } catch (Throwable x) { 506 // failed to initiate read 507 // reset read flag before releasing waiters 508 enableReading(); 509 if (x instanceof ClosedChannelException) 510 x = new AsynchronousCloseException(); 511 if (!(x instanceof IOException)) 512 x = new IOException(x); 513 result.setFailure(x); 514 } finally { 515 // release resources if I/O not pending 516 if (!pending) { 517 if (overlapped != 0L) 518 ioCache.remove(overlapped); 519 if (prepared) 520 releaseBuffers(); 521 } 522 end(); 523 } 524 525 // invoke completion handler 526 Invoker.invoke(result); 527 } 528 529 /** 530 * Executed when the I/O has completed 531 */ 532 @Override 533 @SuppressWarnings("unchecked") 534 public void completed(int bytesTransferred, boolean canInvokeDirect) { 535 if (bytesTransferred == 0) { 536 bytesTransferred = -1; // EOF 537 } else { 538 updateBuffers(bytesTransferred); 539 } 540 541 // return direct buffer to cache if substituted 542 releaseBuffers(); 543 544 // release waiters if not already released by timeout 545 synchronized (result) { 546 if (result.isDone()) 547 return; 548 enableReading(); 549 if (scatteringRead) { 550 result.setResult((V)Long.valueOf(bytesTransferred)); 551 } else { 552 result.setResult((V)Integer.valueOf(bytesTransferred)); 553 } 554 } 555 if (canInvokeDirect) { 556 Invoker.invokeUnchecked(result); 557 } else { 558 Invoker.invoke(result); 559 } 560 } 561 562 @Override 563 public void failed(int error, IOException x) { 564 // return direct buffer to cache if substituted 565 releaseBuffers(); 566 567 // release waiters if not already released by timeout 568 if (!isOpen()) 569 x = new AsynchronousCloseException(); 570 571 synchronized (result) { 572 if (result.isDone()) 573 return; 574 enableReading(); 575 result.setFailure(x); 576 } 577 Invoker.invoke(result); 578 } 579 580 /** 581 * Invoked if timeout expires before it is cancelled 582 */ 583 void timeout() { 584 // synchronize on result as the I/O could complete/fail 585 synchronized (result) { 586 if (result.isDone()) 587 return; 588 589 // kill further reading before releasing waiters 590 enableReading(true); 591 result.setFailure(new InterruptedByTimeoutException()); 592 } 593 594 // invoke handler without any locks 595 Invoker.invoke(result); 596 } 597 } 598 599 @Override 600 <V extends Number,A> Future<V> implRead(boolean isScatteringRead, 601 ByteBuffer dst, 602 ByteBuffer[] dsts, 603 long timeout, 604 TimeUnit unit, 605 A attachment, 606 CompletionHandler<V,? super A> handler) 607 { 608 // setup task 609 PendingFuture<V,A> result = 610 new PendingFuture<V,A>(this, handler, attachment); 611 ByteBuffer[] bufs; 612 if (isScatteringRead) { 613 bufs = dsts; 614 } else { 615 bufs = new ByteBuffer[1]; 616 bufs[0] = dst; 617 } 618 final ReadTask readTask = new ReadTask<V,A>(bufs, isScatteringRead, result); 619 result.setContext(readTask); 620 621 // schedule timeout 622 if (timeout > 0L) { 623 Future<?> timeoutTask = iocp.schedule(new Runnable() { 624 public void run() { 625 readTask.timeout(); 626 } 627 }, timeout, unit); 628 result.setTimeoutTask(timeoutTask); 629 } 630 631 // initiate I/O 632 if (Iocp.supportsThreadAgnosticIo()) { 633 readTask.run(); 634 } else { 635 Invoker.invokeOnThreadInThreadPool(this, readTask); 636 } 637 return result; 638 } 639 640 /** 641 * Implements the task to initiate a write and the handler to consume the 642 * result when the write completes. 643 */ 644 private class WriteTask<V,A> implements Runnable, Iocp.ResultHandler { 645 private final ByteBuffer[] bufs; 646 private final int numBufs; 647 private final boolean gatheringWrite; 648 private final PendingFuture<V,A> result; 649 650 // set by run method 651 private ByteBuffer[] shadow; 652 653 WriteTask(ByteBuffer[] bufs, 654 boolean gatheringWrite, 655 PendingFuture<V,A> result) 656 { 657 this.bufs = bufs; 658 this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length; 659 this.gatheringWrite = gatheringWrite; 660 this.result = result; 661 } 662 663 /** 664 * Invoked prior to write to prepare the WSABUF array. Where necessary, 665 * it substitutes non-direct buffers with direct buffers. 666 */ 667 void prepareBuffers() { 668 shadow = new ByteBuffer[numBufs]; 669 long address = writeBufferArray; 670 for (int i=0; i<numBufs; i++) { 671 ByteBuffer src = bufs[i]; 672 int pos = src.position(); 673 int lim = src.limit(); 674 assert (pos <= lim); 675 int rem = (pos <= lim ? lim - pos : 0); 676 long a; 677 if (!(src instanceof DirectBuffer)) { 678 // substitute with direct buffer 679 ByteBuffer bb = Util.getTemporaryDirectBuffer(rem); 680 bb.put(src); 681 bb.flip(); 682 src.position(pos); // leave heap buffer untouched for now 683 shadow[i] = bb; 684 a = ((DirectBuffer)bb).address(); 685 } else { 686 shadow[i] = src; 687 a = ((DirectBuffer)src).address() + pos; 688 } 689 unsafe.putAddress(address + OFFSETOF_BUF, a); 690 unsafe.putInt(address + OFFSETOF_LEN, rem); 691 address += SIZEOF_WSABUF; 692 } 693 } 694 695 /** 696 * Invoked after a write has completed to update the buffer positions 697 * and release any substituted buffers. 698 */ 699 void updateBuffers(int bytesWritten) { 700 // Notify the buffers how many bytes were taken 701 for (int i=0; i<numBufs; i++) { 702 ByteBuffer nextBuffer = bufs[i]; 703 int pos = nextBuffer.position(); 704 int lim = nextBuffer.limit(); 705 int len = (pos <= lim ? lim - pos : lim); 706 if (bytesWritten >= len) { 707 bytesWritten -= len; 708 int newPosition = pos + len; 709 try { 710 nextBuffer.position(newPosition); 711 } catch (IllegalArgumentException x) { 712 // position changed by someone else 713 } 714 } else { // Buffers not completely filled 715 if (bytesWritten > 0) { 716 assert(pos + bytesWritten < (long)Integer.MAX_VALUE); 717 int newPosition = pos + bytesWritten; 718 try { 719 nextBuffer.position(newPosition); 720 } catch (IllegalArgumentException x) { 721 // position changed by someone else 722 } 723 } 724 break; 725 } 726 } 727 } 728 729 void releaseBuffers() { 730 for (int i=0; i<numBufs; i++) { 731 if (!(bufs[i] instanceof DirectBuffer)) { 732 Util.releaseTemporaryDirectBuffer(shadow[i]); 733 } 734 } 735 } 736 737 @Override 738 //@SuppressWarnings("unchecked") 739 public void run() { 740 long overlapped = 0L; 741 boolean prepared = false; 742 boolean pending = false; 743 boolean shutdown = false; 744 745 try { 746 begin(); 747 748 // substitute non-direct buffers 749 prepareBuffers(); 750 prepared = true; 751 752 // get an OVERLAPPED structure (from the cache or allocate) 753 overlapped = ioCache.add(result); 754 int n = write0(handle, numBufs, writeBufferArray, overlapped); 755 if (n == IOStatus.UNAVAILABLE) { 756 // I/O is pending 757 pending = true; 758 return; 759 } 760 if (n == IOStatus.EOF) { 761 // special case for shutdown output 762 shutdown = true; 763 throw new ClosedChannelException(); 764 } 765 // write completed immediately 766 throw new InternalError("Write completed immediately"); 767 } catch (Throwable x) { 768 // write failed. Enable writing before releasing waiters. 769 enableWriting(); 770 if (!shutdown && (x instanceof ClosedChannelException)) 771 x = new AsynchronousCloseException(); 772 if (!(x instanceof IOException)) 773 x = new IOException(x); 774 result.setFailure(x); 775 } finally { 776 // release resources if I/O not pending 777 if (!pending) { 778 if (overlapped != 0L) 779 ioCache.remove(overlapped); 780 if (prepared) 781 releaseBuffers(); 782 } 783 end(); 784 } 785 786 // invoke completion handler 787 Invoker.invoke(result); 788 } 789 790 /** 791 * Executed when the I/O has completed 792 */ 793 @Override 794 @SuppressWarnings("unchecked") 795 public void completed(int bytesTransferred, boolean canInvokeDirect) { 796 updateBuffers(bytesTransferred); 797 798 // return direct buffer to cache if substituted 799 releaseBuffers(); 800 801 // release waiters if not already released by timeout 802 synchronized (result) { 803 if (result.isDone()) 804 return; 805 enableWriting(); 806 if (gatheringWrite) { 807 result.setResult((V)Long.valueOf(bytesTransferred)); 808 } else { 809 result.setResult((V)Integer.valueOf(bytesTransferred)); 810 } 811 } 812 if (canInvokeDirect) { 813 Invoker.invokeUnchecked(result); 814 } else { 815 Invoker.invoke(result); 816 } 817 } 818 819 @Override 820 public void failed(int error, IOException x) { 821 // return direct buffer to cache if substituted 822 releaseBuffers(); 823 824 // release waiters if not already released by timeout 825 if (!isOpen()) 826 x = new AsynchronousCloseException(); 827 828 synchronized (result) { 829 if (result.isDone()) 830 return; 831 enableWriting(); 832 result.setFailure(x); 833 } 834 Invoker.invoke(result); 835 } 836 837 /** 838 * Invoked if timeout expires before it is cancelled 839 */ 840 void timeout() { 841 // synchronize on result as the I/O could complete/fail 842 synchronized (result) { 843 if (result.isDone()) 844 return; 845 846 // kill further writing before releasing waiters 847 enableWriting(true); 848 result.setFailure(new InterruptedByTimeoutException()); 849 } 850 851 // invoke handler without any locks 852 Invoker.invoke(result); 853 } 854 } 855 856 @Override 857 <V extends Number,A> Future<V> implWrite(boolean gatheringWrite, 858 ByteBuffer src, 859 ByteBuffer[] srcs, 860 long timeout, 861 TimeUnit unit, 862 A attachment, 863 CompletionHandler<V,? super A> handler) 864 { 865 // setup task 866 PendingFuture<V,A> result = 867 new PendingFuture<V,A>(this, handler, attachment); 868 ByteBuffer[] bufs; 869 if (gatheringWrite) { 870 bufs = srcs; 871 } else { 872 bufs = new ByteBuffer[1]; 873 bufs[0] = src; 874 } 875 final WriteTask writeTask = new WriteTask<V,A>(bufs, gatheringWrite, result); 876 result.setContext(writeTask); 877 878 // schedule timeout 879 if (timeout > 0L) { 880 Future<?> timeoutTask = iocp.schedule(new Runnable() { 881 public void run() { 882 writeTask.timeout(); 883 } 884 }, timeout, unit); 885 result.setTimeoutTask(timeoutTask); 886 } 887 888 // initiate I/O (can only be done from thread in thread pool) 889 // initiate I/O 890 if (Iocp.supportsThreadAgnosticIo()) { 891 writeTask.run(); 892 } else { 893 Invoker.invokeOnThreadInThreadPool(this, writeTask); 894 } 895 return result; 896 } 897 898 // -- Native methods -- 899 900 private static native void initIDs(); 901 902 private static native int connect0(long socket, boolean preferIPv6, 903 InetAddress remote, int remotePort, long overlapped) throws IOException; 904 905 private static native void updateConnectContext(long socket) throws IOException; 906 907 private static native int read0(long socket, int count, long addres, long overlapped) 908 throws IOException; 909 910 private static native int write0(long socket, int count, long address, 911 long overlapped) throws IOException; 912 913 private static native void shutdown0(long socket, int how) throws IOException; 914 915 private static native void closesocket0(long socket) throws IOException; 916 917 static { 918 Util.load(); 919 initIDs(); 920 } 921 }