1 /*
   2  * Copyright 2008-2009 Sun Microsystems, Inc.  All Rights Reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Sun designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Sun in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
  22  * CA 95054 USA or visit www.sun.com if you need additional information or
  23  * have any questions.
  24  */
  25 
  26 package sun.nio.ch;
  27 
  28 import java.nio.channels.*;
  29 import java.nio.ByteBuffer;
  30 import java.nio.BufferOverflowException;
  31 import java.net.*;
  32 import java.util.concurrent.*;
  33 import java.io.IOException;
  34 import sun.misc.Unsafe;
  35 
  36 /**
  37  * Windows implementation of AsynchronousSocketChannel using overlapped I/O.
  38  */
  39 
  40 class WindowsAsynchronousSocketChannelImpl
  41     extends AsynchronousSocketChannelImpl implements Iocp.OverlappedChannel
  42 {
  43     private static final Unsafe unsafe = Unsafe.getUnsafe();
  44     private static int addressSize = unsafe.addressSize();
  45 
  46     private static int dependsArch(int value32, int value64) {
  47         return (addressSize == 4) ? value32 : value64;
  48     }
  49 
  50     /*
  51      * typedef struct _WSABUF {
  52      *     u_long      len;
  53      *     char FAR *  buf;
  54      * } WSABUF;
  55      */
  56     private static final int SIZEOF_WSABUF  = dependsArch(8, 16);
  57     private static final int OFFSETOF_LEN   = 0;
  58     private static final int OFFSETOF_BUF   = dependsArch(4, 8);
  59 
  60     // maximum vector size for scatter/gather I/O
  61     private static final int MAX_WSABUF     = 16;
  62 
  63     private static final int SIZEOF_WSABUFARRAY = MAX_WSABUF * SIZEOF_WSABUF;
  64 
  65 
  66     // socket handle. Use begin()/end() around each usage of this handle.
  67     final long handle;
  68 
  69     // I/O completion port that the socket is associated with
  70     private final Iocp iocp;
  71 
  72     // completion key to identify channel when I/O completes
  73     private final int completionKey;
  74 
  75     // Pending I/O operations are tied to an OVERLAPPED structure that can only
  76     // be released when the I/O completion event is posted to the completion
  77     // port. Where I/O operations complete immediately then it is possible
  78     // there may be more than two OVERLAPPED structures in use.
  79     private final PendingIoCache ioCache;
  80 
  81     // per-channel arrays of WSABUF structures
  82     private final long readBufferArray;
  83     private final long writeBufferArray;
  84 
  85 
  86     WindowsAsynchronousSocketChannelImpl(Iocp iocp, boolean failIfGroupShutdown)
  87         throws IOException
  88     {
  89         super(iocp);
  90 
  91         // associate socket with default completion port
  92         long h = IOUtil.fdVal(fd);
  93         int key = 0;
  94         try {
  95             key = iocp.associate(this, h);
  96         } catch (ShutdownChannelGroupException x) {
  97             if (failIfGroupShutdown) {
  98                 closesocket0(h);
  99                 throw x;
 100             }
 101         } catch (IOException x) {
 102             closesocket0(h);
 103             throw x;
 104         }
 105 
 106         this.handle = h;
 107         this.iocp = iocp;
 108         this.completionKey = key;
 109         this.ioCache = new PendingIoCache();
 110 
 111         // allocate WSABUF arrays
 112         this.readBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY);
 113         this.writeBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY);
 114     }
 115 
 116     WindowsAsynchronousSocketChannelImpl(Iocp iocp) throws IOException {
 117         this(iocp, true);
 118     }
 119 
 120     @Override
 121     public AsynchronousChannelGroupImpl group() {
 122         return iocp;
 123     }
 124 
 125     /**
 126      * Invoked by Iocp when an I/O operation competes.
 127      */
 128     @Override
 129     public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) {
 130         return ioCache.remove(overlapped);
 131     }
 132 
 133     // invoked by WindowsAsynchronousServerSocketChannelImpl
 134     long handle() {
 135         return handle;
 136     }
 137 
 138     // invoked by WindowsAsynchronousServerSocketChannelImpl when new connection
 139     // accept
 140     void setConnected(SocketAddress localAddress, SocketAddress remoteAddress) {
 141         synchronized (stateLock) {
 142             state = ST_CONNECTED;
 143             this.localAddress = localAddress;
 144             this.remoteAddress = remoteAddress;
 145         }
 146     }
 147 
 148     @Override
 149     void implClose() throws IOException {
 150         // close socket (may cause outstanding async I/O operations to fail).
 151         closesocket0(handle);
 152 
 153         // waits until all I/O operations have completed
 154         ioCache.close();
 155 
 156         // release arrays of WSABUF structures
 157         unsafe.freeMemory(readBufferArray);
 158         unsafe.freeMemory(writeBufferArray);
 159 
 160         // finally disassociate from the completion port (key can be 0 if
 161         // channel created when group is shutdown)
 162         if (completionKey != 0)
 163             iocp.disassociate(completionKey);
 164     }
 165 
 166     @Override
 167     public void onCancel(PendingFuture<?,?> task) {
 168         if (task.getContext() instanceof ConnectTask)
 169             killConnect();
 170         if (task.getContext() instanceof ReadTask)
 171             killReading();
 172         if (task.getContext() instanceof WriteTask)
 173             killWriting();
 174     }
 175 
 176     /**
 177      * Implements the task to initiate a connection and the handler to
 178      * consume the result when the connection is established (or fails).
 179      */
 180     private class ConnectTask<A> implements Runnable, Iocp.ResultHandler {
 181         private final InetSocketAddress remote;
 182         private final PendingFuture<Void,A> result;
 183 
 184         ConnectTask(InetSocketAddress remote, PendingFuture<Void,A> result) {
 185             this.remote = remote;
 186             this.result = result;
 187         }
 188 
 189         private void closeChannel() {
 190             try {
 191                 close();
 192             } catch (IOException ignore) { }
 193         }
 194 
 195         private IOException toIOException(Throwable x) {
 196             if (x instanceof IOException) {
 197                 if (x instanceof ClosedChannelException)
 198                     x = new AsynchronousCloseException();
 199                 return (IOException)x;
 200             }
 201             return new IOException(x);
 202         }
 203 
 204         /**
 205          * Invoke after a connection is successfully established.
 206          */
 207         private void afterConnect() throws IOException {
 208             updateConnectContext(handle);
 209             synchronized (stateLock) {
 210                 state = ST_CONNECTED;
 211                 remoteAddress = remote;
 212             }
 213         }
 214 
 215         /**
 216          * Task to initiate a connection.
 217          */
 218         @Override
 219         public void run() {
 220             long overlapped = 0L;
 221             Throwable exc = null;
 222             try {
 223                 begin();
 224 
 225                 // synchronize on result to allow this thread handle the case
 226                 // where the connection is established immediately.
 227                 synchronized (result) {
 228                     overlapped = ioCache.add(result);
 229                     // initiate the connection
 230                     int n = connect0(handle, Net.isIPv6Available(), remote.getAddress(),
 231                                      remote.getPort(), overlapped);
 232                     if (n == IOStatus.UNAVAILABLE) {
 233                         // connection is pending
 234                         return;
 235                     }
 236 
 237                     // connection established immediately
 238                     afterConnect();
 239                     result.setResult(null);
 240                 }
 241             } catch (Throwable x) {
 242                 exc = x;
 243             } finally {
 244                 end();
 245             }
 246 
 247             if (exc != null) {
 248                 if (overlapped != 0L)
 249                     ioCache.remove(overlapped);
 250                 closeChannel();
 251                 result.setFailure(toIOException(exc));
 252             }
 253             Invoker.invoke(result.handler(), result);
 254         }
 255 
 256         /**
 257          * Invoked by handler thread when connection established.
 258          */
 259         @Override
 260         public void completed(int bytesTransferred) {
 261             Throwable exc = null;
 262             try {
 263                 begin();
 264                 afterConnect();
 265                 result.setResult(null);
 266             } catch (Throwable x) {
 267                 // channel is closed or unable to finish connect
 268                 exc = x;
 269             } finally {
 270                 end();
 271             }
 272 
 273             // can't close channel while in begin/end block
 274             if (exc != null) {
 275                 closeChannel();
 276                 result.setFailure(toIOException(exc));
 277             }
 278 
 279             Invoker.invoke(result.handler(), result);
 280         }
 281 
 282         /**
 283          * Invoked by handler thread when failed to establish connection.
 284          */
 285         @Override
 286         public void failed(int error, IOException x) {
 287             if (isOpen()) {
 288                 closeChannel();
 289                 result.setFailure(x);
 290             } else {
 291                 result.setFailure(new AsynchronousCloseException());
 292             }
 293             Invoker.invoke(result.handler(), result);
 294         }
 295     }
 296 
 297     @Override
 298     public <A> Future<Void> connect(SocketAddress remote,
 299                                     A attachment,
 300                                     CompletionHandler<Void,? super A> handler)
 301     {
 302         if (!isOpen()) {
 303             CompletedFuture<Void,A> result = CompletedFuture
 304                 .withFailure(this, new ClosedChannelException(), attachment);
 305             Invoker.invoke(handler, result);
 306             return result;
 307         }
 308 
 309         InetSocketAddress isa = Net.checkAddress(remote);
 310 
 311         // permission check
 312         SecurityManager sm = System.getSecurityManager();
 313         if (sm != null)
 314             sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());
 315 
 316         // check and update state
 317         // ConnectEx requires the socket to be bound to a local address
 318         IOException bindException = null;
 319         synchronized (stateLock) {
 320             if (state == ST_CONNECTED)
 321                 throw new AlreadyConnectedException();
 322             if (state == ST_PENDING)
 323                 throw new ConnectionPendingException();
 324             if (localAddress == null) {
 325                 try {
 326                     bind(new InetSocketAddress(0));
 327                 } catch (IOException x) {
 328                     bindException = x;
 329                 }
 330             }
 331             if (bindException == null)
 332                 state = ST_PENDING;
 333         }
 334 
 335         // handle bind failure
 336         if (bindException != null) {
 337             try {
 338                 close();
 339             } catch (IOException ignore) { }
 340             CompletedFuture<Void,A> result = CompletedFuture
 341                 .withFailure(this, bindException, attachment);
 342             Invoker.invoke(handler, result);
 343             return result;
 344         }
 345 
 346         // setup task
 347         PendingFuture<Void,A> result =
 348             new PendingFuture<Void,A>(this, handler, attachment);
 349         ConnectTask task = new ConnectTask<A>(isa, result);
 350         result.setContext(task);
 351 
 352         // initiate I/O (can only be done from thread in thread pool)
 353         Invoker.invokeOnThreadInThreadPool(this, task);
 354         return result;
 355     }
 356 
 357     /**
 358      * Implements the task to initiate a read and the handler to consume the
 359      * result when the read completes.
 360      */
 361     private class ReadTask<V,A> implements Runnable, Iocp.ResultHandler {
 362         private final ByteBuffer[] bufs;
 363         private final int numBufs;
 364         private final boolean scatteringRead;
 365         private final PendingFuture<V,A> result;
 366 
 367         // set by run method
 368         private ByteBuffer[] shadow;
 369 
 370         ReadTask(ByteBuffer[] bufs,
 371                  boolean scatteringRead,
 372                  PendingFuture<V,A> result)
 373         {
 374             this.bufs = bufs;
 375             this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length;
 376             this.scatteringRead = scatteringRead;
 377             this.result = result;
 378         }
 379 
 380         /**
 381          * Invoked prior to read to prepare the WSABUF array. Where necessary,
 382          * it substitutes non-direct buffers with direct buffers.
 383          */
 384         void prepareBuffers() {
 385             shadow = new ByteBuffer[numBufs];
 386             long address = readBufferArray;
 387             for (int i=0; i<numBufs; i++) {
 388                 ByteBuffer dst = bufs[i];
 389                 int pos = dst.position();
 390                 int lim = dst.limit();
 391                 assert (pos <= lim);
 392                 int rem = (pos <= lim ? lim - pos : 0);
 393                 long a;
 394                 if (!(dst instanceof DirectBuffer)) {
 395                     // substitute with direct buffer
 396                     ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
 397                     shadow[i] = bb;
 398                     a = ((DirectBuffer)bb).address();
 399                 } else {
 400                     shadow[i] = dst;
 401                     a = ((DirectBuffer)dst).address() + pos;
 402                 }
 403                 unsafe.putAddress(address + OFFSETOF_BUF, a);
 404                 unsafe.putInt(address + OFFSETOF_LEN, rem);
 405                 address += SIZEOF_WSABUF;
 406             }
 407         }
 408 
 409         /**
 410          * Invoked after a read has completed to update the buffer positions
 411          * and release any substituted buffers.
 412          */
 413         void updateBuffers(int bytesRead) {
 414             for (int i=0; i<numBufs; i++) {
 415                 ByteBuffer nextBuffer = shadow[i];
 416                 int pos = nextBuffer.position();
 417                 int len = nextBuffer.remaining();
 418                 if (bytesRead >= len) {
 419                     bytesRead -= len;
 420                     int newPosition = pos + len;
 421                     try {
 422                         nextBuffer.position(newPosition);
 423                     } catch (IllegalArgumentException x) {
 424                         // position changed by another
 425                     }
 426                 } else { // Buffers not completely filled
 427                     if (bytesRead > 0) {
 428                         assert(pos + bytesRead < (long)Integer.MAX_VALUE);
 429                         int newPosition = pos + bytesRead;
 430                         try {
 431                             nextBuffer.position(newPosition);
 432                         } catch (IllegalArgumentException x) {
 433                             // position changed by another
 434                         }
 435                     }
 436                     break;
 437                 }
 438             }
 439 
 440             // Put results from shadow into the slow buffers
 441             for (int i=0; i<numBufs; i++) {
 442                 if (!(bufs[i] instanceof DirectBuffer)) {
 443                     shadow[i].flip();
 444                     try {
 445                         bufs[i].put(shadow[i]);
 446                     } catch (BufferOverflowException x) {
 447                         // position changed by another
 448                     }
 449                 }
 450             }
 451         }
 452 
 453         void releaseBuffers() {
 454             for (int i=0; i<numBufs; i++) {
 455                 if (!(bufs[i] instanceof DirectBuffer)) {
 456                     Util.releaseTemporaryDirectBuffer(shadow[i]);
 457                 }
 458             }
 459         }
 460 
 461         @Override
 462         @SuppressWarnings("unchecked")
 463         public void run() {
 464             long overlapped = 0L;
 465             boolean prepared = false;
 466             boolean pending = false;
 467 
 468             try {
 469                 begin();
 470 
 471                 // substitute non-direct buffers
 472                 prepareBuffers();
 473                 prepared = true;
 474 
 475                 // get an OVERLAPPED structure (from the cache or allocate)
 476                 overlapped = ioCache.add(result);
 477 
 478                 // synchronize on result to allow this thread handle the case
 479                 // where the read completes immediately.
 480                 synchronized (result) {
 481                     int n = read0(handle, numBufs, readBufferArray, overlapped);
 482                     if (n == IOStatus.UNAVAILABLE) {
 483                         // I/O is pending
 484                         pending = true;
 485                         return;
 486                     }
 487                     // read completed immediately:
 488                     // 1. update buffer position
 489                     // 2. reset read flag
 490                     // 3. release waiters
 491                     if (n == 0) {
 492                         n = -1;
 493                     } else {
 494                         updateBuffers(n);
 495                     }
 496                     enableReading();
 497 
 498                     if (scatteringRead) {
 499                         result.setResult((V)Long.valueOf(n));
 500                     } else {
 501                         result.setResult((V)Integer.valueOf(n));
 502                     }
 503                 }
 504             } catch (Throwable x) {
 505                 // failed to initiate read:
 506                 // 1. reset read flag
 507                 // 2. free resources
 508                 // 3. release waiters
 509                 enableReading();
 510                 if (overlapped != 0L)
 511                     ioCache.remove(overlapped);
 512                 if (x instanceof ClosedChannelException)
 513                     x = new AsynchronousCloseException();
 514                 if (!(x instanceof IOException))
 515                     x = new IOException(x);
 516                 result.setFailure(x);
 517             } finally {
 518                 if (prepared && !pending) {
 519                     // return direct buffer(s) to cache if substituted
 520                     releaseBuffers();
 521                 }
 522                 end();
 523             }
 524 
 525             // invoke completion handler
 526             Invoker.invoke(result.handler(), result);
 527         }
 528 
 529         /**
 530          * Executed when the I/O has completed
 531          */
 532         @Override
 533         @SuppressWarnings("unchecked")
 534         public void completed(int bytesTransferred) {
 535             if (bytesTransferred == 0) {
 536                 bytesTransferred = -1;  // EOF
 537             } else {
 538                 updateBuffers(bytesTransferred);
 539             }
 540 
 541             // return direct buffer to cache if substituted
 542             releaseBuffers();
 543 
 544             // release waiters if not already released by timeout
 545             synchronized (result) {
 546                 if (result.isDone())
 547                     return;
 548                 enableReading();
 549                 if (scatteringRead) {
 550                     result.setResult((V)Long.valueOf(bytesTransferred));
 551                 } else {
 552                     result.setResult((V)Integer.valueOf(bytesTransferred));
 553                 }
 554             }
 555             Invoker.invoke(result.handler(), result);
 556         }
 557 
 558         @Override
 559         public void failed(int error, IOException x) {
 560             // return direct buffer to cache if substituted
 561             releaseBuffers();
 562 
 563             // release waiters if not already released by timeout
 564             if (!isOpen())
 565                 x = new AsynchronousCloseException();
 566 
 567             synchronized (result) {
 568                 if (result.isDone())
 569                     return;
 570                 enableReading();
 571                 result.setFailure(x);
 572             }
 573             Invoker.invoke(result.handler(), result);
 574         }
 575 
 576         /**
 577          * Invoked if timeout expires before it is cancelled
 578          */
 579         void timeout() {
 580             // synchronize on result as the I/O could complete/fail
 581             synchronized (result) {
 582                 if (result.isDone())
 583                     return;
 584 
 585                 // kill further reading before releasing waiters
 586                 enableReading(true);
 587                 result.setFailure(new InterruptedByTimeoutException());
 588             }
 589 
 590             // invoke handler without any locks
 591             Invoker.invoke(result.handler(), result);
 592         }
 593     }
 594 
 595     @Override
 596     <V extends Number,A> Future<V> readImpl(ByteBuffer[] bufs,
 597                                             boolean scatteringRead,
 598                                             long timeout,
 599                                             TimeUnit unit,
 600                                             A attachment,
 601                                             CompletionHandler<V,? super A> handler)
 602     {
 603         // setup task
 604         PendingFuture<V,A> result =
 605             new PendingFuture<V,A>(this, handler, attachment);
 606         final ReadTask readTask = new ReadTask<V,A>(bufs, scatteringRead, result);
 607         result.setContext(readTask);
 608 
 609         // schedule timeout
 610         if (timeout > 0L) {
 611             Future<?> timeoutTask = iocp.schedule(new Runnable() {
 612                 public void run() {
 613                     readTask.timeout();
 614                 }
 615             }, timeout, unit);
 616             result.setTimeoutTask(timeoutTask);
 617         }
 618 
 619         // initiate I/O (can only be done from thread in thread pool)
 620         Invoker.invokeOnThreadInThreadPool(this, readTask);
 621         return result;
 622     }
 623 
 624     /**
 625      * Implements the task to initiate a write and the handler to consume the
 626      * result when the write completes.
 627      */
 628     private class WriteTask<V,A> implements Runnable, Iocp.ResultHandler {
 629         private final ByteBuffer[] bufs;
 630         private final int numBufs;
 631         private final boolean gatheringWrite;
 632         private final PendingFuture<V,A> result;
 633 
 634         // set by run method
 635         private ByteBuffer[] shadow;
 636 
 637         WriteTask(ByteBuffer[] bufs,
 638                   boolean gatheringWrite,
 639                   PendingFuture<V,A> result)
 640         {
 641             this.bufs = bufs;
 642             this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length;
 643             this.gatheringWrite = gatheringWrite;
 644             this.result = result;
 645         }
 646 
 647         /**
 648          * Invoked prior to write to prepare the WSABUF array. Where necessary,
 649          * it substitutes non-direct buffers with direct buffers.
 650          */
 651         void prepareBuffers() {
 652             shadow = new ByteBuffer[numBufs];
 653             long address = writeBufferArray;
 654             for (int i=0; i<numBufs; i++) {
 655                 ByteBuffer src = bufs[i];
 656                 int pos = src.position();
 657                 int lim = src.limit();
 658                 assert (pos <= lim);
 659                 int rem = (pos <= lim ? lim - pos : 0);
 660                 long a;
 661                 if (!(src instanceof DirectBuffer)) {
 662                     // substitute with direct buffer
 663                     ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
 664                     bb.put(src);
 665                     bb.flip();
 666                     src.position(pos);  // leave heap buffer untouched for now
 667                     shadow[i] = bb;
 668                     a = ((DirectBuffer)bb).address();
 669                 } else {
 670                     shadow[i] = src;
 671                     a = ((DirectBuffer)src).address() + pos;
 672                 }
 673                 unsafe.putAddress(address + OFFSETOF_BUF, a);
 674                 unsafe.putInt(address + OFFSETOF_LEN, rem);
 675                 address += SIZEOF_WSABUF;
 676             }
 677         }
 678 
 679         /**
 680          * Invoked after a write has completed to update the buffer positions
 681          * and release any substituted buffers.
 682          */
 683         void updateBuffers(int bytesWritten) {
 684             // Notify the buffers how many bytes were taken
 685             for (int i=0; i<numBufs; i++) {
 686                 ByteBuffer nextBuffer = bufs[i];
 687                 int pos = nextBuffer.position();
 688                 int lim = nextBuffer.limit();
 689                 int len = (pos <= lim ? lim - pos : lim);
 690                 if (bytesWritten >= len) {
 691                     bytesWritten -= len;
 692                     int newPosition = pos + len;
 693                     try {
 694                         nextBuffer.position(newPosition);
 695                     } catch (IllegalArgumentException x) {
 696                         // position changed by someone else
 697                     }
 698                 } else { // Buffers not completely filled
 699                     if (bytesWritten > 0) {
 700                         assert(pos + bytesWritten < (long)Integer.MAX_VALUE);
 701                         int newPosition = pos + bytesWritten;
 702                         try {
 703                             nextBuffer.position(newPosition);
 704                         } catch (IllegalArgumentException x) {
 705                             // position changed by someone else
 706                         }
 707                     }
 708                     break;
 709                 }
 710             }
 711         }
 712 
 713         void releaseBuffers() {
 714             for (int i=0; i<numBufs; i++) {
 715                 if (!(bufs[i] instanceof DirectBuffer)) {
 716                     Util.releaseTemporaryDirectBuffer(shadow[i]);
 717                 }
 718             }
 719         }
 720 
 721         @Override
 722         @SuppressWarnings("unchecked")
 723         public void run() {
 724             int n = -1;
 725             long overlapped = 0L;
 726             boolean prepared = false;
 727             boolean pending = false;
 728             boolean shutdown = false;
 729 
 730             try {
 731                 begin();
 732 
 733                 // substitute non-direct buffers
 734                 prepareBuffers();
 735                 prepared = true;
 736 
 737                 // get an OVERLAPPED structure (from the cache or allocate)
 738                 overlapped = ioCache.add(result);
 739 
 740                 // synchronize on result to allow this thread handle the case
 741                 // where the read completes immediately.
 742                 synchronized (result) {
 743                     n = write0(handle, numBufs, writeBufferArray, overlapped);
 744                     if (n == IOStatus.UNAVAILABLE) {
 745                         // I/O is pending
 746                         pending = true;
 747                         return;
 748                     }
 749 
 750                     enableWriting();
 751 
 752                     if (n == IOStatus.EOF) {
 753                         // special case for shutdown output
 754                         shutdown = true;
 755                         throw new ClosedChannelException();
 756                     }
 757 
 758                     // write completed immediately:
 759                     // 1. enable writing
 760                     // 2. update buffer position
 761                     // 3. release waiters
 762                     updateBuffers(n);
 763 
 764                     // result is a Long or Integer
 765                     if (gatheringWrite) {
 766                         result.setResult((V)Long.valueOf(n));
 767                     } else {
 768                         result.setResult((V)Integer.valueOf(n));
 769                     }
 770                 }
 771             } catch (Throwable x) {
 772                 enableWriting();
 773 
 774                 // failed to initiate read:
 775                 if (!shutdown && (x instanceof ClosedChannelException))
 776                     x = new AsynchronousCloseException();
 777                 if (!(x instanceof IOException))
 778                     x = new IOException(x);
 779                 result.setFailure(x);
 780 
 781                 // release resources
 782                 if (overlapped != 0L)
 783                     ioCache.remove(overlapped);
 784 
 785             } finally {
 786                 if (prepared && !pending) {
 787                     // return direct buffer(s) to cache if substituted
 788                     releaseBuffers();
 789                 }
 790                 end();
 791             }
 792 
 793             // invoke completion handler
 794             Invoker.invoke(result.handler(), result);
 795         }
 796 
 797         /**
 798          * Executed when the I/O has completed
 799          */
 800         @Override
 801         @SuppressWarnings("unchecked")
 802         public void completed(int bytesTransferred) {
 803             updateBuffers(bytesTransferred);
 804 
 805             // return direct buffer to cache if substituted
 806             releaseBuffers();
 807 
 808             // release waiters if not already released by timeout
 809             synchronized (result) {
 810                 if (result.isDone())
 811                     return;
 812                 enableWriting();
 813                 if (gatheringWrite) {
 814                     result.setResult((V)Long.valueOf(bytesTransferred));
 815                 } else {
 816                     result.setResult((V)Integer.valueOf(bytesTransferred));
 817                 }
 818             }
 819             Invoker.invoke(result.handler(), result);
 820         }
 821 
 822         @Override
 823         public void failed(int error, IOException x) {
 824             // return direct buffer to cache if substituted
 825             releaseBuffers();
 826 
 827             // release waiters if not already released by timeout
 828             if (!isOpen())
 829                 x = new AsynchronousCloseException();
 830 
 831             synchronized (result) {
 832                 if (result.isDone())
 833                     return;
 834                 enableWriting();
 835                 result.setFailure(x);
 836             }
 837             Invoker.invoke(result.handler(), result);
 838         }
 839 
 840         /**
 841          * Invoked if timeout expires before it is cancelled
 842          */
 843         void timeout() {
 844             // synchronize on result as the I/O could complete/fail
 845             synchronized (result) {
 846                 if (result.isDone())
 847                     return;
 848 
 849                 // kill further writing before releasing waiters
 850                 enableWriting(true);
 851                 result.setFailure(new InterruptedByTimeoutException());
 852             }
 853 
 854             // invoke handler without any locks
 855             Invoker.invoke(result.handler(), result);
 856         }
 857     }
 858 
 859     @Override
 860     <V extends Number,A> Future<V> writeImpl(ByteBuffer[] bufs,
 861                                              boolean gatheringWrite,
 862                                              long timeout,
 863                                              TimeUnit unit,
 864                                              A attachment,
 865                                              CompletionHandler<V,? super A> handler)
 866     {
 867         // setup task
 868         PendingFuture<V,A> result =
 869             new PendingFuture<V,A>(this, handler, attachment);
 870         final WriteTask writeTask = new WriteTask<V,A>(bufs, gatheringWrite, result);
 871         result.setContext(writeTask);
 872 
 873         // schedule timeout
 874         if (timeout > 0L) {
 875             Future<?> timeoutTask = iocp.schedule(new Runnable() {
 876                 public void run() {
 877                     writeTask.timeout();
 878                 }
 879             }, timeout, unit);
 880             result.setTimeoutTask(timeoutTask);
 881         }
 882 
 883         // initiate I/O (can only be done from thread in thread pool)
 884         Invoker.invokeOnThreadInThreadPool(this, writeTask);
 885         return result;
 886     }
 887 
 888     // -- Native methods --
 889 
 890     private static native void initIDs();
 891 
 892     private static native int connect0(long socket, boolean preferIPv6,
 893         InetAddress remote, int remotePort, long overlapped) throws IOException;
 894 
 895     private static native void updateConnectContext(long socket) throws IOException;
 896 
 897     private static native int read0(long socket, int count, long addres, long overlapped)
 898         throws IOException;
 899 
 900     private static native int write0(long socket, int count, long address,
 901         long overlapped) throws IOException;
 902 
 903     private static native void shutdown0(long socket, int how) throws IOException;
 904 
 905     private static native void closesocket0(long socket) throws IOException;
 906 
 907     static {
 908         Util.load();
 909         initIDs();
 910     }
 911 }