1 /*
   2  * Copyright 2008-2009 Sun Microsystems, Inc.  All Rights Reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Sun designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Sun in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
  22  * CA 95054 USA or visit www.sun.com if you need additional information or
  23  * have any questions.
  24  */
  25 
  26 package sun.nio.ch;
  27 
  28 import java.nio.channels.*;
  29 import java.nio.ByteBuffer;
  30 import java.nio.BufferOverflowException;
  31 import java.net.*;
  32 import java.util.concurrent.*;
  33 import java.io.IOException;
  34 import sun.misc.Unsafe;
  35 
  36 /**
  37  * Windows implementation of AsynchronousSocketChannel using overlapped I/O.
  38  */
  39 
  40 class WindowsAsynchronousSocketChannelImpl
  41     extends AsynchronousSocketChannelImpl implements Iocp.OverlappedChannel
  42 {
  43     private static final Unsafe unsafe = Unsafe.getUnsafe();
  44     private static int addressSize = unsafe.addressSize();
  45 
  46     private static int dependsArch(int value32, int value64) {
  47         return (addressSize == 4) ? value32 : value64;
  48     }
  49 
  50     /*
  51      * typedef struct _WSABUF {
  52      *     u_long      len;
  53      *     char FAR *  buf;
  54      * } WSABUF;
  55      */
  56     private static final int SIZEOF_WSABUF  = dependsArch(8, 16);
  57     private static final int OFFSETOF_LEN   = 0;
  58     private static final int OFFSETOF_BUF   = dependsArch(4, 8);
  59 
  60     // maximum vector size for scatter/gather I/O
  61     private static final int MAX_WSABUF     = 16;
  62 
  63     private static final int SIZEOF_WSABUFARRAY = MAX_WSABUF * SIZEOF_WSABUF;
  64 
  65 
    // socket handle. Use begin()/end() around each usage of this handle.
    final long handle;

    // I/O completion port that the socket is associated with
    private final Iocp iocp;

    // completion key to identify channel when I/O completes. Stays 0 when
    // the constructor tolerated a ShutdownChannelGroupException from the
    // association attempt.
    private final int completionKey;

    // Pending I/O operations are tied to an OVERLAPPED structure that can only
    // be released when the I/O completion event is posted to the completion
    // port. Where I/O operations complete immediately then it is possible
    // there may be more than two OVERLAPPED structures in use.
    private final PendingIoCache ioCache;

    // per-channel arrays of WSABUF structures; native memory allocated in
    // the constructor and freed in implClose
    private final long readBufferArray;
    private final long writeBufferArray;
  84 
  85 
  86     WindowsAsynchronousSocketChannelImpl(Iocp iocp, boolean failIfGroupShutdown)
  87         throws IOException
  88     {
  89         super(iocp);
  90 
  91         // associate socket with default completion port
  92         long h = IOUtil.fdVal(fd);
  93         int key = 0;
  94         try {
  95             key = iocp.associate(this, h);
  96         } catch (ShutdownChannelGroupException x) {
  97             if (failIfGroupShutdown) {
  98                 closesocket0(h);
  99                 throw x;
 100             }
 101         } catch (IOException x) {
 102             closesocket0(h);
 103             throw x;
 104         }
 105 
 106         this.handle = h;
 107         this.iocp = iocp;
 108         this.completionKey = key;
 109         this.ioCache = new PendingIoCache();
 110 
 111         // allocate WSABUF arrays
 112         this.readBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY);
 113         this.writeBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY);
 114     }
 115 
 116     WindowsAsynchronousSocketChannelImpl(Iocp iocp) throws IOException {
 117         this(iocp, true);
 118     }
 119 
    /**
     * Returns the completion port this channel was created with, which
     * doubles as the channel's group.
     */
    @Override
    public AsynchronousChannelGroupImpl group() {
        return iocp;
    }
 124 
    /**
     * Invoked by Iocp when an I/O operation completes. Removes from the
     * pending I/O cache, and returns, the entry registered under the given
     * OVERLAPPED address.
     */
    @Override
    public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) {
        return ioCache.remove(overlapped);
    }
 132 
    // invoked by WindowsAsynchronousServerSocketChannelImpl; returns the
    // socket handle of this channel
    long handle() {
        return handle;
    }
 137 
    // invoked by WindowsAsynchronousServerSocketChannelImpl when a new
    // connection is accepted. Marks the channel as connected and records
    // both addresses while holding stateLock.
    void setConnected(SocketAddress localAddress, SocketAddress remoteAddress) {
        synchronized (stateLock) {
            state = ST_CONNECTED;
            this.localAddress = localAddress;
            this.remoteAddress = remoteAddress;
        }
    }
 147 
    /**
     * Closes the socket and releases the native resources held by this
     * channel. The steps run in a fixed order: close the socket, drain the
     * pending I/O cache, free the WSABUF arrays, then drop the completion
     * port association.
     *
     * @throws IOException if closing the socket fails; in that case none of
     *         the later steps are executed
     */
    @Override
    void implClose() throws IOException {
        // close socket (may cause outstanding async I/O operations to fail).
        closesocket0(handle);

        // waits until all I/O operations have completed
        ioCache.close();

        // release arrays of WSABUF structures
        unsafe.freeMemory(readBufferArray);
        unsafe.freeMemory(writeBufferArray);

        // finally disassociate from the completion port (key can be 0 if
        // channel created when group is shutdown)
        if (completionKey != 0)
            iocp.disassociate(completionKey);
    }
 165 
 166     @Override
 167     public void onCancel(PendingFuture<?,?> task) {
 168         if (task.getContext() instanceof ConnectTask)
 169             killConnect();
 170         if (task.getContext() instanceof ReadTask)
 171             killReading();
 172         if (task.getContext() instanceof WriteTask)
 173             killWriting();
 174     }
 175 
 176     /**
 177      * Implements the task to initiate a connection and the handler to
 178      * consume the result when the connection is established (or fails).
 179      */
 180     private class ConnectTask<A> implements Runnable, Iocp.ResultHandler {
 181         private final InetSocketAddress remote;
 182         private final PendingFuture<Void,A> result;
 183 
 184         ConnectTask(InetSocketAddress remote, PendingFuture<Void,A> result) {
 185             this.remote = remote;
 186             this.result = result;
 187         }
 188 
 189         private void closeChannel() {
 190             try {
 191                 close();
 192             } catch (IOException ignore) { }
 193         }
 194 
 195         private IOException toIOException(Throwable x) {
 196             if (x instanceof IOException) {
 197                 if (x instanceof ClosedChannelException)
 198                     x = new AsynchronousCloseException();
 199                 return (IOException)x;
 200             }
 201             return new IOException(x);
 202         }
 203 
 204         /**
 205          * Invoke after a connection is successfully established.
 206          */
 207         private void afterConnect() throws IOException {
 208             updateConnectContext(handle);
 209             synchronized (stateLock) {
 210                 state = ST_CONNECTED;
 211                 remoteAddress = remote;
 212             }
 213         }
 214 
 215         /**
 216          * Task to initiate a connection.
 217          */
 218         @Override
 219         public void run() {
 220             long overlapped = 0L;
 221             Throwable exc = null;
 222             try {
 223                 begin();
 224 
 225                 // synchronize on result to allow this thread handle the case
 226                 // where the connection is established immediately.
 227                 synchronized (result) {
 228                     overlapped = ioCache.add(result);
 229                     // initiate the connection
 230                     int n = connect0(handle, Net.isIPv6Available(), remote.getAddress(),
 231                                      remote.getPort(), overlapped);
 232                     if (n == IOStatus.UNAVAILABLE) {
 233                         // connection is pending
 234                         return;
 235                     }
 236 
 237                     // connection established immediately
 238                     afterConnect();
 239                     result.setResult(null);
 240                 }
 241             } catch (Throwable x) {
 242                 exc = x;
 243             } finally {
 244                 end();
 245             }
 246 
 247             if (exc != null) {
 248                 if (overlapped != 0L)
 249                     ioCache.remove(overlapped);
 250                 closeChannel();
 251                 result.setFailure(toIOException(exc));
 252             }
 253             Invoker.invoke(result.handler(), result);
 254         }
 255 
 256         /**
 257          * Invoked by handler thread when connection established.
 258          */
 259         @Override
 260         public void completed(int bytesTransferred) {
 261             Throwable exc = null;
 262             try {
 263                 begin();
 264                 afterConnect();
 265                 result.setResult(null);
 266             } catch (Throwable x) {
 267                 // channel is closed or unable to finish connect
 268                 exc = x;
 269             } finally {
 270                 end();
 271             }
 272 
 273             // can't close channel while in begin/end block
 274             if (exc != null) {
 275                 closeChannel();
 276                 result.setFailure(toIOException(exc));
 277             }
 278 
 279             Invoker.invoke(result.handler(), result);
 280         }
 281 
 282         /**
 283          * Invoked by handler thread when failed to establish connection.
 284          */
 285         @Override
 286         public void failed(int error, IOException x) {
 287             if (isOpen()) {
 288                 closeChannel();
 289                 result.setFailure(x);
 290             } else {
 291                 result.setFailure(new AsynchronousCloseException());
 292             }
 293             Invoker.invoke(result.handler(), result);
 294         }
 295     }
 296 
 297     @Override
 298     public <A> Future<Void> connect(SocketAddress remote,
 299                                     A attachment,
 300                                     CompletionHandler<Void,? super A> handler)
 301     {
 302         if (!isOpen()) {
 303             CompletedFuture<Void,A> result = CompletedFuture
 304                 .withFailure(this, new ClosedChannelException(), attachment);
 305             Invoker.invoke(handler, result);
 306             return result;
 307         }
 308 
 309         InetSocketAddress isa = Net.checkAddress(remote);
 310 
 311         // permission check
 312         SecurityManager sm = System.getSecurityManager();
 313         if (sm != null)
 314             sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());
 315 
 316         // check and update state
 317         // ConnectEx requires the socket to be bound to a local address
 318         IOException bindException = null;
 319         synchronized (stateLock) {
 320             if (state == ST_CONNECTED)
 321                 throw new AlreadyConnectedException();
 322             if (state == ST_PENDING)
 323                 throw new ConnectionPendingException();
 324             if (localAddress == null) {
 325                 try {
 326                     bind(new InetSocketAddress(0));
 327                 } catch (IOException x) {
 328                     bindException = x;
 329                 }
 330             }
 331             if (bindException == null)
 332                 state = ST_PENDING;
 333         }
 334 
 335         // handle bind failure
 336         if (bindException != null) {
 337             try {
 338                 close();
 339             } catch (IOException ignore) { }
 340             CompletedFuture<Void,A> result = CompletedFuture
 341                 .withFailure(this, bindException, attachment);
 342             Invoker.invoke(handler, result);
 343             return result;
 344         }
 345 
 346         // setup task
 347         PendingFuture<Void,A> result =
 348             new PendingFuture<Void,A>(this, handler, attachment);
 349         ConnectTask task = new ConnectTask<A>(isa, result);
 350         result.setContext(task);
 351 
 352         // initiate I/O (can only be done from thread in thread pool)
 353         Invoker.invokeOnThreadInThreadPool(this, task);
 354         return result;
 355     }
 356 
 357     /**
 358      * Implements the task to initiate a read and the handler to consume the
 359      * result when the read completes.
 360      */
 361     private class ReadTask<V,A> implements Runnable, Iocp.ResultHandler {
 362         private final ByteBuffer[] bufs;
 363         private final int numBufs;
 364         private final boolean scatteringRead;
 365         private final PendingFuture<V,A> result;
 366 
 367         // set by run method
 368         private ByteBuffer[] shadow;
 369 
 370         ReadTask(ByteBuffer[] bufs,
 371                  boolean scatteringRead,
 372                  PendingFuture<V,A> result)
 373         {
 374             this.bufs = bufs;
 375             this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length;
 376             this.scatteringRead = scatteringRead;
 377             this.result = result;
 378         }
 379 
 380         /**
 381          * Invoked prior to read to prepare the WSABUF array. Where necessary,
 382          * it substitutes non-direct buffers with direct buffers.
 383          */
 384         void prepareBuffers() {
 385             shadow = new ByteBuffer[numBufs];
 386             long address = readBufferArray;
 387             for (int i=0; i<numBufs; i++) {
 388                 ByteBuffer dst = bufs[i];
 389                 int pos = dst.position();
 390                 int lim = dst.limit();
 391                 assert (pos <= lim);
 392                 int rem = (pos <= lim ? lim - pos : 0);
 393                 long a;
 394                 if (!(dst instanceof DirectBuffer)) {
 395                     // substitute with direct buffer
 396                     ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
 397                     shadow[i] = bb;
 398                     a = ((DirectBuffer)bb).address();
 399                 } else {
 400                     shadow[i] = dst;
 401                     a = ((DirectBuffer)dst).address() + pos;
 402                 }
 403                 unsafe.putAddress(address + OFFSETOF_BUF, a);
 404                 unsafe.putInt(address + OFFSETOF_LEN, rem);
 405                 address += SIZEOF_WSABUF;
 406             }
 407         }
 408 
 409         /**
 410          * Invoked after a read has completed to update the buffer positions
 411          * and release any substituted buffers.
 412          */
 413         void updateBuffers(int bytesRead) {
 414             for (int i=0; i<numBufs; i++) {
 415                 ByteBuffer nextBuffer = shadow[i];
 416                 int pos = nextBuffer.position();
 417                 int len = nextBuffer.remaining();
 418                 if (bytesRead >= len) {
 419                     bytesRead -= len;
 420                     int newPosition = pos + len;
 421                     try {
 422                         nextBuffer.position(newPosition);
 423                     } catch (IllegalArgumentException x) {
 424                         // position changed by another
 425                     }
 426                 } else { // Buffers not completely filled
 427                     if (bytesRead > 0) {
 428                         assert(pos + bytesRead < (long)Integer.MAX_VALUE);
 429                         int newPosition = pos + bytesRead;
 430                         try {
 431                             nextBuffer.position(newPosition);
 432                         } catch (IllegalArgumentException x) {
 433                             // position changed by another
 434                         }
 435                     }
 436                     break;
 437                 }
 438             }
 439 
 440             // Put results from shadow into the slow buffers
 441             for (int i=0; i<numBufs; i++) {
 442                 if (!(bufs[i] instanceof DirectBuffer)) {
 443                     shadow[i].flip();
 444                     try {
 445                         bufs[i].put(shadow[i]);
 446                     } catch (BufferOverflowException x) {
 447                         // position changed by another
 448                     }
 449                 }
 450             }
 451         }
 452 
 453         void releaseBuffers() {
 454             for (int i=0; i<numBufs; i++) {
 455                 if (!(bufs[i] instanceof DirectBuffer)) {
 456                     Util.releaseTemporaryDirectBuffer(shadow[i]);
 457                 }
 458             }
 459         }
 460 
 461         @Override
 462         @SuppressWarnings("unchecked")
 463         public void run() {
 464             long overlapped = 0L;
 465             boolean prepared = false;
 466             boolean pending = false;
 467 
 468             try {
 469                 begin();
 470 
 471                 // substitute non-direct buffers
 472                 prepareBuffers();
 473                 prepared = true;
 474 
 475                 // get an OVERLAPPED structure (from the cache or allocate)
 476                 overlapped = ioCache.add(result);
 477 
 478                 // initiate read
 479                 int n = read0(handle, numBufs, readBufferArray, overlapped);
 480                 if (n == IOStatus.UNAVAILABLE) {
 481                     // I/O is pending
 482                     pending = true;
 483                     return;
 484                 }
 485                 if (n == IOStatus.EOF) {
 486                     // input shutdown
 487                     enableReading();
 488                     if (scatteringRead) {
 489                         result.setResult((V)Long.valueOf(-1L));
 490                     } else {
 491                         result.setResult((V)Integer.valueOf(-1));
 492                     }
 493                 } else {
 494                     throw new InternalError("Read completed immediately");
 495                 }
 496             } catch (Throwable x) {
 497                 // failed to initiate read
 498                 // reset read flag before releasing waiters
 499                 enableReading();
 500                 if (x instanceof ClosedChannelException)
 501                     x = new AsynchronousCloseException();
 502                 if (!(x instanceof IOException))
 503                     x = new IOException(x);
 504                 result.setFailure(x);
 505             } finally {
 506                 // release resources if I/O not pending
 507                 if (!pending) {
 508                     if (overlapped != 0L)
 509                         ioCache.remove(overlapped);
 510                     if (prepared)
 511                         releaseBuffers();
 512                 }
 513                 end();
 514             }
 515 
 516             // invoke completion handler
 517             Invoker.invoke(result.handler(), result);
 518         }
 519 
 520         /**
 521          * Executed when the I/O has completed
 522          */
 523         @Override
 524         @SuppressWarnings("unchecked")
 525         public void completed(int bytesTransferred) {
 526             if (bytesTransferred == 0) {
 527                 bytesTransferred = -1;  // EOF
 528             } else {
 529                 updateBuffers(bytesTransferred);
 530             }
 531 
 532             // return direct buffer to cache if substituted
 533             releaseBuffers();
 534 
 535             // release waiters if not already released by timeout
 536             synchronized (result) {
 537                 if (result.isDone())
 538                     return;
 539                 enableReading();
 540                 if (scatteringRead) {
 541                     result.setResult((V)Long.valueOf(bytesTransferred));
 542                 } else {
 543                     result.setResult((V)Integer.valueOf(bytesTransferred));
 544                 }
 545             }
 546             Invoker.invoke(result.handler(), result);
 547         }
 548 
 549         @Override
 550         public void failed(int error, IOException x) {
 551             // return direct buffer to cache if substituted
 552             releaseBuffers();
 553 
 554             // release waiters if not already released by timeout
 555             if (!isOpen())
 556                 x = new AsynchronousCloseException();
 557 
 558             synchronized (result) {
 559                 if (result.isDone())
 560                     return;
 561                 enableReading();
 562                 result.setFailure(x);
 563             }
 564             Invoker.invoke(result.handler(), result);
 565         }
 566 
 567         /**
 568          * Invoked if timeout expires before it is cancelled
 569          */
 570         void timeout() {
 571             // synchronize on result as the I/O could complete/fail
 572             synchronized (result) {
 573                 if (result.isDone())
 574                     return;
 575 
 576                 // kill further reading before releasing waiters
 577                 enableReading(true);
 578                 result.setFailure(new InterruptedByTimeoutException());
 579             }
 580 
 581             // invoke handler without any locks
 582             Invoker.invoke(result.handler(), result);
 583         }
 584     }
 585 
 586     @Override
 587     <V extends Number,A> Future<V> readImpl(ByteBuffer[] bufs,
 588                                             boolean scatteringRead,
 589                                             long timeout,
 590                                             TimeUnit unit,
 591                                             A attachment,
 592                                             CompletionHandler<V,? super A> handler)
 593     {
 594         // setup task
 595         PendingFuture<V,A> result =
 596             new PendingFuture<V,A>(this, handler, attachment);
 597         final ReadTask readTask = new ReadTask<V,A>(bufs, scatteringRead, result);
 598         result.setContext(readTask);
 599 
 600         // schedule timeout
 601         if (timeout > 0L) {
 602             Future<?> timeoutTask = iocp.schedule(new Runnable() {
 603                 public void run() {
 604                     readTask.timeout();
 605                 }
 606             }, timeout, unit);
 607             result.setTimeoutTask(timeoutTask);
 608         }
 609 
 610         // initiate I/O (can only be done from thread in thread pool)
 611         Invoker.invokeOnThreadInThreadPool(this, readTask);
 612         return result;
 613     }
 614 
 615     /**
 616      * Implements the task to initiate a write and the handler to consume the
 617      * result when the write completes.
 618      */
 619     private class WriteTask<V,A> implements Runnable, Iocp.ResultHandler {
 620         private final ByteBuffer[] bufs;
 621         private final int numBufs;
 622         private final boolean gatheringWrite;
 623         private final PendingFuture<V,A> result;
 624 
 625         // set by run method
 626         private ByteBuffer[] shadow;
 627 
 628         WriteTask(ByteBuffer[] bufs,
 629                   boolean gatheringWrite,
 630                   PendingFuture<V,A> result)
 631         {
 632             this.bufs = bufs;
 633             this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length;
 634             this.gatheringWrite = gatheringWrite;
 635             this.result = result;
 636         }
 637 
 638         /**
 639          * Invoked prior to write to prepare the WSABUF array. Where necessary,
 640          * it substitutes non-direct buffers with direct buffers.
 641          */
 642         void prepareBuffers() {
 643             shadow = new ByteBuffer[numBufs];
 644             long address = writeBufferArray;
 645             for (int i=0; i<numBufs; i++) {
 646                 ByteBuffer src = bufs[i];
 647                 int pos = src.position();
 648                 int lim = src.limit();
 649                 assert (pos <= lim);
 650                 int rem = (pos <= lim ? lim - pos : 0);
 651                 long a;
 652                 if (!(src instanceof DirectBuffer)) {
 653                     // substitute with direct buffer
 654                     ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
 655                     bb.put(src);
 656                     bb.flip();
 657                     src.position(pos);  // leave heap buffer untouched for now
 658                     shadow[i] = bb;
 659                     a = ((DirectBuffer)bb).address();
 660                 } else {
 661                     shadow[i] = src;
 662                     a = ((DirectBuffer)src).address() + pos;
 663                 }
 664                 unsafe.putAddress(address + OFFSETOF_BUF, a);
 665                 unsafe.putInt(address + OFFSETOF_LEN, rem);
 666                 address += SIZEOF_WSABUF;
 667             }
 668         }
 669 
 670         /**
 671          * Invoked after a write has completed to update the buffer positions
 672          * and release any substituted buffers.
 673          */
 674         void updateBuffers(int bytesWritten) {
 675             // Notify the buffers how many bytes were taken
 676             for (int i=0; i<numBufs; i++) {
 677                 ByteBuffer nextBuffer = bufs[i];
 678                 int pos = nextBuffer.position();
 679                 int lim = nextBuffer.limit();
 680                 int len = (pos <= lim ? lim - pos : lim);
 681                 if (bytesWritten >= len) {
 682                     bytesWritten -= len;
 683                     int newPosition = pos + len;
 684                     try {
 685                         nextBuffer.position(newPosition);
 686                     } catch (IllegalArgumentException x) {
 687                         // position changed by someone else
 688                     }
 689                 } else { // Buffers not completely filled
 690                     if (bytesWritten > 0) {
 691                         assert(pos + bytesWritten < (long)Integer.MAX_VALUE);
 692                         int newPosition = pos + bytesWritten;
 693                         try {
 694                             nextBuffer.position(newPosition);
 695                         } catch (IllegalArgumentException x) {
 696                             // position changed by someone else
 697                         }
 698                     }
 699                     break;
 700                 }
 701             }
 702         }
 703 
 704         void releaseBuffers() {
 705             for (int i=0; i<numBufs; i++) {
 706                 if (!(bufs[i] instanceof DirectBuffer)) {
 707                     Util.releaseTemporaryDirectBuffer(shadow[i]);
 708                 }
 709             }
 710         }
 711 
 712         @Override
 713         @SuppressWarnings("unchecked")
 714         public void run() {
 715             long overlapped = 0L;
 716             boolean prepared = false;
 717             boolean pending = false;
 718             boolean shutdown = false;
 719 
 720             try {
 721                 begin();
 722 
 723                 // substitute non-direct buffers
 724                 prepareBuffers();
 725                 prepared = true;
 726 
 727                 // get an OVERLAPPED structure (from the cache or allocate)
 728                 overlapped = ioCache.add(result);
 729                 int n = write0(handle, numBufs, writeBufferArray, overlapped);
 730                 if (n == IOStatus.UNAVAILABLE) {
 731                     // I/O is pending
 732                     pending = true;
 733                     return;
 734                 }
 735                 if (n == IOStatus.EOF) {
 736                     // special case for shutdown output
 737                     shutdown = true;
 738                     throw new ClosedChannelException();
 739                 }
 740                 // write completed immediately
 741                 throw new InternalError("Write completed immediately");
 742             } catch (Throwable x) {
 743                 // write failed. Enable writing before releasing waiters.
 744                 enableWriting();
 745                 if (!shutdown && (x instanceof ClosedChannelException))
 746                     x = new AsynchronousCloseException();
 747                 if (!(x instanceof IOException))
 748                     x = new IOException(x);
 749                 result.setFailure(x);
 750             } finally {
 751                 // release resources if I/O not pending
 752                 if (!pending) {
 753                     if (overlapped != 0L)
 754                         ioCache.remove(overlapped);
 755                     if (prepared)
 756                         releaseBuffers();
 757                 }
 758                 end();
 759             }
 760 
 761             // invoke completion handler
 762             Invoker.invoke(result.handler(), result);
 763         }
 764 
 765         /**
 766          * Executed when the I/O has completed
 767          */
 768         @Override
 769         @SuppressWarnings("unchecked")
 770         public void completed(int bytesTransferred) {
 771             updateBuffers(bytesTransferred);
 772 
 773             // return direct buffer to cache if substituted
 774             releaseBuffers();
 775 
 776             // release waiters if not already released by timeout
 777             synchronized (result) {
 778                 if (result.isDone())
 779                     return;
 780                 enableWriting();
 781                 if (gatheringWrite) {
 782                     result.setResult((V)Long.valueOf(bytesTransferred));
 783                 } else {
 784                     result.setResult((V)Integer.valueOf(bytesTransferred));
 785                 }
 786             }
 787             Invoker.invoke(result.handler(), result);
 788         }
 789 
 790         @Override
 791         public void failed(int error, IOException x) {
 792             // return direct buffer to cache if substituted
 793             releaseBuffers();
 794 
 795             // release waiters if not already released by timeout
 796             if (!isOpen())
 797                 x = new AsynchronousCloseException();
 798 
 799             synchronized (result) {
 800                 if (result.isDone())
 801                     return;
 802                 enableWriting();
 803                 result.setFailure(x);
 804             }
 805             Invoker.invoke(result.handler(), result);
 806         }
 807 
 808         /**
 809          * Invoked if timeout expires before it is cancelled
 810          */
 811         void timeout() {
 812             // synchronize on result as the I/O could complete/fail
 813             synchronized (result) {
 814                 if (result.isDone())
 815                     return;
 816 
 817                 // kill further writing before releasing waiters
 818                 enableWriting(true);
 819                 result.setFailure(new InterruptedByTimeoutException());
 820             }
 821 
 822             // invoke handler without any locks
 823             Invoker.invoke(result.handler(), result);
 824         }
 825     }
 826 
 827     @Override
 828     <V extends Number,A> Future<V> writeImpl(ByteBuffer[] bufs,
 829                                              boolean gatheringWrite,
 830                                              long timeout,
 831                                              TimeUnit unit,
 832                                              A attachment,
 833                                              CompletionHandler<V,? super A> handler)
 834     {
 835         // setup task
 836         PendingFuture<V,A> result =
 837             new PendingFuture<V,A>(this, handler, attachment);
 838         final WriteTask writeTask = new WriteTask<V,A>(bufs, gatheringWrite, result);
 839         result.setContext(writeTask);
 840 
 841         // schedule timeout
 842         if (timeout > 0L) {
 843             Future<?> timeoutTask = iocp.schedule(new Runnable() {
 844                 public void run() {
 845                     writeTask.timeout();
 846                 }
 847             }, timeout, unit);
 848             result.setTimeoutTask(timeoutTask);
 849         }
 850 
 851         // initiate I/O (can only be done from thread in thread pool)
 852         Invoker.invokeOnThreadInThreadPool(this, writeTask);
 853         return result;
 854     }
 855 
 856     // -- Native methods --
 857 
 858     private static native void initIDs();
 859 
 860     private static native int connect0(long socket, boolean preferIPv6,
 861         InetAddress remote, int remotePort, long overlapped) throws IOException;
 862 
 863     private static native void updateConnectContext(long socket) throws IOException;
 864 
 865     private static native int read0(long socket, int count, long addres, long overlapped)
 866         throws IOException;
 867 
 868     private static native int write0(long socket, int count, long address,
 869         long overlapped) throws IOException;
 870 
 871     private static native void shutdown0(long socket, int how) throws IOException;
 872 
 873     private static native void closesocket0(long socket) throws IOException;
 874 
    // class initialization: Util.load() runs first, then the native
    // initIDs() of this class is invoked
    static {
        Util.load();
        initIDs();
    }
 879 }