1 /* 2 * Copyright 2008-2009 Sun Microsystems, Inc. All Rights Reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Sun designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Sun in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, 22 * CA 95054 USA or visit www.sun.com if you need additional information or 23 * have any questions. 24 */ 25 26 package sun.nio.ch; 27 28 import java.nio.channels.*; 29 import java.nio.ByteBuffer; 30 import java.nio.BufferOverflowException; 31 import java.net.*; 32 import java.util.concurrent.*; 33 import java.io.IOException; 34 import sun.misc.Unsafe; 35 36 /** 37 * Windows implementation of AsynchronousSocketChannel using overlapped I/O. 38 */ 39 40 class WindowsAsynchronousSocketChannelImpl 41 extends AsynchronousSocketChannelImpl implements Iocp.OverlappedChannel 42 { 43 private static final Unsafe unsafe = Unsafe.getUnsafe(); 44 private static int addressSize = unsafe.addressSize(); 45 46 private static int dependsArch(int value32, int value64) { 47 return (addressSize == 4) ? value32 : value64; 48 } 49 50 /* 51 * typedef struct _WSABUF { 52 * u_long len; 53 * char FAR * buf; 54 * } WSABUF; 55 */ 56 private static final int SIZEOF_WSABUF = dependsArch(8, 16); 57 private static final int OFFSETOF_LEN = 0; 58 private static final int OFFSETOF_BUF = dependsArch(4, 8); 59 60 // maximum vector size for scatter/gather I/O 61 private static final int MAX_WSABUF = 16; 62 63 private static final int SIZEOF_WSABUFARRAY = MAX_WSABUF * SIZEOF_WSABUF; 64 65 66 // socket handle. Use begin()/end() around each usage of this handle. 67 final long handle; 68 69 // I/O completion port that the socket is associated with 70 private final Iocp iocp; 71 72 // completion key to identify channel when I/O completes 73 private final int completionKey; 74 75 // Pending I/O operations are tied to an OVERLAPPED structure that can only 76 // be released when the I/O completion event is posted to the completion 77 // port. Where I/O operations complete immediately then it is possible 78 // there may be more than two OVERLAPPED structures in use. 79 private final PendingIoCache ioCache; 80 81 // per-channel arrays of WSABUF structures 82 private final long readBufferArray; 83 private final long writeBufferArray; 84 85 86 WindowsAsynchronousSocketChannelImpl(Iocp iocp, boolean failIfGroupShutdown) 87 throws IOException 88 { 89 super(iocp); 90 91 // associate socket with default completion port 92 long h = IOUtil.fdVal(fd); 93 int key = 0; 94 try { 95 key = iocp.associate(this, h); 96 } catch (ShutdownChannelGroupException x) { 97 if (failIfGroupShutdown) { 98 closesocket0(h); 99 throw x; 100 } 101 } catch (IOException x) { 102 closesocket0(h); 103 throw x; 104 } 105 106 this.handle = h; 107 this.iocp = iocp; 108 this.completionKey = key; 109 this.ioCache = new PendingIoCache(); 110 111 // allocate WSABUF arrays 112 this.readBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY); 113 this.writeBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY); 114 } 115 116 WindowsAsynchronousSocketChannelImpl(Iocp iocp) throws IOException { 117 this(iocp, true); 118 } 119 120 @Override 121 public AsynchronousChannelGroupImpl group() { 122 return iocp; 123 } 124 125 /** 126 * Invoked by Iocp when an I/O operation competes. 127 */ 128 @Override 129 public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) { 130 return ioCache.remove(overlapped); 131 } 132 133 // invoked by WindowsAsynchronousServerSocketChannelImpl 134 long handle() { 135 return handle; 136 } 137 138 // invoked by WindowsAsynchronousServerSocketChannelImpl when new connection 139 // accept 140 void setConnected(SocketAddress localAddress, SocketAddress remoteAddress) { 141 synchronized (stateLock) { 142 state = ST_CONNECTED; 143 this.localAddress = localAddress; 144 this.remoteAddress = remoteAddress; 145 } 146 } 147 148 @Override 149 void implClose() throws IOException { 150 // close socket (may cause outstanding async I/O operations to fail). 151 closesocket0(handle); 152 153 // waits until all I/O operations have completed 154 ioCache.close(); 155 156 // release arrays of WSABUF structures 157 unsafe.freeMemory(readBufferArray); 158 unsafe.freeMemory(writeBufferArray); 159 160 // finally disassociate from the completion port (key can be 0 if 161 // channel created when group is shutdown) 162 if (completionKey != 0) 163 iocp.disassociate(completionKey); 164 } 165 166 @Override 167 public void onCancel(PendingFuture<?,?> task) { 168 if (task.getContext() instanceof ConnectTask) 169 killConnect(); 170 if (task.getContext() instanceof ReadTask) 171 killReading(); 172 if (task.getContext() instanceof WriteTask) 173 killWriting(); 174 } 175 176 /** 177 * Implements the task to initiate a connection and the handler to 178 * consume the result when the connection is established (or fails). 179 */ 180 private class ConnectTask<A> implements Runnable, Iocp.ResultHandler { 181 private final InetSocketAddress remote; 182 private final PendingFuture<Void,A> result; 183 184 ConnectTask(InetSocketAddress remote, PendingFuture<Void,A> result) { 185 this.remote = remote; 186 this.result = result; 187 } 188 189 private void closeChannel() { 190 try { 191 close(); 192 } catch (IOException ignore) { } 193 } 194 195 private IOException toIOException(Throwable x) { 196 if (x instanceof IOException) { 197 if (x instanceof ClosedChannelException) 198 x = new AsynchronousCloseException(); 199 return (IOException)x; 200 } 201 return new IOException(x); 202 } 203 204 /** 205 * Invoke after a connection is successfully established. 206 */ 207 private void afterConnect() throws IOException { 208 updateConnectContext(handle); 209 synchronized (stateLock) { 210 state = ST_CONNECTED; 211 remoteAddress = remote; 212 } 213 } 214 215 /** 216 * Task to initiate a connection. 217 */ 218 @Override 219 public void run() { 220 long overlapped = 0L; 221 Throwable exc = null; 222 try { 223 begin(); 224 225 // synchronize on result to allow this thread handle the case 226 // where the connection is established immediately. 227 synchronized (result) { 228 overlapped = ioCache.add(result); 229 // initiate the connection 230 int n = connect0(handle, Net.isIPv6Available(), remote.getAddress(), 231 remote.getPort(), overlapped); 232 if (n == IOStatus.UNAVAILABLE) { 233 // connection is pending 234 return; 235 } 236 237 // connection established immediately 238 afterConnect(); 239 result.setResult(null); 240 } 241 } catch (Throwable x) { 242 exc = x; 243 } finally { 244 end(); 245 } 246 247 if (exc != null) { 248 if (overlapped != 0L) 249 ioCache.remove(overlapped); 250 closeChannel(); 251 result.setFailure(toIOException(exc)); 252 } 253 Invoker.invoke(result.handler(), result); 254 } 255 256 /** 257 * Invoked by handler thread when connection established. 258 */ 259 @Override 260 public void completed(int bytesTransferred) { 261 Throwable exc = null; 262 try { 263 begin(); 264 afterConnect(); 265 result.setResult(null); 266 } catch (Throwable x) { 267 // channel is closed or unable to finish connect 268 exc = x; 269 } finally { 270 end(); 271 } 272 273 // can't close channel while in begin/end block 274 if (exc != null) { 275 closeChannel(); 276 result.setFailure(toIOException(exc)); 277 } 278 279 Invoker.invoke(result.handler(), result); 280 } 281 282 /** 283 * Invoked by handler thread when failed to establish connection. 284 */ 285 @Override 286 public void failed(int error, IOException x) { 287 if (isOpen()) { 288 closeChannel(); 289 result.setFailure(x); 290 } else { 291 result.setFailure(new AsynchronousCloseException()); 292 } 293 Invoker.invoke(result.handler(), result); 294 } 295 } 296 297 @Override 298 public <A> Future<Void> connect(SocketAddress remote, 299 A attachment, 300 CompletionHandler<Void,? super A> handler) 301 { 302 if (!isOpen()) { 303 CompletedFuture<Void,A> result = CompletedFuture 304 .withFailure(this, new ClosedChannelException(), attachment); 305 Invoker.invoke(handler, result); 306 return result; 307 } 308 309 InetSocketAddress isa = Net.checkAddress(remote); 310 311 // permission check 312 SecurityManager sm = System.getSecurityManager(); 313 if (sm != null) 314 sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort()); 315 316 // check and update state 317 // ConnectEx requires the socket to be bound to a local address 318 IOException bindException = null; 319 synchronized (stateLock) { 320 if (state == ST_CONNECTED) 321 throw new AlreadyConnectedException(); 322 if (state == ST_PENDING) 323 throw new ConnectionPendingException(); 324 if (localAddress == null) { 325 try { 326 bind(new InetSocketAddress(0)); 327 } catch (IOException x) { 328 bindException = x; 329 } 330 } 331 if (bindException == null) 332 state = ST_PENDING; 333 } 334 335 // handle bind failure 336 if (bindException != null) { 337 try { 338 close(); 339 } catch (IOException ignore) { } 340 CompletedFuture<Void,A> result = CompletedFuture 341 .withFailure(this, bindException, attachment); 342 Invoker.invoke(handler, result); 343 return result; 344 } 345 346 // setup task 347 PendingFuture<Void,A> result = 348 new PendingFuture<Void,A>(this, handler, attachment); 349 ConnectTask task = new ConnectTask<A>(isa, result); 350 result.setContext(task); 351 352 // initiate I/O (can only be done from thread in thread pool) 353 Invoker.invokeOnThreadInThreadPool(this, task); 354 return result; 355 } 356 357 /** 358 * Implements the task to initiate a read and the handler to consume the 359 * result when the read completes. 360 */ 361 private class ReadTask<V,A> implements Runnable, Iocp.ResultHandler { 362 private final ByteBuffer[] bufs; 363 private final int numBufs; 364 private final boolean scatteringRead; 365 private final PendingFuture<V,A> result; 366 367 // set by run method 368 private ByteBuffer[] shadow; 369 370 ReadTask(ByteBuffer[] bufs, 371 boolean scatteringRead, 372 PendingFuture<V,A> result) 373 { 374 this.bufs = bufs; 375 this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length; 376 this.scatteringRead = scatteringRead; 377 this.result = result; 378 } 379 380 /** 381 * Invoked prior to read to prepare the WSABUF array. Where necessary, 382 * it substitutes non-direct buffers with direct buffers. 383 */ 384 void prepareBuffers() { 385 shadow = new ByteBuffer[numBufs]; 386 long address = readBufferArray; 387 for (int i=0; i<numBufs; i++) { 388 ByteBuffer dst = bufs[i]; 389 int pos = dst.position(); 390 int lim = dst.limit(); 391 assert (pos <= lim); 392 int rem = (pos <= lim ? lim - pos : 0); 393 long a; 394 if (!(dst instanceof DirectBuffer)) { 395 // substitute with direct buffer 396 ByteBuffer bb = Util.getTemporaryDirectBuffer(rem); 397 shadow[i] = bb; 398 a = ((DirectBuffer)bb).address(); 399 } else { 400 shadow[i] = dst; 401 a = ((DirectBuffer)dst).address() + pos; 402 } 403 unsafe.putAddress(address + OFFSETOF_BUF, a); 404 unsafe.putInt(address + OFFSETOF_LEN, rem); 405 address += SIZEOF_WSABUF; 406 } 407 } 408 409 /** 410 * Invoked after a read has completed to update the buffer positions 411 * and release any substituted buffers. 412 */ 413 void updateBuffers(int bytesRead) { 414 for (int i=0; i<numBufs; i++) { 415 ByteBuffer nextBuffer = shadow[i]; 416 int pos = nextBuffer.position(); 417 int len = nextBuffer.remaining(); 418 if (bytesRead >= len) { 419 bytesRead -= len; 420 int newPosition = pos + len; 421 try { 422 nextBuffer.position(newPosition); 423 } catch (IllegalArgumentException x) { 424 // position changed by another 425 } 426 } else { // Buffers not completely filled 427 if (bytesRead > 0) { 428 assert(pos + bytesRead < (long)Integer.MAX_VALUE); 429 int newPosition = pos + bytesRead; 430 try { 431 nextBuffer.position(newPosition); 432 } catch (IllegalArgumentException x) { 433 // position changed by another 434 } 435 } 436 break; 437 } 438 } 439 440 // Put results from shadow into the slow buffers 441 for (int i=0; i<numBufs; i++) { 442 if (!(bufs[i] instanceof DirectBuffer)) { 443 shadow[i].flip(); 444 try { 445 bufs[i].put(shadow[i]); 446 } catch (BufferOverflowException x) { 447 // position changed by another 448 } 449 } 450 } 451 } 452 453 void releaseBuffers() { 454 for (int i=0; i<numBufs; i++) { 455 if (!(bufs[i] instanceof DirectBuffer)) { 456 Util.releaseTemporaryDirectBuffer(shadow[i]); 457 } 458 } 459 } 460 461 @Override 462 @SuppressWarnings("unchecked") 463 public void run() { 464 long overlapped = 0L; 465 boolean prepared = false; 466 boolean pending = false; 467 468 try { 469 begin(); 470 471 // substitute non-direct buffers 472 prepareBuffers(); 473 prepared = true; 474 475 // get an OVERLAPPED structure (from the cache or allocate) 476 overlapped = ioCache.add(result); 477 478 // synchronize on result to allow this thread handle the case 479 // where the read completes immediately. 480 synchronized (result) { 481 int n = read0(handle, numBufs, readBufferArray, overlapped); 482 if (n == IOStatus.UNAVAILABLE) { 483 // I/O is pending 484 pending = true; 485 return; 486 } 487 // read completed immediately: 488 // 1. update buffer position 489 // 2. reset read flag 490 // 3. release waiters 491 if (n == 0) { 492 n = -1; 493 } else { 494 updateBuffers(n); 495 } 496 enableReading(); 497 498 if (scatteringRead) { 499 result.setResult((V)Long.valueOf(n)); 500 } else { 501 result.setResult((V)Integer.valueOf(n)); 502 } 503 } 504 } catch (Throwable x) { 505 // failed to initiate read: 506 // 1. reset read flag 507 // 2. free resources 508 // 3. release waiters 509 enableReading(); 510 if (overlapped != 0L) 511 ioCache.remove(overlapped); 512 if (x instanceof ClosedChannelException) 513 x = new AsynchronousCloseException(); 514 if (!(x instanceof IOException)) 515 x = new IOException(x); 516 result.setFailure(x); 517 } finally { 518 if (prepared && !pending) { 519 // return direct buffer(s) to cache if substituted 520 releaseBuffers(); 521 } 522 end(); 523 } 524 525 // invoke completion handler 526 Invoker.invoke(result.handler(), result); 527 } 528 529 /** 530 * Executed when the I/O has completed 531 */ 532 @Override 533 @SuppressWarnings("unchecked") 534 public void completed(int bytesTransferred) { 535 if (bytesTransferred == 0) { 536 bytesTransferred = -1; // EOF 537 } else { 538 updateBuffers(bytesTransferred); 539 } 540 541 // return direct buffer to cache if substituted 542 releaseBuffers(); 543 544 // release waiters if not already released by timeout 545 synchronized (result) { 546 if (result.isDone()) 547 return; 548 enableReading(); 549 if (scatteringRead) { 550 result.setResult((V)Long.valueOf(bytesTransferred)); 551 } else { 552 result.setResult((V)Integer.valueOf(bytesTransferred)); 553 } 554 } 555 Invoker.invoke(result.handler(), result); 556 } 557 558 @Override 559 public void failed(int error, IOException x) { 560 // return direct buffer to cache if substituted 561 releaseBuffers(); 562 563 // release waiters if not already released by timeout 564 if (!isOpen()) 565 x = new AsynchronousCloseException(); 566 567 synchronized (result) { 568 if (result.isDone()) 569 return; 570 enableReading(); 571 result.setFailure(x); 572 } 573 Invoker.invoke(result.handler(), result); 574 } 575 576 /** 577 * Invoked if timeout expires before it is cancelled 578 */ 579 void timeout() { 580 // synchronize on result as the I/O could complete/fail 581 synchronized (result) { 582 if (result.isDone()) 583 return; 584 585 // kill further reading before releasing waiters 586 enableReading(true); 587 result.setFailure(new InterruptedByTimeoutException()); 588 } 589 590 // invoke handler without any locks 591 Invoker.invoke(result.handler(), result); 592 } 593 } 594 595 @Override 596 <V extends Number,A> Future<V> readImpl(ByteBuffer[] bufs, 597 boolean scatteringRead, 598 long timeout, 599 TimeUnit unit, 600 A attachment, 601 CompletionHandler<V,? super A> handler) 602 { 603 // setup task 604 PendingFuture<V,A> result = 605 new PendingFuture<V,A>(this, handler, attachment); 606 final ReadTask readTask = new ReadTask<V,A>(bufs, scatteringRead, result); 607 result.setContext(readTask); 608 609 // schedule timeout 610 if (timeout > 0L) { 611 Future<?> timeoutTask = iocp.schedule(new Runnable() { 612 public void run() { 613 readTask.timeout(); 614 } 615 }, timeout, unit); 616 result.setTimeoutTask(timeoutTask); 617 } 618 619 // initiate I/O (can only be done from thread in thread pool) 620 Invoker.invokeOnThreadInThreadPool(this, readTask); 621 return result; 622 } 623 624 /** 625 * Implements the task to initiate a write and the handler to consume the 626 * result when the write completes. 627 */ 628 private class WriteTask<V,A> implements Runnable, Iocp.ResultHandler { 629 private final ByteBuffer[] bufs; 630 private final int numBufs; 631 private final boolean gatheringWrite; 632 private final PendingFuture<V,A> result; 633 634 // set by run method 635 private ByteBuffer[] shadow; 636 637 WriteTask(ByteBuffer[] bufs, 638 boolean gatheringWrite, 639 PendingFuture<V,A> result) 640 { 641 this.bufs = bufs; 642 this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length; 643 this.gatheringWrite = gatheringWrite; 644 this.result = result; 645 } 646 647 /** 648 * Invoked prior to write to prepare the WSABUF array. Where necessary, 649 * it substitutes non-direct buffers with direct buffers. 650 */ 651 void prepareBuffers() { 652 shadow = new ByteBuffer[numBufs]; 653 long address = writeBufferArray; 654 for (int i=0; i<numBufs; i++) { 655 ByteBuffer src = bufs[i]; 656 int pos = src.position(); 657 int lim = src.limit(); 658 assert (pos <= lim); 659 int rem = (pos <= lim ? lim - pos : 0); 660 long a; 661 if (!(src instanceof DirectBuffer)) { 662 // substitute with direct buffer 663 ByteBuffer bb = Util.getTemporaryDirectBuffer(rem); 664 bb.put(src); 665 bb.flip(); 666 src.position(pos); // leave heap buffer untouched for now 667 shadow[i] = bb; 668 a = ((DirectBuffer)bb).address(); 669 } else { 670 shadow[i] = src; 671 a = ((DirectBuffer)src).address() + pos; 672 } 673 unsafe.putAddress(address + OFFSETOF_BUF, a); 674 unsafe.putInt(address + OFFSETOF_LEN, rem); 675 address += SIZEOF_WSABUF; 676 } 677 } 678 679 /** 680 * Invoked after a write has completed to update the buffer positions 681 * and release any substituted buffers. 682 */ 683 void updateBuffers(int bytesWritten) { 684 // Notify the buffers how many bytes were taken 685 for (int i=0; i<numBufs; i++) { 686 ByteBuffer nextBuffer = bufs[i]; 687 int pos = nextBuffer.position(); 688 int lim = nextBuffer.limit(); 689 int len = (pos <= lim ? lim - pos : lim); 690 if (bytesWritten >= len) { 691 bytesWritten -= len; 692 int newPosition = pos + len; 693 try { 694 nextBuffer.position(newPosition); 695 } catch (IllegalArgumentException x) { 696 // position changed by someone else 697 } 698 } else { // Buffers not completely filled 699 if (bytesWritten > 0) { 700 assert(pos + bytesWritten < (long)Integer.MAX_VALUE); 701 int newPosition = pos + bytesWritten; 702 try { 703 nextBuffer.position(newPosition); 704 } catch (IllegalArgumentException x) { 705 // position changed by someone else 706 } 707 } 708 break; 709 } 710 } 711 } 712 713 void releaseBuffers() { 714 for (int i=0; i<numBufs; i++) { 715 if (!(bufs[i] instanceof DirectBuffer)) { 716 Util.releaseTemporaryDirectBuffer(shadow[i]); 717 } 718 } 719 } 720 721 @Override 722 @SuppressWarnings("unchecked") 723 public void run() { 724 int n = -1; 725 long overlapped = 0L; 726 boolean prepared = false; 727 boolean pending = false; 728 boolean shutdown = false; 729 730 try { 731 begin(); 732 733 // substitute non-direct buffers 734 prepareBuffers(); 735 prepared = true; 736 737 // get an OVERLAPPED structure (from the cache or allocate) 738 overlapped = ioCache.add(result); 739 740 // synchronize on result to allow this thread handle the case 741 // where the read completes immediately. 742 synchronized (result) { 743 n = write0(handle, numBufs, writeBufferArray, overlapped); 744 if (n == IOStatus.UNAVAILABLE) { 745 // I/O is pending 746 pending = true; 747 return; 748 } 749 750 enableWriting(); 751 752 if (n == IOStatus.EOF) { 753 // special case for shutdown output 754 shutdown = true; 755 throw new ClosedChannelException(); 756 } 757 758 // write completed immediately: 759 // 1. enable writing 760 // 2. update buffer position 761 // 3. release waiters 762 updateBuffers(n); 763 764 // result is a Long or Integer 765 if (gatheringWrite) { 766 result.setResult((V)Long.valueOf(n)); 767 } else { 768 result.setResult((V)Integer.valueOf(n)); 769 } 770 } 771 } catch (Throwable x) { 772 enableWriting(); 773 774 // failed to initiate read: 775 if (!shutdown && (x instanceof ClosedChannelException)) 776 x = new AsynchronousCloseException(); 777 if (!(x instanceof IOException)) 778 x = new IOException(x); 779 result.setFailure(x); 780 781 // release resources 782 if (overlapped != 0L) 783 ioCache.remove(overlapped); 784 785 } finally { 786 if (prepared && !pending) { 787 // return direct buffer(s) to cache if substituted 788 releaseBuffers(); 789 } 790 end(); 791 } 792 793 // invoke completion handler 794 Invoker.invoke(result.handler(), result); 795 } 796 797 /** 798 * Executed when the I/O has completed 799 */ 800 @Override 801 @SuppressWarnings("unchecked") 802 public void completed(int bytesTransferred) { 803 updateBuffers(bytesTransferred); 804 805 // return direct buffer to cache if substituted 806 releaseBuffers(); 807 808 // release waiters if not already released by timeout 809 synchronized (result) { 810 if (result.isDone()) 811 return; 812 enableWriting(); 813 if (gatheringWrite) { 814 result.setResult((V)Long.valueOf(bytesTransferred)); 815 } else { 816 result.setResult((V)Integer.valueOf(bytesTransferred)); 817 } 818 } 819 Invoker.invoke(result.handler(), result); 820 } 821 822 @Override 823 public void failed(int error, IOException x) { 824 // return direct buffer to cache if substituted 825 releaseBuffers(); 826 827 // release waiters if not already released by timeout 828 if (!isOpen()) 829 x = new AsynchronousCloseException(); 830 831 synchronized (result) { 832 if (result.isDone()) 833 return; 834 enableWriting(); 835 result.setFailure(x); 836 } 837 Invoker.invoke(result.handler(), result); 838 } 839 840 /** 841 * Invoked if timeout expires before it is cancelled 842 */ 843 void timeout() { 844 // synchronize on result as the I/O could complete/fail 845 synchronized (result) { 846 if (result.isDone()) 847 return; 848 849 // kill further writing before releasing waiters 850 enableWriting(true); 851 result.setFailure(new InterruptedByTimeoutException()); 852 } 853 854 // invoke handler without any locks 855 Invoker.invoke(result.handler(), result); 856 } 857 } 858 859 @Override 860 <V extends Number,A> Future<V> writeImpl(ByteBuffer[] bufs, 861 boolean gatheringWrite, 862 long timeout, 863 TimeUnit unit, 864 A attachment, 865 CompletionHandler<V,? super A> handler) 866 { 867 // setup task 868 PendingFuture<V,A> result = 869 new PendingFuture<V,A>(this, handler, attachment); 870 final WriteTask writeTask = new WriteTask<V,A>(bufs, gatheringWrite, result); 871 result.setContext(writeTask); 872 873 // schedule timeout 874 if (timeout > 0L) { 875 Future<?> timeoutTask = iocp.schedule(new Runnable() { 876 public void run() { 877 writeTask.timeout(); 878 } 879 }, timeout, unit); 880 result.setTimeoutTask(timeoutTask); 881 } 882 883 // initiate I/O (can only be done from thread in thread pool) 884 Invoker.invokeOnThreadInThreadPool(this, writeTask); 885 return result; 886 } 887 888 // -- Native methods -- 889 890 private static native void initIDs(); 891 892 private static native int connect0(long socket, boolean preferIPv6, 893 InetAddress remote, int remotePort, long overlapped) throws IOException; 894 895 private static native void updateConnectContext(long socket) throws IOException; 896 897 private static native int read0(long socket, int count, long addres, long overlapped) 898 throws IOException; 899 900 private static native int write0(long socket, int count, long address, 901 long overlapped) throws IOException; 902 903 private static native void shutdown0(long socket, int how) throws IOException; 904 905 private static native void closesocket0(long socket) throws IOException; 906 907 static { 908 Util.load(); 909 initIDs(); 910 } 911 }