1 /* 2 * Copyright 2008-2009 Sun Microsystems, Inc. All Rights Reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Sun designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Sun in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, 22 * CA 95054 USA or visit www.sun.com if you need additional information or 23 * have any questions. 24 */ 25 26 package sun.nio.ch; 27 28 import java.nio.channels.*; 29 import java.nio.ByteBuffer; 30 import java.nio.BufferOverflowException; 31 import java.net.*; 32 import java.util.concurrent.*; 33 import java.io.IOException; 34 import sun.misc.Unsafe; 35 36 /** 37 * Windows implementation of AsynchronousSocketChannel using overlapped I/O. 38 */ 39 40 class WindowsAsynchronousSocketChannelImpl 41 extends AsynchronousSocketChannelImpl implements Iocp.OverlappedChannel 42 { 43 private static final Unsafe unsafe = Unsafe.getUnsafe(); 44 private static int addressSize = unsafe.addressSize(); 45 46 private static int dependsArch(int value32, int value64) { 47 return (addressSize == 4) ? value32 : value64; 48 } 49 50 /* 51 * typedef struct _WSABUF { 52 * u_long len; 53 * char FAR * buf; 54 * } WSABUF; 55 */ 56 private static final int SIZEOF_WSABUF = dependsArch(8, 16); 57 private static final int OFFSETOF_LEN = 0; 58 private static final int OFFSETOF_BUF = dependsArch(4, 8); 59 60 // maximum vector size for scatter/gather I/O 61 private static final int MAX_WSABUF = 16; 62 63 private static final int SIZEOF_WSABUFARRAY = MAX_WSABUF * SIZEOF_WSABUF; 64 65 66 // socket handle. Use begin()/end() around each usage of this handle. 67 final long handle; 68 69 // I/O completion port that the socket is associated with 70 private final Iocp iocp; 71 72 // completion key to identify channel when I/O completes 73 private final int completionKey; 74 75 // Pending I/O operations are tied to an OVERLAPPED structure that can only 76 // be released when the I/O completion event is posted to the completion 77 // port. Where I/O operations complete immediately then it is possible 78 // there may be more than two OVERLAPPED structures in use. 79 private final PendingIoCache ioCache; 80 81 // per-channel arrays of WSABUF structures 82 private final long readBufferArray; 83 private final long writeBufferArray; 84 85 86 WindowsAsynchronousSocketChannelImpl(Iocp iocp, boolean failIfGroupShutdown) 87 throws IOException 88 { 89 super(iocp); 90 91 // associate socket with default completion port 92 long h = IOUtil.fdVal(fd); 93 int key = 0; 94 try { 95 key = iocp.associate(this, h); 96 } catch (ShutdownChannelGroupException x) { 97 if (failIfGroupShutdown) { 98 closesocket0(h); 99 throw x; 100 } 101 } catch (IOException x) { 102 closesocket0(h); 103 throw x; 104 } 105 106 this.handle = h; 107 this.iocp = iocp; 108 this.completionKey = key; 109 this.ioCache = new PendingIoCache(); 110 111 // allocate WSABUF arrays 112 this.readBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY); 113 this.writeBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY); 114 } 115 116 WindowsAsynchronousSocketChannelImpl(Iocp iocp) throws IOException { 117 this(iocp, true); 118 } 119 120 @Override 121 public AsynchronousChannelGroupImpl group() { 122 return iocp; 123 } 124 125 /** 126 * Invoked by Iocp when an I/O operation competes. 127 */ 128 @Override 129 public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) { 130 return ioCache.remove(overlapped); 131 } 132 133 // invoked by WindowsAsynchronousServerSocketChannelImpl 134 long handle() { 135 return handle; 136 } 137 138 // invoked by WindowsAsynchronousServerSocketChannelImpl when new connection 139 // accept 140 void setConnected(SocketAddress localAddress, SocketAddress remoteAddress) { 141 synchronized (stateLock) { 142 state = ST_CONNECTED; 143 this.localAddress = localAddress; 144 this.remoteAddress = remoteAddress; 145 } 146 } 147 148 @Override 149 void implClose() throws IOException { 150 // close socket (may cause outstanding async I/O operations to fail). 151 closesocket0(handle); 152 153 // waits until all I/O operations have completed 154 ioCache.close(); 155 156 // release arrays of WSABUF structures 157 unsafe.freeMemory(readBufferArray); 158 unsafe.freeMemory(writeBufferArray); 159 160 // finally disassociate from the completion port (key can be 0 if 161 // channel created when group is shutdown) 162 if (completionKey != 0) 163 iocp.disassociate(completionKey); 164 } 165 166 @Override 167 public void onCancel(PendingFuture<?,?> task) { 168 if (task.getContext() instanceof ConnectTask) 169 killConnect(); 170 if (task.getContext() instanceof ReadTask) 171 killReading(); 172 if (task.getContext() instanceof WriteTask) 173 killWriting(); 174 } 175 176 /** 177 * Implements the task to initiate a connection and the handler to 178 * consume the result when the connection is established (or fails). 179 */ 180 private class ConnectTask<A> implements Runnable, Iocp.ResultHandler { 181 private final InetSocketAddress remote; 182 private final PendingFuture<Void,A> result; 183 184 ConnectTask(InetSocketAddress remote, PendingFuture<Void,A> result) { 185 this.remote = remote; 186 this.result = result; 187 } 188 189 private void closeChannel() { 190 try { 191 close(); 192 } catch (IOException ignore) { } 193 } 194 195 private IOException toIOException(Throwable x) { 196 if (x instanceof IOException) { 197 if (x instanceof ClosedChannelException) 198 x = new AsynchronousCloseException(); 199 return (IOException)x; 200 } 201 return new IOException(x); 202 } 203 204 /** 205 * Invoke after a connection is successfully established. 206 */ 207 private void afterConnect() throws IOException { 208 updateConnectContext(handle); 209 synchronized (stateLock) { 210 state = ST_CONNECTED; 211 remoteAddress = remote; 212 } 213 } 214 215 /** 216 * Task to initiate a connection. 217 */ 218 @Override 219 public void run() { 220 long overlapped = 0L; 221 Throwable exc = null; 222 try { 223 begin(); 224 225 // synchronize on result to allow this thread handle the case 226 // where the connection is established immediately. 227 synchronized (result) { 228 overlapped = ioCache.add(result); 229 // initiate the connection 230 int n = connect0(handle, Net.isIPv6Available(), remote.getAddress(), 231 remote.getPort(), overlapped); 232 if (n == IOStatus.UNAVAILABLE) { 233 // connection is pending 234 return; 235 } 236 237 // connection established immediately 238 afterConnect(); 239 result.setResult(null); 240 } 241 } catch (Throwable x) { 242 exc = x; 243 } finally { 244 end(); 245 } 246 247 if (exc != null) { 248 if (overlapped != 0L) 249 ioCache.remove(overlapped); 250 closeChannel(); 251 result.setFailure(toIOException(exc)); 252 } 253 Invoker.invoke(result.handler(), result); 254 } 255 256 /** 257 * Invoked by handler thread when connection established. 258 */ 259 @Override 260 public void completed(int bytesTransferred) { 261 Throwable exc = null; 262 try { 263 begin(); 264 afterConnect(); 265 result.setResult(null); 266 } catch (Throwable x) { 267 // channel is closed or unable to finish connect 268 exc = x; 269 } finally { 270 end(); 271 } 272 273 // can't close channel while in begin/end block 274 if (exc != null) { 275 closeChannel(); 276 result.setFailure(toIOException(exc)); 277 } 278 279 Invoker.invoke(result.handler(), result); 280 } 281 282 /** 283 * Invoked by handler thread when failed to establish connection. 284 */ 285 @Override 286 public void failed(int error, IOException x) { 287 if (isOpen()) { 288 closeChannel(); 289 result.setFailure(x); 290 } else { 291 result.setFailure(new AsynchronousCloseException()); 292 } 293 Invoker.invoke(result.handler(), result); 294 } 295 } 296 297 @Override 298 public <A> Future<Void> connect(SocketAddress remote, 299 A attachment, 300 CompletionHandler<Void,? super A> handler) 301 { 302 if (!isOpen()) { 303 CompletedFuture<Void,A> result = CompletedFuture 304 .withFailure(this, new ClosedChannelException(), attachment); 305 Invoker.invoke(handler, result); 306 return result; 307 } 308 309 InetSocketAddress isa = Net.checkAddress(remote); 310 311 // permission check 312 SecurityManager sm = System.getSecurityManager(); 313 if (sm != null) 314 sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort()); 315 316 // check and update state 317 // ConnectEx requires the socket to be bound to a local address 318 IOException bindException = null; 319 synchronized (stateLock) { 320 if (state == ST_CONNECTED) 321 throw new AlreadyConnectedException(); 322 if (state == ST_PENDING) 323 throw new ConnectionPendingException(); 324 if (localAddress == null) { 325 try { 326 bind(new InetSocketAddress(0)); 327 } catch (IOException x) { 328 bindException = x; 329 } 330 } 331 if (bindException == null) 332 state = ST_PENDING; 333 } 334 335 // handle bind failure 336 if (bindException != null) { 337 try { 338 close(); 339 } catch (IOException ignore) { } 340 CompletedFuture<Void,A> result = CompletedFuture 341 .withFailure(this, bindException, attachment); 342 Invoker.invoke(handler, result); 343 return result; 344 } 345 346 // setup task 347 PendingFuture<Void,A> result = 348 new PendingFuture<Void,A>(this, handler, attachment); 349 ConnectTask task = new ConnectTask<A>(isa, result); 350 result.setContext(task); 351 352 // initiate I/O (can only be done from thread in thread pool) 353 Invoker.invokeOnThreadInThreadPool(this, task); 354 return result; 355 } 356 357 /** 358 * Implements the task to initiate a read and the handler to consume the 359 * result when the read completes. 360 */ 361 private class ReadTask<V,A> implements Runnable, Iocp.ResultHandler { 362 private final ByteBuffer[] bufs; 363 private final int numBufs; 364 private final boolean scatteringRead; 365 private final PendingFuture<V,A> result; 366 367 // set by run method 368 private ByteBuffer[] shadow; 369 370 ReadTask(ByteBuffer[] bufs, 371 boolean scatteringRead, 372 PendingFuture<V,A> result) 373 { 374 this.bufs = bufs; 375 this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length; 376 this.scatteringRead = scatteringRead; 377 this.result = result; 378 } 379 380 /** 381 * Invoked prior to read to prepare the WSABUF array. Where necessary, 382 * it substitutes non-direct buffers with direct buffers. 383 */ 384 void prepareBuffers() { 385 shadow = new ByteBuffer[numBufs]; 386 long address = readBufferArray; 387 for (int i=0; i<numBufs; i++) { 388 ByteBuffer dst = bufs[i]; 389 int pos = dst.position(); 390 int lim = dst.limit(); 391 assert (pos <= lim); 392 int rem = (pos <= lim ? lim - pos : 0); 393 long a; 394 if (!(dst instanceof DirectBuffer)) { 395 // substitute with direct buffer 396 ByteBuffer bb = Util.getTemporaryDirectBuffer(rem); 397 shadow[i] = bb; 398 a = ((DirectBuffer)bb).address(); 399 } else { 400 shadow[i] = dst; 401 a = ((DirectBuffer)dst).address() + pos; 402 } 403 unsafe.putAddress(address + OFFSETOF_BUF, a); 404 unsafe.putInt(address + OFFSETOF_LEN, rem); 405 address += SIZEOF_WSABUF; 406 } 407 } 408 409 /** 410 * Invoked after a read has completed to update the buffer positions 411 * and release any substituted buffers. 412 */ 413 void updateBuffers(int bytesRead) { 414 for (int i=0; i<numBufs; i++) { 415 ByteBuffer nextBuffer = shadow[i]; 416 int pos = nextBuffer.position(); 417 int len = nextBuffer.remaining(); 418 if (bytesRead >= len) { 419 bytesRead -= len; 420 int newPosition = pos + len; 421 try { 422 nextBuffer.position(newPosition); 423 } catch (IllegalArgumentException x) { 424 // position changed by another 425 } 426 } else { // Buffers not completely filled 427 if (bytesRead > 0) { 428 assert(pos + bytesRead < (long)Integer.MAX_VALUE); 429 int newPosition = pos + bytesRead; 430 try { 431 nextBuffer.position(newPosition); 432 } catch (IllegalArgumentException x) { 433 // position changed by another 434 } 435 } 436 break; 437 } 438 } 439 440 // Put results from shadow into the slow buffers 441 for (int i=0; i<numBufs; i++) { 442 if (!(bufs[i] instanceof DirectBuffer)) { 443 shadow[i].flip(); 444 try { 445 bufs[i].put(shadow[i]); 446 } catch (BufferOverflowException x) { 447 // position changed by another 448 } 449 } 450 } 451 } 452 453 void releaseBuffers() { 454 for (int i=0; i<numBufs; i++) { 455 if (!(bufs[i] instanceof DirectBuffer)) { 456 Util.releaseTemporaryDirectBuffer(shadow[i]); 457 } 458 } 459 } 460 461 @Override 462 @SuppressWarnings("unchecked") 463 public void run() { 464 long overlapped = 0L; 465 boolean prepared = false; 466 boolean pending = false; 467 468 try { 469 begin(); 470 471 // substitute non-direct buffers 472 prepareBuffers(); 473 prepared = true; 474 475 // get an OVERLAPPED structure (from the cache or allocate) 476 overlapped = ioCache.add(result); 477 478 // initiate read 479 int n = read0(handle, numBufs, readBufferArray, overlapped); 480 if (n == IOStatus.UNAVAILABLE) { 481 // I/O is pending 482 pending = true; 483 return; 484 } 485 if (n == IOStatus.EOF) { 486 // input shutdown 487 enableReading(); 488 if (scatteringRead) { 489 result.setResult((V)Long.valueOf(-1L)); 490 } else { 491 result.setResult((V)Integer.valueOf(-1)); 492 } 493 } else { 494 throw new InternalError("Read completed immediately"); 495 } 496 } catch (Throwable x) { 497 // failed to initiate read 498 // reset read flag before releasing waiters 499 enableReading(); 500 if (x instanceof ClosedChannelException) 501 x = new AsynchronousCloseException(); 502 if (!(x instanceof IOException)) 503 x = new IOException(x); 504 result.setFailure(x); 505 } finally { 506 // release resources if I/O not pending 507 if (!pending) { 508 if (overlapped != 0L) 509 ioCache.remove(overlapped); 510 if (prepared) 511 releaseBuffers(); 512 } 513 end(); 514 } 515 516 // invoke completion handler 517 Invoker.invoke(result.handler(), result); 518 } 519 520 /** 521 * Executed when the I/O has completed 522 */ 523 @Override 524 @SuppressWarnings("unchecked") 525 public void completed(int bytesTransferred) { 526 if (bytesTransferred == 0) { 527 bytesTransferred = -1; // EOF 528 } else { 529 updateBuffers(bytesTransferred); 530 } 531 532 // return direct buffer to cache if substituted 533 releaseBuffers(); 534 535 // release waiters if not already released by timeout 536 synchronized (result) { 537 if (result.isDone()) 538 return; 539 enableReading(); 540 if (scatteringRead) { 541 result.setResult((V)Long.valueOf(bytesTransferred)); 542 } else { 543 result.setResult((V)Integer.valueOf(bytesTransferred)); 544 } 545 } 546 Invoker.invoke(result.handler(), result); 547 } 548 549 @Override 550 public void failed(int error, IOException x) { 551 // return direct buffer to cache if substituted 552 releaseBuffers(); 553 554 // release waiters if not already released by timeout 555 if (!isOpen()) 556 x = new AsynchronousCloseException(); 557 558 synchronized (result) { 559 if (result.isDone()) 560 return; 561 enableReading(); 562 result.setFailure(x); 563 } 564 Invoker.invoke(result.handler(), result); 565 } 566 567 /** 568 * Invoked if timeout expires before it is cancelled 569 */ 570 void timeout() { 571 // synchronize on result as the I/O could complete/fail 572 synchronized (result) { 573 if (result.isDone()) 574 return; 575 576 // kill further reading before releasing waiters 577 enableReading(true); 578 result.setFailure(new InterruptedByTimeoutException()); 579 } 580 581 // invoke handler without any locks 582 Invoker.invoke(result.handler(), result); 583 } 584 } 585 586 @Override 587 <V extends Number,A> Future<V> readImpl(ByteBuffer[] bufs, 588 boolean scatteringRead, 589 long timeout, 590 TimeUnit unit, 591 A attachment, 592 CompletionHandler<V,? super A> handler) 593 { 594 // setup task 595 PendingFuture<V,A> result = 596 new PendingFuture<V,A>(this, handler, attachment); 597 final ReadTask readTask = new ReadTask<V,A>(bufs, scatteringRead, result); 598 result.setContext(readTask); 599 600 // schedule timeout 601 if (timeout > 0L) { 602 Future<?> timeoutTask = iocp.schedule(new Runnable() { 603 public void run() { 604 readTask.timeout(); 605 } 606 }, timeout, unit); 607 result.setTimeoutTask(timeoutTask); 608 } 609 610 // initiate I/O (can only be done from thread in thread pool) 611 Invoker.invokeOnThreadInThreadPool(this, readTask); 612 return result; 613 } 614 615 /** 616 * Implements the task to initiate a write and the handler to consume the 617 * result when the write completes. 618 */ 619 private class WriteTask<V,A> implements Runnable, Iocp.ResultHandler { 620 private final ByteBuffer[] bufs; 621 private final int numBufs; 622 private final boolean gatheringWrite; 623 private final PendingFuture<V,A> result; 624 625 // set by run method 626 private ByteBuffer[] shadow; 627 628 WriteTask(ByteBuffer[] bufs, 629 boolean gatheringWrite, 630 PendingFuture<V,A> result) 631 { 632 this.bufs = bufs; 633 this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length; 634 this.gatheringWrite = gatheringWrite; 635 this.result = result; 636 } 637 638 /** 639 * Invoked prior to write to prepare the WSABUF array. Where necessary, 640 * it substitutes non-direct buffers with direct buffers. 641 */ 642 void prepareBuffers() { 643 shadow = new ByteBuffer[numBufs]; 644 long address = writeBufferArray; 645 for (int i=0; i<numBufs; i++) { 646 ByteBuffer src = bufs[i]; 647 int pos = src.position(); 648 int lim = src.limit(); 649 assert (pos <= lim); 650 int rem = (pos <= lim ? lim - pos : 0); 651 long a; 652 if (!(src instanceof DirectBuffer)) { 653 // substitute with direct buffer 654 ByteBuffer bb = Util.getTemporaryDirectBuffer(rem); 655 bb.put(src); 656 bb.flip(); 657 src.position(pos); // leave heap buffer untouched for now 658 shadow[i] = bb; 659 a = ((DirectBuffer)bb).address(); 660 } else { 661 shadow[i] = src; 662 a = ((DirectBuffer)src).address() + pos; 663 } 664 unsafe.putAddress(address + OFFSETOF_BUF, a); 665 unsafe.putInt(address + OFFSETOF_LEN, rem); 666 address += SIZEOF_WSABUF; 667 } 668 } 669 670 /** 671 * Invoked after a write has completed to update the buffer positions 672 * and release any substituted buffers. 673 */ 674 void updateBuffers(int bytesWritten) { 675 // Notify the buffers how many bytes were taken 676 for (int i=0; i<numBufs; i++) { 677 ByteBuffer nextBuffer = bufs[i]; 678 int pos = nextBuffer.position(); 679 int lim = nextBuffer.limit(); 680 int len = (pos <= lim ? lim - pos : lim); 681 if (bytesWritten >= len) { 682 bytesWritten -= len; 683 int newPosition = pos + len; 684 try { 685 nextBuffer.position(newPosition); 686 } catch (IllegalArgumentException x) { 687 // position changed by someone else 688 } 689 } else { // Buffers not completely filled 690 if (bytesWritten > 0) { 691 assert(pos + bytesWritten < (long)Integer.MAX_VALUE); 692 int newPosition = pos + bytesWritten; 693 try { 694 nextBuffer.position(newPosition); 695 } catch (IllegalArgumentException x) { 696 // position changed by someone else 697 } 698 } 699 break; 700 } 701 } 702 } 703 704 void releaseBuffers() { 705 for (int i=0; i<numBufs; i++) { 706 if (!(bufs[i] instanceof DirectBuffer)) { 707 Util.releaseTemporaryDirectBuffer(shadow[i]); 708 } 709 } 710 } 711 712 @Override 713 @SuppressWarnings("unchecked") 714 public void run() { 715 long overlapped = 0L; 716 boolean prepared = false; 717 boolean pending = false; 718 boolean shutdown = false; 719 720 try { 721 begin(); 722 723 // substitute non-direct buffers 724 prepareBuffers(); 725 prepared = true; 726 727 // get an OVERLAPPED structure (from the cache or allocate) 728 overlapped = ioCache.add(result); 729 int n = write0(handle, numBufs, writeBufferArray, overlapped); 730 if (n == IOStatus.UNAVAILABLE) { 731 // I/O is pending 732 pending = true; 733 return; 734 } 735 if (n == IOStatus.EOF) { 736 // special case for shutdown output 737 shutdown = true; 738 throw new ClosedChannelException(); 739 } 740 // write completed immediately 741 throw new InternalError("Write completed immediately"); 742 } catch (Throwable x) { 743 // write failed. Enable writing before releasing waiters. 744 enableWriting(); 745 if (!shutdown && (x instanceof ClosedChannelException)) 746 x = new AsynchronousCloseException(); 747 if (!(x instanceof IOException)) 748 x = new IOException(x); 749 result.setFailure(x); 750 } finally { 751 // release resources if I/O not pending 752 if (!pending) { 753 if (overlapped != 0L) 754 ioCache.remove(overlapped); 755 if (prepared) 756 releaseBuffers(); 757 } 758 end(); 759 } 760 761 // invoke completion handler 762 Invoker.invoke(result.handler(), result); 763 } 764 765 /** 766 * Executed when the I/O has completed 767 */ 768 @Override 769 @SuppressWarnings("unchecked") 770 public void completed(int bytesTransferred) { 771 updateBuffers(bytesTransferred); 772 773 // return direct buffer to cache if substituted 774 releaseBuffers(); 775 776 // release waiters if not already released by timeout 777 synchronized (result) { 778 if (result.isDone()) 779 return; 780 enableWriting(); 781 if (gatheringWrite) { 782 result.setResult((V)Long.valueOf(bytesTransferred)); 783 } else { 784 result.setResult((V)Integer.valueOf(bytesTransferred)); 785 } 786 } 787 Invoker.invoke(result.handler(), result); 788 } 789 790 @Override 791 public void failed(int error, IOException x) { 792 // return direct buffer to cache if substituted 793 releaseBuffers(); 794 795 // release waiters if not already released by timeout 796 if (!isOpen()) 797 x = new AsynchronousCloseException(); 798 799 synchronized (result) { 800 if (result.isDone()) 801 return; 802 enableWriting(); 803 result.setFailure(x); 804 } 805 Invoker.invoke(result.handler(), result); 806 } 807 808 /** 809 * Invoked if timeout expires before it is cancelled 810 */ 811 void timeout() { 812 // synchronize on result as the I/O could complete/fail 813 synchronized (result) { 814 if (result.isDone()) 815 return; 816 817 // kill further writing before releasing waiters 818 enableWriting(true); 819 result.setFailure(new InterruptedByTimeoutException()); 820 } 821 822 // invoke handler without any locks 823 Invoker.invoke(result.handler(), result); 824 } 825 } 826 827 @Override 828 <V extends Number,A> Future<V> writeImpl(ByteBuffer[] bufs, 829 boolean gatheringWrite, 830 long timeout, 831 TimeUnit unit, 832 A attachment, 833 CompletionHandler<V,? super A> handler) 834 { 835 // setup task 836 PendingFuture<V,A> result = 837 new PendingFuture<V,A>(this, handler, attachment); 838 final WriteTask writeTask = new WriteTask<V,A>(bufs, gatheringWrite, result); 839 result.setContext(writeTask); 840 841 // schedule timeout 842 if (timeout > 0L) { 843 Future<?> timeoutTask = iocp.schedule(new Runnable() { 844 public void run() { 845 writeTask.timeout(); 846 } 847 }, timeout, unit); 848 result.setTimeoutTask(timeoutTask); 849 } 850 851 // initiate I/O (can only be done from thread in thread pool) 852 Invoker.invokeOnThreadInThreadPool(this, writeTask); 853 return result; 854 } 855 856 // -- Native methods -- 857 858 private static native void initIDs(); 859 860 private static native int connect0(long socket, boolean preferIPv6, 861 InetAddress remote, int remotePort, long overlapped) throws IOException; 862 863 private static native void updateConnectContext(long socket) throws IOException; 864 865 private static native int read0(long socket, int count, long addres, long overlapped) 866 throws IOException; 867 868 private static native int write0(long socket, int count, long address, 869 long overlapped) throws IOException; 870 871 private static native void shutdown0(long socket, int how) throws IOException; 872 873 private static native void closesocket0(long socket) throws IOException; 874 875 static { 876 Util.load(); 877 initIDs(); 878 } 879 }