1 /*
   2  * Copyright (c) 2008, 2011, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package sun.nio.ch;
  27 
  28 import java.nio.channels.*;
  29 import java.nio.ByteBuffer;
  30 import java.nio.BufferOverflowException;
  31 import java.net.*;
  32 import java.util.concurrent.*;
  33 import java.io.IOException;
  34 import sun.misc.Unsafe;
  35 
  36 /**
  37  * Windows implementation of AsynchronousSocketChannel using overlapped I/O.
  38  */
  39 
  40 class WindowsAsynchronousSocketChannelImpl
  41     extends AsynchronousSocketChannelImpl implements Iocp.OverlappedChannel
  42 {
  43     private static final Unsafe unsafe = Unsafe.getUnsafe();
  44     private static int addressSize = unsafe.addressSize();
  45 
  46     private static int dependsArch(int value32, int value64) {
  47         return (addressSize == 4) ? value32 : value64;
  48     }
  49 
  50     /*
  51      * typedef struct _WSABUF {
  52      *     u_long      len;
  53      *     char FAR *  buf;
  54      * } WSABUF;
  55      */
  56     private static final int SIZEOF_WSABUF  = dependsArch(8, 16);
  57     private static final int OFFSETOF_LEN   = 0;
  58     private static final int OFFSETOF_BUF   = dependsArch(4, 8);
  59 
  60     // maximum vector size for scatter/gather I/O
  61     private static final int MAX_WSABUF     = 16;
  62 
  63     private static final int SIZEOF_WSABUFARRAY = MAX_WSABUF * SIZEOF_WSABUF;
  64 
  65 
  66     // socket handle. Use begin()/end() around each usage of this handle.
  67     final long handle;
  68 
  69     // I/O completion port that the socket is associated with
  70     private final Iocp iocp;
  71 
  72     // completion key to identify channel when I/O completes
  73     private final int completionKey;
  74 
  75     // Pending I/O operations are tied to an OVERLAPPED structure that can only
  76     // be released when the I/O completion event is posted to the completion
  77     // port. Where I/O operations complete immediately then it is possible
  78     // there may be more than two OVERLAPPED structures in use.
  79     private final PendingIoCache ioCache;
  80 
  81     // per-channel arrays of WSABUF structures
  82     private final long readBufferArray;
  83     private final long writeBufferArray;
  84 
  85 
  86     WindowsAsynchronousSocketChannelImpl(Iocp iocp, boolean failIfGroupShutdown)
  87         throws IOException
  88     {
  89         super(iocp);
  90 
  91         // associate socket with default completion port
  92         long h = IOUtil.fdVal(fd);
  93         int key = 0;
  94         try {
  95             key = iocp.associate(this, h);
  96         } catch (ShutdownChannelGroupException x) {
  97             if (failIfGroupShutdown) {
  98                 closesocket0(h);
  99                 throw x;
 100             }
 101         } catch (IOException x) {
 102             closesocket0(h);
 103             throw x;
 104         }
 105 
 106         this.handle = h;
 107         this.iocp = iocp;
 108         this.completionKey = key;
 109         this.ioCache = new PendingIoCache();
 110 
 111         // allocate WSABUF arrays
 112         this.readBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY);
 113         this.writeBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY);
 114     }
 115 
 116     WindowsAsynchronousSocketChannelImpl(Iocp iocp) throws IOException {
 117         this(iocp, true);
 118     }
 119 
 120     @Override
 121     public AsynchronousChannelGroupImpl group() {
 122         return iocp;
 123     }
 124 
 125     /**
 126      * Invoked by Iocp when an I/O operation competes.
 127      */
 128     @Override
 129     public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) {
 130         return ioCache.remove(overlapped);
 131     }
 132 
 133     // invoked by WindowsAsynchronousServerSocketChannelImpl
 134     long handle() {
 135         return handle;
 136     }
 137 
 138     // invoked by WindowsAsynchronousServerSocketChannelImpl when new connection
 139     // accept
 140     void setConnected(SocketAddress localAddress, SocketAddress remoteAddress) {
 141         synchronized (stateLock) {
 142             state = ST_CONNECTED;
 143             this.localAddress = localAddress;
 144             this.remoteAddress = remoteAddress;
 145         }
 146     }
 147 
 148     @Override
 149     void implClose() throws IOException {
 150         // close socket (may cause outstanding async I/O operations to fail).
 151         closesocket0(handle);
 152 
 153         // waits until all I/O operations have completed
 154         ioCache.close();
 155 
 156         // release arrays of WSABUF structures
 157         unsafe.freeMemory(readBufferArray);
 158         unsafe.freeMemory(writeBufferArray);
 159 
 160         // finally disassociate from the completion port (key can be 0 if
 161         // channel created when group is shutdown)
 162         if (completionKey != 0)
 163             iocp.disassociate(completionKey);
 164     }
 165 
 166     @Override
 167     public void onCancel(PendingFuture<?,?> task) {
 168         if (task.getContext() instanceof ConnectTask)
 169             killConnect();
 170         if (task.getContext() instanceof ReadTask)
 171             killReading();
 172         if (task.getContext() instanceof WriteTask)
 173             killWriting();
 174     }
 175 
 176     /**
 177      * Implements the task to initiate a connection and the handler to
 178      * consume the result when the connection is established (or fails).
 179      */
 180     private class ConnectTask<A> implements Runnable, Iocp.ResultHandler {
 181         private final InetSocketAddress remote;
 182         private final PendingFuture<Void,A> result;
 183 
 184         ConnectTask(InetSocketAddress remote, PendingFuture<Void,A> result) {
 185             this.remote = remote;
 186             this.result = result;
 187         }
 188 
 189         private void closeChannel() {
 190             try {
 191                 close();
 192             } catch (IOException ignore) { }
 193         }
 194 
 195         private IOException toIOException(Throwable x) {
 196             if (x instanceof IOException) {
 197                 if (x instanceof ClosedChannelException)
 198                     x = new AsynchronousCloseException();
 199                 return (IOException)x;
 200             }
 201             return new IOException(x);
 202         }
 203 
 204         /**
 205          * Invoke after a connection is successfully established.
 206          */
 207         private void afterConnect() throws IOException {
 208             updateConnectContext(handle);
 209             synchronized (stateLock) {
 210                 state = ST_CONNECTED;
 211                 remoteAddress = remote;
 212             }
 213         }
 214 
 215         /**
 216          * Task to initiate a connection.
 217          */
 218         @Override
 219         public void run() {
 220             long overlapped = 0L;
 221             Throwable exc = null;
 222             try {
 223                 begin();
 224 
 225                 // synchronize on result to allow this thread handle the case
 226                 // where the connection is established immediately.
 227                 synchronized (result) {
 228                     overlapped = ioCache.add(result);
 229                     // initiate the connection
 230                     int n = connect0(handle, Net.isIPv6Available(), remote.getAddress(),
 231                                      remote.getPort(), overlapped);
 232                     if (n == IOStatus.UNAVAILABLE) {
 233                         // connection is pending
 234                         return;
 235                     }
 236 
 237                     // connection established immediately
 238                     afterConnect();
 239                     result.setResult(null);
 240                 }
 241             } catch (Throwable x) {
 242                 if (overlapped != 0L)
 243                     ioCache.remove(overlapped);
 244                 exc = x;
 245             } finally {
 246                 end();
 247             }
 248 
 249             if (exc != null) {
 250                 closeChannel();
 251                 result.setFailure(toIOException(exc));
 252             }
 253             Invoker.invoke(result);
 254         }
 255 
 256         /**
 257          * Invoked by handler thread when connection established.
 258          */
 259         @Override
 260         public void completed(int bytesTransferred, boolean canInvokeDirect) {
 261             Throwable exc = null;
 262             try {
 263                 begin();
 264                 afterConnect();
 265                 result.setResult(null);
 266             } catch (Throwable x) {
 267                 // channel is closed or unable to finish connect
 268                 exc = x;
 269             } finally {
 270                 end();
 271             }
 272 
 273             // can't close channel while in begin/end block
 274             if (exc != null) {
 275                 closeChannel();
 276                 result.setFailure(toIOException(exc));
 277             }
 278 
 279             if (canInvokeDirect) {
 280                 Invoker.invokeUnchecked(result);
 281             } else {
 282                 Invoker.invoke(result);
 283             }
 284         }
 285 
 286         /**
 287          * Invoked by handler thread when failed to establish connection.
 288          */
 289         @Override
 290         public void failed(int error, IOException x) {
 291             if (isOpen()) {
 292                 closeChannel();
 293                 result.setFailure(x);
 294             } else {
 295                 result.setFailure(new AsynchronousCloseException());
 296             }
 297             Invoker.invoke(result);
 298         }
 299     }
 300 
 301     @Override
 302     <A> Future<Void> implConnect(SocketAddress remote,
 303                                  A attachment,
 304                                  CompletionHandler<Void,? super A> handler)
 305     {
 306         if (!isOpen()) {
 307             Throwable exc = new ClosedChannelException();
 308             if (handler == null)
 309                 return CompletedFuture.withFailure(exc);
 310             Invoker.invoke(this, handler, attachment, null, exc);
 311             return null;
 312         }
 313 
 314         InetSocketAddress isa = Net.checkAddress(remote);
 315 
 316         // permission check
 317         SecurityManager sm = System.getSecurityManager();
 318         if (sm != null)
 319             sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());
 320 
 321         // check and update state
 322         // ConnectEx requires the socket to be bound to a local address
 323         IOException bindException = null;
 324         synchronized (stateLock) {
 325             if (state == ST_CONNECTED)
 326                 throw new AlreadyConnectedException();
 327             if (state == ST_PENDING)
 328                 throw new ConnectionPendingException();
 329             if (localAddress == null) {
 330                 try {
 331                     bind(new InetSocketAddress(0));
 332                 } catch (IOException x) {
 333                     bindException = x;
 334                 }
 335             }
 336             if (bindException == null)
 337                 state = ST_PENDING;
 338         }
 339 
 340         // handle bind failure
 341         if (bindException != null) {
 342             try {
 343                 close();
 344             } catch (IOException ignore) { }
 345             if (handler == null)
 346                 return CompletedFuture.withFailure(bindException);
 347             Invoker.invoke(this, handler, attachment, null, bindException);
 348             return null;
 349         }
 350 
 351         // setup task
 352         PendingFuture<Void,A> result =
 353             new PendingFuture<Void,A>(this, handler, attachment);
 354         ConnectTask task = new ConnectTask<A>(isa, result);
 355         result.setContext(task);
 356 
 357         // initiate I/O
 358         if (Iocp.supportsThreadAgnosticIo()) {
 359             task.run();
 360         } else {
 361             Invoker.invokeOnThreadInThreadPool(this, task);
 362         }
 363         return result;
 364     }
 365 
 366     /**
 367      * Implements the task to initiate a read and the handler to consume the
 368      * result when the read completes.
 369      */
 370     private class ReadTask<V,A> implements Runnable, Iocp.ResultHandler {
 371         private final ByteBuffer[] bufs;
 372         private final int numBufs;
 373         private final boolean scatteringRead;
 374         private final PendingFuture<V,A> result;
 375 
 376         // set by run method
 377         private ByteBuffer[] shadow;
 378 
 379         ReadTask(ByteBuffer[] bufs,
 380                  boolean scatteringRead,
 381                  PendingFuture<V,A> result)
 382         {
 383             this.bufs = bufs;
 384             this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length;
 385             this.scatteringRead = scatteringRead;
 386             this.result = result;
 387         }
 388 
 389         /**
 390          * Invoked prior to read to prepare the WSABUF array. Where necessary,
 391          * it substitutes non-direct buffers with direct buffers.
 392          */
 393         void prepareBuffers() {
 394             shadow = new ByteBuffer[numBufs];
 395             long address = readBufferArray;
 396             for (int i=0; i<numBufs; i++) {
 397                 ByteBuffer dst = bufs[i];
 398                 int pos = dst.position();
 399                 int lim = dst.limit();
 400                 assert (pos <= lim);
 401                 int rem = (pos <= lim ? lim - pos : 0);
 402                 long a;
 403                 if (!(dst instanceof DirectBuffer)) {
 404                     // substitute with direct buffer
 405                     ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
 406                     shadow[i] = bb;
 407                     a = ((DirectBuffer)bb).address();
 408                 } else {
 409                     shadow[i] = dst;
 410                     a = ((DirectBuffer)dst).address() + pos;
 411                 }
 412                 unsafe.putAddress(address + OFFSETOF_BUF, a);
 413                 unsafe.putInt(address + OFFSETOF_LEN, rem);
 414                 address += SIZEOF_WSABUF;
 415             }
 416         }
 417 
 418         /**
 419          * Invoked after a read has completed to update the buffer positions
 420          * and release any substituted buffers.
 421          */
 422         void updateBuffers(int bytesRead) {
 423             for (int i=0; i<numBufs; i++) {
 424                 ByteBuffer nextBuffer = shadow[i];
 425                 int pos = nextBuffer.position();
 426                 int len = nextBuffer.remaining();
 427                 if (bytesRead >= len) {
 428                     bytesRead -= len;
 429                     int newPosition = pos + len;
 430                     try {
 431                         nextBuffer.position(newPosition);
 432                     } catch (IllegalArgumentException x) {
 433                         // position changed by another
 434                     }
 435                 } else { // Buffers not completely filled
 436                     if (bytesRead > 0) {
 437                         assert(pos + bytesRead < (long)Integer.MAX_VALUE);
 438                         int newPosition = pos + bytesRead;
 439                         try {
 440                             nextBuffer.position(newPosition);
 441                         } catch (IllegalArgumentException x) {
 442                             // position changed by another
 443                         }
 444                     }
 445                     break;
 446                 }
 447             }
 448 
 449             // Put results from shadow into the slow buffers
 450             for (int i=0; i<numBufs; i++) {
 451                 if (!(bufs[i] instanceof DirectBuffer)) {
 452                     shadow[i].flip();
 453                     try {
 454                         bufs[i].put(shadow[i]);
 455                     } catch (BufferOverflowException x) {
 456                         // position changed by another
 457                     }
 458                 }
 459             }
 460         }
 461 
 462         void releaseBuffers() {
 463             for (int i=0; i<numBufs; i++) {
 464                 if (!(bufs[i] instanceof DirectBuffer)) {
 465                     Util.releaseTemporaryDirectBuffer(shadow[i]);
 466                 }
 467             }
 468         }
 469 
 470         @Override
 471         @SuppressWarnings("unchecked")
 472         public void run() {
 473             long overlapped = 0L;
 474             boolean prepared = false;
 475             boolean pending = false;
 476 
 477             try {
 478                 begin();
 479 
 480                 // substitute non-direct buffers
 481                 prepareBuffers();
 482                 prepared = true;
 483 
 484                 // get an OVERLAPPED structure (from the cache or allocate)
 485                 overlapped = ioCache.add(result);
 486 
 487                 // initiate read
 488                 int n = read0(handle, numBufs, readBufferArray, overlapped);
 489                 if (n == IOStatus.UNAVAILABLE) {
 490                     // I/O is pending
 491                     pending = true;
 492                     return;
 493                 }
 494                 if (n == IOStatus.EOF) {
 495                     // input shutdown
 496                     enableReading();
 497                     if (scatteringRead) {
 498                         result.setResult((V)Long.valueOf(-1L));
 499                     } else {
 500                         result.setResult((V)Integer.valueOf(-1));
 501                     }
 502                 } else {
 503                     throw new InternalError("Read completed immediately");
 504                 }
 505             } catch (Throwable x) {
 506                 // failed to initiate read
 507                 // reset read flag before releasing waiters
 508                 enableReading();
 509                 if (x instanceof ClosedChannelException)
 510                     x = new AsynchronousCloseException();
 511                 if (!(x instanceof IOException))
 512                     x = new IOException(x);
 513                 result.setFailure(x);
 514             } finally {
 515                 // release resources if I/O not pending
 516                 if (!pending) {
 517                     if (overlapped != 0L)
 518                         ioCache.remove(overlapped);
 519                     if (prepared)
 520                         releaseBuffers();
 521                 }
 522                 end();
 523             }
 524 
 525             // invoke completion handler
 526             Invoker.invoke(result);
 527         }
 528 
 529         /**
 530          * Executed when the I/O has completed
 531          */
 532         @Override
 533         @SuppressWarnings("unchecked")
 534         public void completed(int bytesTransferred, boolean canInvokeDirect) {
 535             if (bytesTransferred == 0) {
 536                 bytesTransferred = -1;  // EOF
 537             } else {
 538                 updateBuffers(bytesTransferred);
 539             }
 540 
 541             // return direct buffer to cache if substituted
 542             releaseBuffers();
 543 
 544             // release waiters if not already released by timeout
 545             synchronized (result) {
 546                 if (result.isDone())
 547                     return;
 548                 enableReading();
 549                 if (scatteringRead) {
 550                     result.setResult((V)Long.valueOf(bytesTransferred));
 551                 } else {
 552                     result.setResult((V)Integer.valueOf(bytesTransferred));
 553                 }
 554             }
 555             if (canInvokeDirect) {
 556                 Invoker.invokeUnchecked(result);
 557             } else {
 558                 Invoker.invoke(result);
 559             }
 560         }
 561 
 562         @Override
 563         public void failed(int error, IOException x) {
 564             // return direct buffer to cache if substituted
 565             releaseBuffers();
 566 
 567             // release waiters if not already released by timeout
 568             if (!isOpen())
 569                 x = new AsynchronousCloseException();
 570 
 571             synchronized (result) {
 572                 if (result.isDone())
 573                     return;
 574                 enableReading();
 575                 result.setFailure(x);
 576             }
 577             Invoker.invoke(result);
 578         }
 579 
 580         /**
 581          * Invoked if timeout expires before it is cancelled
 582          */
 583         void timeout() {
 584             // synchronize on result as the I/O could complete/fail
 585             synchronized (result) {
 586                 if (result.isDone())
 587                     return;
 588 
 589                 // kill further reading before releasing waiters
 590                 enableReading(true);
 591                 result.setFailure(new InterruptedByTimeoutException());
 592             }
 593 
 594             // invoke handler without any locks
 595             Invoker.invoke(result);
 596         }
 597     }
 598 
 599     @Override
 600     <V extends Number,A> Future<V> implRead(boolean isScatteringRead,
 601                                             ByteBuffer dst,
 602                                             ByteBuffer[] dsts,
 603                                             long timeout,
 604                                             TimeUnit unit,
 605                                             A attachment,
 606                                             CompletionHandler<V,? super A> handler)
 607     {
 608         // setup task
 609         PendingFuture<V,A> result =
 610             new PendingFuture<V,A>(this, handler, attachment);
 611         ByteBuffer[] bufs;
 612         if (isScatteringRead) {
 613             bufs = dsts;
 614         } else {
 615             bufs = new ByteBuffer[1];
 616             bufs[0] = dst;
 617         }
 618         final ReadTask readTask = new ReadTask<V,A>(bufs, isScatteringRead, result);
 619         result.setContext(readTask);
 620 
 621         // schedule timeout
 622         if (timeout > 0L) {
 623             Future<?> timeoutTask = iocp.schedule(new Runnable() {
 624                 public void run() {
 625                     readTask.timeout();
 626                 }
 627             }, timeout, unit);
 628             result.setTimeoutTask(timeoutTask);
 629         }
 630 
 631         // initiate I/O
 632         if (Iocp.supportsThreadAgnosticIo()) {
 633             readTask.run();
 634         } else {
 635             Invoker.invokeOnThreadInThreadPool(this, readTask);
 636         }
 637         return result;
 638     }
 639 
 640     /**
 641      * Implements the task to initiate a write and the handler to consume the
 642      * result when the write completes.
 643      */
 644     private class WriteTask<V,A> implements Runnable, Iocp.ResultHandler {
 645         private final ByteBuffer[] bufs;
 646         private final int numBufs;
 647         private final boolean gatheringWrite;
 648         private final PendingFuture<V,A> result;
 649 
 650         // set by run method
 651         private ByteBuffer[] shadow;
 652 
 653         WriteTask(ByteBuffer[] bufs,
 654                   boolean gatheringWrite,
 655                   PendingFuture<V,A> result)
 656         {
 657             this.bufs = bufs;
 658             this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length;
 659             this.gatheringWrite = gatheringWrite;
 660             this.result = result;
 661         }
 662 
 663         /**
 664          * Invoked prior to write to prepare the WSABUF array. Where necessary,
 665          * it substitutes non-direct buffers with direct buffers.
 666          */
 667         void prepareBuffers() {
 668             shadow = new ByteBuffer[numBufs];
 669             long address = writeBufferArray;
 670             for (int i=0; i<numBufs; i++) {
 671                 ByteBuffer src = bufs[i];
 672                 int pos = src.position();
 673                 int lim = src.limit();
 674                 assert (pos <= lim);
 675                 int rem = (pos <= lim ? lim - pos : 0);
 676                 long a;
 677                 if (!(src instanceof DirectBuffer)) {
 678                     // substitute with direct buffer
 679                     ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
 680                     bb.put(src);
 681                     bb.flip();
 682                     src.position(pos);  // leave heap buffer untouched for now
 683                     shadow[i] = bb;
 684                     a = ((DirectBuffer)bb).address();
 685                 } else {
 686                     shadow[i] = src;
 687                     a = ((DirectBuffer)src).address() + pos;
 688                 }
 689                 unsafe.putAddress(address + OFFSETOF_BUF, a);
 690                 unsafe.putInt(address + OFFSETOF_LEN, rem);
 691                 address += SIZEOF_WSABUF;
 692             }
 693         }
 694 
 695         /**
 696          * Invoked after a write has completed to update the buffer positions
 697          * and release any substituted buffers.
 698          */
 699         void updateBuffers(int bytesWritten) {
 700             // Notify the buffers how many bytes were taken
 701             for (int i=0; i<numBufs; i++) {
 702                 ByteBuffer nextBuffer = bufs[i];
 703                 int pos = nextBuffer.position();
 704                 int lim = nextBuffer.limit();
 705                 int len = (pos <= lim ? lim - pos : lim);
 706                 if (bytesWritten >= len) {
 707                     bytesWritten -= len;
 708                     int newPosition = pos + len;
 709                     try {
 710                         nextBuffer.position(newPosition);
 711                     } catch (IllegalArgumentException x) {
 712                         // position changed by someone else
 713                     }
 714                 } else { // Buffers not completely filled
 715                     if (bytesWritten > 0) {
 716                         assert(pos + bytesWritten < (long)Integer.MAX_VALUE);
 717                         int newPosition = pos + bytesWritten;
 718                         try {
 719                             nextBuffer.position(newPosition);
 720                         } catch (IllegalArgumentException x) {
 721                             // position changed by someone else
 722                         }
 723                     }
 724                     break;
 725                 }
 726             }
 727         }
 728 
 729         void releaseBuffers() {
 730             for (int i=0; i<numBufs; i++) {
 731                 if (!(bufs[i] instanceof DirectBuffer)) {
 732                     Util.releaseTemporaryDirectBuffer(shadow[i]);
 733                 }
 734             }
 735         }
 736 
 737         @Override
 738         //@SuppressWarnings("unchecked")
 739         public void run() {
 740             long overlapped = 0L;
 741             boolean prepared = false;
 742             boolean pending = false;
 743             boolean shutdown = false;
 744 
 745             try {
 746                 begin();
 747 
 748                 // substitute non-direct buffers
 749                 prepareBuffers();
 750                 prepared = true;
 751 
 752                 // get an OVERLAPPED structure (from the cache or allocate)
 753                 overlapped = ioCache.add(result);
 754                 int n = write0(handle, numBufs, writeBufferArray, overlapped);
 755                 if (n == IOStatus.UNAVAILABLE) {
 756                     // I/O is pending
 757                     pending = true;
 758                     return;
 759                 }
 760                 if (n == IOStatus.EOF) {
 761                     // special case for shutdown output
 762                     shutdown = true;
 763                     throw new ClosedChannelException();
 764                 }
 765                 // write completed immediately
 766                 throw new InternalError("Write completed immediately");
 767             } catch (Throwable x) {
 768                 // write failed. Enable writing before releasing waiters.
 769                 enableWriting();
 770                 if (!shutdown && (x instanceof ClosedChannelException))
 771                     x = new AsynchronousCloseException();
 772                 if (!(x instanceof IOException))
 773                     x = new IOException(x);
 774                 result.setFailure(x);
 775             } finally {
 776                 // release resources if I/O not pending
 777                 if (!pending) {
 778                     if (overlapped != 0L)
 779                         ioCache.remove(overlapped);
 780                     if (prepared)
 781                         releaseBuffers();
 782                 }
 783                 end();
 784             }
 785 
 786             // invoke completion handler
 787             Invoker.invoke(result);
 788         }
 789 
 790         /**
 791          * Executed when the I/O has completed
 792          */
 793         @Override
 794         @SuppressWarnings("unchecked")
 795         public void completed(int bytesTransferred, boolean canInvokeDirect) {
 796             updateBuffers(bytesTransferred);
 797 
 798             // return direct buffer to cache if substituted
 799             releaseBuffers();
 800 
 801             // release waiters if not already released by timeout
 802             synchronized (result) {
 803                 if (result.isDone())
 804                     return;
 805                 enableWriting();
 806                 if (gatheringWrite) {
 807                     result.setResult((V)Long.valueOf(bytesTransferred));
 808                 } else {
 809                     result.setResult((V)Integer.valueOf(bytesTransferred));
 810                 }
 811             }
 812             if (canInvokeDirect) {
 813                 Invoker.invokeUnchecked(result);
 814             } else {
 815                 Invoker.invoke(result);
 816             }
 817         }
 818 
 819         @Override
 820         public void failed(int error, IOException x) {
 821             // return direct buffer to cache if substituted
 822             releaseBuffers();
 823 
 824             // release waiters if not already released by timeout
 825             if (!isOpen())
 826                 x = new AsynchronousCloseException();
 827 
 828             synchronized (result) {
 829                 if (result.isDone())
 830                     return;
 831                 enableWriting();
 832                 result.setFailure(x);
 833             }
 834             Invoker.invoke(result);
 835         }
 836 
 837         /**
 838          * Invoked if timeout expires before it is cancelled
 839          */
 840         void timeout() {
 841             // synchronize on result as the I/O could complete/fail
 842             synchronized (result) {
 843                 if (result.isDone())
 844                     return;
 845 
 846                 // kill further writing before releasing waiters
 847                 enableWriting(true);
 848                 result.setFailure(new InterruptedByTimeoutException());
 849             }
 850 
 851             // invoke handler without any locks
 852             Invoker.invoke(result);
 853         }
 854     }
 855 
 856     @Override
 857     <V extends Number,A> Future<V> implWrite(boolean gatheringWrite,
 858                                              ByteBuffer src,
 859                                              ByteBuffer[] srcs,
 860                                              long timeout,
 861                                              TimeUnit unit,
 862                                              A attachment,
 863                                              CompletionHandler<V,? super A> handler)
 864     {
 865         // setup task
 866         PendingFuture<V,A> result =
 867             new PendingFuture<V,A>(this, handler, attachment);
 868         ByteBuffer[] bufs;
 869         if (gatheringWrite) {
 870             bufs = srcs;
 871         } else {
 872             bufs = new ByteBuffer[1];
 873             bufs[0] = src;
 874         }
 875         final WriteTask writeTask = new WriteTask<V,A>(bufs, gatheringWrite, result);
 876         result.setContext(writeTask);
 877 
 878         // schedule timeout
 879         if (timeout > 0L) {
 880             Future<?> timeoutTask = iocp.schedule(new Runnable() {
 881                 public void run() {
 882                     writeTask.timeout();
 883                 }
 884             }, timeout, unit);
 885             result.setTimeoutTask(timeoutTask);
 886         }
 887 
 888         // initiate I/O (can only be done from thread in thread pool)
 889         // initiate I/O
 890         if (Iocp.supportsThreadAgnosticIo()) {
 891             writeTask.run();
 892         } else {
 893             Invoker.invokeOnThreadInThreadPool(this, writeTask);
 894         }
 895         return result;
 896     }
 897 
 898     // -- Native methods --
 899 
 900     private static native void initIDs();
 901 
 902     private static native int connect0(long socket, boolean preferIPv6,
 903         InetAddress remote, int remotePort, long overlapped) throws IOException;
 904 
 905     private static native void updateConnectContext(long socket) throws IOException;
 906 
 907     private static native int read0(long socket, int count, long addres, long overlapped)
 908         throws IOException;
 909 
 910     private static native int write0(long socket, int count, long address,
 911         long overlapped) throws IOException;
 912 
 913     private static native void shutdown0(long socket, int how) throws IOException;
 914 
 915     private static native void closesocket0(long socket) throws IOException;
 916 
 917     static {
 918         Util.load();
 919         initIDs();
 920     }
 921 }