1 /*
   2  * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package sun.nio.ch;
  27 
  28 import java.nio.channels.*;
  29 import java.nio.ByteBuffer;
  30 import java.nio.BufferOverflowException;
  31 import java.net.*;
  32 import java.util.concurrent.*;
  33 import java.io.IOException;
  34 import java.security.AccessController;
  35 import java.security.PrivilegedActionException;
  36 import java.security.PrivilegedExceptionAction;
  37 import sun.misc.Unsafe;
  38 
  39 /**
  40  * Windows implementation of AsynchronousSocketChannel using overlapped I/O.
  41  */
  42 
  43 class WindowsAsynchronousSocketChannelImpl
  44     extends AsynchronousSocketChannelImpl implements Iocp.OverlappedChannel
  45 {
         // Unsafe instance used by this class to allocate/free the native WSABUF
         // arrays and to write their fields
  46     private static final Unsafe unsafe = Unsafe.getUnsafe();
         // address size as reported by Unsafe; dependsArch() treats 4 as the
         // 32-bit layout and any other value as the 64-bit layout
  47     private static int addressSize = unsafe.addressSize();
  48 
  49     private static int dependsArch(int value32, int value64) {
  50         return (addressSize == 4) ? value32 : value64;
  51     }
  52 
  53     /*
  54      * typedef struct _WSABUF {
  55      *     u_long      len;
  56      *     char FAR *  buf;
  57      * } WSABUF;
  58      */
         // Size of one WSABUF and the offsets of its two fields. In each
         // dependsArch(a, b) call, a is used when addresses are 4 bytes wide and
         // b otherwise.
  59     private static final int SIZEOF_WSABUF  = dependsArch(8, 16);
  60     private static final int OFFSETOF_LEN   = 0;
  61     private static final int OFFSETOF_BUF   = dependsArch(4, 8);
  62 
  63     // maximum vector size for scatter/gather I/O
  64     private static final int MAX_WSABUF     = 16;
  65 
         // number of bytes needed for an array of MAX_WSABUF WSABUF structures
  66     private static final int SIZEOF_WSABUFARRAY = MAX_WSABUF * SIZEOF_WSABUF;
  67 
  68 
  69     // socket handle. Use begin()/end() around each usage of this handle.
  70     final long handle;
  71 
  72     // I/O completion port that the socket is associated with
  73     private final Iocp iocp;
  74 
  75     // completion key to identify channel when I/O completes
         // (stays 0 when the constructor tolerated a ShutdownChannelGroupException
         // and the socket was therefore never associated)
  76     private final int completionKey;
  77 
  78     // Pending I/O operations are tied to an OVERLAPPED structure that can only
  79     // be released when the I/O completion event is posted to the completion
  80     // port. Where I/O operations complete immediately then it is possible
  81     // there may be more than two OVERLAPPED structures in use.
  82     private final PendingIoCache ioCache;
  83 
  84     // per-channel arrays of WSABUF structures
         // (native memory obtained from Unsafe in the constructor and freed by
         // implClose; one array is used by reads, the other by writes)
  85     private final long readBufferArray;
  86     private final long writeBufferArray;
  87 
  88 
         /**
          * Creates the channel, associates its socket handle with the given
          * completion port and allocates the native WSABUF arrays used for
          * reads and writes.
          *
          * @param iocp the completion port (channel group) for this channel
          * @param failIfGroupShutdown if {@code true}, a
          *        ShutdownChannelGroupException raised while associating closes
          *        the socket and is rethrown; if {@code false} it is ignored and
          *        the channel is created with a completion key of 0
          * @throws IOException if the association fails for another reason; the
          *         socket is closed before the exception propagates
          */
  89     WindowsAsynchronousSocketChannelImpl(Iocp iocp, boolean failIfGroupShutdown)
  90         throws IOException
  91     {
  92         super(iocp);
  93 
  94         // associate socket with default completion port
  95         long h = IOUtil.fdVal(fd);
  96         int key = 0;
  97         try {
  98             key = iocp.associate(this, h);
  99         } catch (ShutdownChannelGroupException x) {
                 // when the caller tolerates a shut-down group the exception is
                 // dropped and key remains 0
 100             if (failIfGroupShutdown) {
 101                 closesocket0(h);
 102                 throw x;
 103             }
 104         } catch (IOException x) {
 105             closesocket0(h);
 106             throw x;
 107         }
 108 
 109         this.handle = h;
 110         this.iocp = iocp;
 111         this.completionKey = key;
 112         this.ioCache = new PendingIoCache();
 113 
 114         // allocate WSABUF arrays
 115         this.readBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY);
 116         this.writeBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY);
 117     }
 118 
         /**
          * Creates the channel in the given group, failing if the group has
          * already been shut down (delegates with failIfGroupShutdown set to
          * {@code true}).
          */
 119     WindowsAsynchronousSocketChannelImpl(Iocp iocp) throws IOException {
 120         this(iocp, true);
 121     }
 122 
         /**
          * Returns the completion port that this channel was created with.
          */
 123     @Override
 124     public AsynchronousChannelGroupImpl group() {
 125         return iocp;
 126     }
 127 
 128     /**
 129      * Invoked by Iocp when an I/O operation completes.
          *
          * Removes the entry registered for the given OVERLAPPED structure from
          * this channel's pending I/O cache and returns it.
          *
          * @param overlapped address of the OVERLAPPED structure of the completed I/O
          * @return whatever the cache's remove operation yields for that address
 130      */
 131     @Override
 132     public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) {
 133         return ioCache.remove(overlapped);
 134     }
 135 
 136     // invoked by WindowsAsynchronousServerSocketChannelImpl
         // returns the socket handle of this channel
 137     long handle() {
 138         return handle;
 139     }
 140 
 141     // invoked by WindowsAsynchronousServerSocketChannelImpl when a new
 142     // connection is accepted: marks the channel connected and records both
         // endpoint addresses while holding stateLock
 143     void setConnected(InetSocketAddress localAddress,
 144                       InetSocketAddress remoteAddress)
 145     {
 146         synchronized (stateLock) {
 147             state = ST_CONNECTED;
 148             this.localAddress = localAddress;
 149             this.remoteAddress = remoteAddress;
 150         }
 151     }
 152 
         /**
          * Releases the resources held by this channel. The steps run in a fixed
          * order: the socket is closed first, then the pending I/O cache is
          * closed, then the native WSABUF arrays are freed, and finally the
          * channel is disassociated from the completion port if it was ever
          * associated.
          */
 153     @Override
 154     void implClose() throws IOException {
 155         // close socket (may cause outstanding async I/O operations to fail).
 156         closesocket0(handle);
 157 
 158         // waits until all I/O operations have completed
 159         ioCache.close();
 160 
 161         // release arrays of WSABUF structures
 162         unsafe.freeMemory(readBufferArray);
 163         unsafe.freeMemory(writeBufferArray);
 164 
 165         // finally disassociate from the completion port (key can be 0 if
 166         // channel created when group is shutdown)
 167         if (completionKey != 0)
 168             iocp.disassociate(completionKey);
 169     }
 170 
 171     @Override
 172     public void onCancel(PendingFuture<?,?> task) {
 173         if (task.getContext() instanceof ConnectTask)
 174             killConnect();
 175         if (task.getContext() instanceof ReadTask)
 176             killReading();
 177         if (task.getContext() instanceof WriteTask)
 178             killWriting();
 179     }
 180 
 181     /**
 182      * Implements the task to initiate a connection and the handler to
 183      * consume the result when the connection is established (or fails).
          *
          * run() starts the connect; completed()/failed() are the ResultHandler
          * callbacks. Every path that finishes the operation sets a result or
          * failure on the PendingFuture and then hands it to Invoker.
 184      */
 185     private class ConnectTask<A> implements Runnable, Iocp.ResultHandler {
 186         private final InetSocketAddress remote;
 187         private final PendingFuture<Void,A> result;
 188 
 189         ConnectTask(InetSocketAddress remote, PendingFuture<Void,A> result) {
 190             this.remote = remote;
 191             this.result = result;
 192         }
 193 
             // closes the enclosing channel, ignoring any IOException from close()
 194         private void closeChannel() {
 195             try {
 196                 close();
 197             } catch (IOException ignore) { }
 198         }
 199 
             // Converts a failure into the IOException reported to the caller:
             // a ClosedChannelException becomes AsynchronousCloseException, other
             // IOExceptions pass through, anything else is wrapped.
 200         private IOException toIOException(Throwable x) {
 201             if (x instanceof IOException) {
 202                 if (x instanceof ClosedChannelException)
 203                     x = new AsynchronousCloseException();
 204                 return (IOException)x;
 205             }
 206             return new IOException(x);
 207         }
 208 
 209         /**
 210          * Invoke after a connection is successfully established.
              * Updates the connect context of the socket, then records the
              * connected state and the remote address under stateLock.
 211          */
 212         private void afterConnect() throws IOException {
 213             updateConnectContext(handle);
 214             synchronized (stateLock) {
 215                 state = ST_CONNECTED;
 216                 remoteAddress = remote;
 217             }
 218         }
 219 
 220         /**
 221          * Task to initiate a connection.
 222          */
 223         @Override
 224         public void run() {
 225             long overlapped = 0L;
 226             Throwable exc = null;
 227             try {
 228                 begin();
 229 
 230                 // synchronize on result to allow this thread handle the case
 231                 // where the connection is established immediately.
 232                 synchronized (result) {
 233                     overlapped = ioCache.add(result);
 234                     // initiate the connection
 235                     int n = connect0(handle, Net.isIPv6Available(), remote.getAddress(),
 236                                      remote.getPort(), overlapped);
 237                     if (n == IOStatus.UNAVAILABLE) {
 238                         // connection is pending
 239                         return;
 240                     }
 241 
 242                     // connection established immediately
 243                     afterConnect();
 244                     result.setResult(null);
 245                 }
 246             } catch (Throwable x) {
                     // on failure the cache entry added above (if any) is removed
 247                 if (overlapped != 0L)
 248                     ioCache.remove(overlapped);
 249                 exc = x;
 250             } finally {
 251                 end();
 252             }
 253 
                 // reached only on immediate success or on failure; the pending
                 // case returned from inside the try block
 254             if (exc != null) {
 255                 closeChannel();
 256                 result.setFailure(toIOException(exc));
 257             }
 258             Invoker.invoke(result);
 259         }
 260 
 261         /**
 262          * Invoked by handler thread when connection established.
 263          */
 264         @Override
 265         public void completed(int bytesTransferred, boolean canInvokeDirect) {
 266             Throwable exc = null;
 267             try {
 268                 begin();
 269                 afterConnect();
 270                 result.setResult(null);
 271             } catch (Throwable x) {
 272                 // channel is closed or unable to finish connect
 273                 exc = x;
 274             } finally {
 275                 end();
 276             }
 277 
 278             // can't close channel while in begin/end block
 279             if (exc != null) {
 280                 closeChannel();
 281                 result.setFailure(toIOException(exc));
 282             }
 283 
                 // the caller tells us whether the unchecked invoke variant may be used
 284             if (canInvokeDirect) {
 285                 Invoker.invokeUnchecked(result);
 286             } else {
 287                 Invoker.invoke(result);
 288             }
 289         }
 290 
 291         /**
 292          * Invoked by handler thread when failed to establish connection.
              * If the channel is still open it is closed and the given exception
              * is reported; if it is already closed the failure is reported as
              * an AsynchronousCloseException instead.
 293          */
 294         @Override
 295         public void failed(int error, IOException x) {
 296             if (isOpen()) {
 297                 closeChannel();
 298                 result.setFailure(x);
 299             } else {
 300                 result.setFailure(new AsynchronousCloseException());
 301             }
 302             Invoker.invoke(result);
 303         }
 304     }
 305 
 306     private void doPrivilegedBind(final SocketAddress sa) throws IOException {
 307         try {
 308             AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() {
 309                 public Void run() throws IOException {
 310                     bind(sa);
 311                     return null;
 312                 }
 313             });
 314         } catch (PrivilegedActionException e) {
 315             throw (IOException) e.getException();
 316         }
 317     }
 318 
         /**
          * Initiates an asynchronous connect to the given address.
          *
          * Early-exit cases (channel already closed, or the implicit bind
          * fails) are reported immediately: with no handler a completed,
          * failed future is returned; with a handler the handler is invoked
          * and {@code null} is returned. A failed bind also closes the channel.
          *
          * @param remote the address to connect to (validated by Net.checkAddress)
          * @param attachment object passed through to the completion handler
          * @param handler completion handler, or {@code null} to use the returned future
          * @return the pending future for the connect, a completed future, or
          *         {@code null} as described above
          * @throws AlreadyConnectedException if the channel is already connected
          * @throws ConnectionPendingException if a connect is already in progress
          */
 319     @Override
 320     <A> Future<Void> implConnect(SocketAddress remote,
 321                                  A attachment,
 322                                  CompletionHandler<Void,? super A> handler)
 323     {
 324         if (!isOpen()) {
 325             Throwable exc = new ClosedChannelException();
 326             if (handler == null)
 327                 return CompletedFuture.withFailure(exc);
 328             Invoker.invoke(this, handler, attachment, null, exc);
 329             return null;
 330         }
 331 
 332         InetSocketAddress isa = Net.checkAddress(remote);
 333 
 334         // permission check
 335         SecurityManager sm = System.getSecurityManager();
 336         if (sm != null)
 337             sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());
 338 
 339         // check and update state
 340         // ConnectEx requires the socket to be bound to a local address
 341         IOException bindException = null;
 342         synchronized (stateLock) {
 343             if (state == ST_CONNECTED)
 344                 throw new AlreadyConnectedException();
 345             if (state == ST_PENDING)
 346                 throw new ConnectionPendingException();
 347             if (localAddress == null) {
                     // not yet bound: bind to the wildcard address with port 0,
                     // using a privileged action when a security manager is set
 348                 try {
 349                     SocketAddress any = new InetSocketAddress(0);
 350                     if (sm == null) {
 351                         bind(any);
 352                     } else {
 353                         doPrivilegedBind(any);
 354                     }
 355                 } catch (IOException x) {
 356                     bindException = x;
 357                 }
 358             }
                 // the state only moves to pending when the bind did not fail
 359             if (bindException == null)
 360                 state = ST_PENDING;
 361         }
 362 
 363         // handle bind failure
 364         if (bindException != null) {
 365             try {
 366                 close();
 367             } catch (IOException ignore) { }
 368             if (handler == null)
 369                 return CompletedFuture.withFailure(bindException);
 370             Invoker.invoke(this, handler, attachment, null, bindException);
 371             return null;
 372         }
 373 
 374         // setup task
 375         PendingFuture<Void,A> result =
 376             new PendingFuture<Void,A>(this, handler, attachment);
 377         ConnectTask<A> task = new ConnectTask<A>(isa, result);
 378         result.setContext(task);
 379 
 380         // initiate I/O
             // (run on the calling thread when thread-agnostic I/O is supported,
             // otherwise hand the task to the Invoker to run on a pool thread)
 381         if (Iocp.supportsThreadAgnosticIo()) {
 382             task.run();
 383         } else {
 384             Invoker.invokeOnThreadInThreadPool(this, task);
 385         }
 386         return result;
 387     }
 388 
 389     /**
 390      * Implements the task to initiate a read and the handler to consume the
 391      * result when the read completes.
          *
          * The result is reported as a Long for scattering reads and as an
          * Integer otherwise. completed(), failed() and timeout() each check
          * result.isDone() under the result lock, so only the first of them
          * sets the outcome.
 392      */
 393     private class ReadTask<V,A> implements Runnable, Iocp.ResultHandler {
 394         private final ByteBuffer[] bufs;
 395         private final int numBufs;
 396         private final boolean scatteringRead;
 397         private final PendingFuture<V,A> result;
 398 
 399         // set by run method
             // (shadow[i] is bufs[i] itself when that buffer is direct, otherwise
             // the temporary direct buffer standing in for it)
 400         private ByteBuffer[] shadow;
 401 
 402         ReadTask(ByteBuffer[] bufs,
 403                  boolean scatteringRead,
 404                  PendingFuture<V,A> result)
 405         {
 406             this.bufs = bufs;
                 // only the first MAX_WSABUF buffers take part in the read
 407             this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length;
 408             this.scatteringRead = scatteringRead;
 409             this.result = result;
 410         }
 411 
 412         /**
 413          * Invoked prior to read to prepare the WSABUF array. Where necessary,
 414          * it substitutes non-direct buffers with direct buffers.
              * Each WSABUF entry receives the address of the first writable byte
              * and the number of remaining bytes of the corresponding buffer.
 415          */
 416         void prepareBuffers() {
 417             shadow = new ByteBuffer[numBufs];
 418             long address = readBufferArray;
 419             for (int i=0; i<numBufs; i++) {
 420                 ByteBuffer dst = bufs[i];
 421                 int pos = dst.position();
 422                 int lim = dst.limit();
 423                 assert (pos <= lim);
 424                 int rem = (pos <= lim ? lim - pos : 0);
 425                 long a;
 426                 if (!(dst instanceof DirectBuffer)) {
 427                     // substitute with direct buffer
 428                     ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
 429                     shadow[i] = bb;
 430                     a = ((DirectBuffer)bb).address();
 431                 } else {
 432                     shadow[i] = dst;
                         // read directly into the caller's buffer at its position
 433                     a = ((DirectBuffer)dst).address() + pos;
 434                 }
 435                 unsafe.putAddress(address + OFFSETOF_BUF, a);
 436                 unsafe.putInt(address + OFFSETOF_LEN, rem);
 437                 address += SIZEOF_WSABUF;
 438             }
 439         }
 440 
 441         /**
 442          * Invoked after a read has completed to update the buffer positions
 443          * and copy data from any substituted buffers back to the caller's
              * buffers. The substituted buffers themselves are returned to the
              * cache by releaseBuffers().
              *
              * @param bytesRead total number of bytes transferred by the read
 444          */
 445         void updateBuffers(int bytesRead) {
                 // first pass: advance the positions of the buffers that
                 // received data, in order, until bytesRead is used up
 446             for (int i=0; i<numBufs; i++) {
 447                 ByteBuffer nextBuffer = shadow[i];
 448                 int pos = nextBuffer.position();
 449                 int len = nextBuffer.remaining();
 450                 if (bytesRead >= len) {
 451                     bytesRead -= len;
 452                     int newPosition = pos + len;
 453                     try {
 454                         nextBuffer.position(newPosition);
 455                     } catch (IllegalArgumentException x) {
 456                         // position changed by another
 457                     }
 458                 } else { // Buffers not completely filled
 459                     if (bytesRead > 0) {
 460                         assert(pos + bytesRead < (long)Integer.MAX_VALUE);
 461                         int newPosition = pos + bytesRead;
 462                         try {
 463                             nextBuffer.position(newPosition);
 464                         } catch (IllegalArgumentException x) {
 465                             // position changed by another
 466                         }
 467                     }
 468                     break;
 469                 }
 470             }
 471 
 472             // Put results from shadow into the slow buffers
 473             for (int i=0; i<numBufs; i++) {
 474                 if (!(bufs[i] instanceof DirectBuffer)) {
 475                     shadow[i].flip();
 476                     try {
 477                         bufs[i].put(shadow[i]);
 478                     } catch (BufferOverflowException x) {
 479                         // position changed by another
 480                     }
 481                 }
 482             }
 483         }
 484 
             // returns every substituted (temporary direct) buffer to the cache;
             // buffers supplied by the caller are left alone
 485         void releaseBuffers() {
 486             for (int i=0; i<numBufs; i++) {
 487                 if (!(bufs[i] instanceof DirectBuffer)) {
 488                     Util.releaseTemporaryDirectBuffer(shadow[i]);
 489                 }
 490             }
 491         }
 492 
             /**
              * Task to initiate the read. Returns without invoking the handler
              * when the I/O is pending; otherwise the outcome (end-of-stream or
              * failure) is set here and the handler is invoked.
              */
 493         @Override
 494         @SuppressWarnings("unchecked")
 495         public void run() {
 496             long overlapped = 0L;
 497             boolean prepared = false;
 498             boolean pending = false;
 499 
 500             try {
 501                 begin();
 502 
 503                 // substitute non-direct buffers
 504                 prepareBuffers();
 505                 prepared = true;
 506 
 507                 // get an OVERLAPPED structure (from the cache or allocate)
 508                 overlapped = ioCache.add(result);
 509 
 510                 // initiate read
 511                 int n = read0(handle, numBufs, readBufferArray, overlapped);
 512                 if (n == IOStatus.UNAVAILABLE) {
 513                     // I/O is pending
 514                     pending = true;
 515                     return;
 516                 }
 517                 if (n == IOStatus.EOF) {
 518                     // input shutdown
 519                     enableReading();
 520                     if (scatteringRead) {
 521                         result.setResult((V)Long.valueOf(-1L));
 522                     } else {
 523                         result.setResult((V)Integer.valueOf(-1));
 524                     }
 525                 } else {
                         // any other return value is unexpected here; the error is
                         // caught below and reported as an IOException
 526                     throw new InternalError("Read completed immediately");
 527                 }
 528             } catch (Throwable x) {
 529                 // failed to initiate read
 530                 // reset read flag before releasing waiters
 531                 enableReading();
 532                 if (x instanceof ClosedChannelException)
 533                     x = new AsynchronousCloseException();
 534                 if (!(x instanceof IOException))
 535                     x = new IOException(x);
 536                 result.setFailure(x);
 537             } finally {
 538                 // release resources if I/O not pending
 539                 if (!pending) {
 540                     if (overlapped != 0L)
 541                         ioCache.remove(overlapped);
 542                     if (prepared)
 543                         releaseBuffers();
 544                 }
 545                 end();
 546             }
 547 
 548             // invoke completion handler
 549             Invoker.invoke(result);
 550         }
 551 
 552         /**
 553          * Executed when the I/O has completed
              *
              * @param bytesTransferred bytes read; 0 is reported to the caller as -1
              * @param canInvokeDirect selects Invoker.invokeUnchecked over Invoker.invoke
 554          */
 555         @Override
 556         @SuppressWarnings("unchecked")
 557         public void completed(int bytesTransferred, boolean canInvokeDirect) {
 558             if (bytesTransferred == 0) {
 559                 bytesTransferred = -1;  // EOF
 560             } else {
 561                 updateBuffers(bytesTransferred);
 562             }
 563 
 564             // return direct buffer to cache if substituted
 565             releaseBuffers();
 566 
 567             // release waiters if not already released by timeout
 568             synchronized (result) {
 569                 if (result.isDone())
 570                     return;
 571                 enableReading();
 572                 if (scatteringRead) {
 573                     result.setResult((V)Long.valueOf(bytesTransferred));
 574                 } else {
 575                     result.setResult((V)Integer.valueOf(bytesTransferred));
 576                 }
 577             }
 578             if (canInvokeDirect) {
 579                 Invoker.invokeUnchecked(result);
 580             } else {
 581                 Invoker.invoke(result);
 582             }
 583         }
 584 
             /**
              * Executed when the I/O has failed. The failure is reported as an
              * AsynchronousCloseException when the channel is no longer open.
              */
 585         @Override
 586         public void failed(int error, IOException x) {
 587             // return direct buffer to cache if substituted
 588             releaseBuffers();
 589 
 590             // release waiters if not already released by timeout
 591             if (!isOpen())
 592                 x = new AsynchronousCloseException();
 593 
 594             synchronized (result) {
 595                 if (result.isDone())
 596                     return;
 597                 enableReading();
 598                 result.setFailure(x);
 599             }
 600             Invoker.invoke(result);
 601         }
 602 
 603         /**
 604          * Invoked if timeout expires before it is cancelled
              * Does nothing when the result is already done; otherwise fails the
              * result with InterruptedByTimeoutException.
 605          */
 606         void timeout() {
 607             // synchronize on result as the I/O could complete/fail
 608             synchronized (result) {
 609                 if (result.isDone())
 610                     return;
 611 
 612                 // kill further reading before releasing waiters
 613                 enableReading(true);
 614                 result.setFailure(new InterruptedByTimeoutException());
 615             }
 616 
 617             // invoke handler without any locks
 618             Invoker.invoke(result);
 619         }
 620     }
 621 
 622     @Override
 623     <V extends Number,A> Future<V> implRead(boolean isScatteringRead,
 624                                             ByteBuffer dst,
 625                                             ByteBuffer[] dsts,
 626                                             long timeout,
 627                                             TimeUnit unit,
 628                                             A attachment,
 629                                             CompletionHandler<V,? super A> handler)
 630     {
 631         // setup task
 632         PendingFuture<V,A> result =
 633             new PendingFuture<V,A>(this, handler, attachment);
 634         ByteBuffer[] bufs;
 635         if (isScatteringRead) {
 636             bufs = dsts;
 637         } else {
 638             bufs = new ByteBuffer[1];
 639             bufs[0] = dst;
 640         }
 641         final ReadTask<V,A> readTask =
 642                 new ReadTask<V,A>(bufs, isScatteringRead, result);
 643         result.setContext(readTask);
 644 
 645         // schedule timeout
 646         if (timeout > 0L) {
 647             Future<?> timeoutTask = iocp.schedule(new Runnable() {
 648                 public void run() {
 649                     readTask.timeout();
 650                 }
 651             }, timeout, unit);
 652             result.setTimeoutTask(timeoutTask);
 653         }
 654 
 655         // initiate I/O
 656         if (Iocp.supportsThreadAgnosticIo()) {
 657             readTask.run();
 658         } else {
 659             Invoker.invokeOnThreadInThreadPool(this, readTask);
 660         }
 661         return result;
 662     }
 663 
 664     /**
 665      * Implements the task to initiate a write and the handler to consume the
 666      * result when the write completes.
 667      */
 668     private class WriteTask<V,A> implements Runnable, Iocp.ResultHandler {
             // source buffers supplied by the caller
 669         private final ByteBuffer[] bufs;
             // number of leading entries of bufs used for the write (capped at
             // MAX_WSABUF by the constructor)
 670         private final int numBufs;
             // selects whether the result is reported as a Long or an Integer
 671         private final boolean gatheringWrite;
 672         private final PendingFuture<V,A> result;
 673 
 674         // set by run method
             // (shadow[i] is bufs[i] itself when that buffer is direct, otherwise
             // a temporary direct buffer holding a copy of its content)
 675         private ByteBuffer[] shadow;
 676 
 677         WriteTask(ByteBuffer[] bufs,
 678                   boolean gatheringWrite,
 679                   PendingFuture<V,A> result)
 680         {
 681             this.bufs = bufs;
 682             this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length;
 683             this.gatheringWrite = gatheringWrite;
 684             this.result = result;
 685         }
 686 
 687         /**
 688          * Invoked prior to write to prepare the WSABUF array. Where necessary,
 689          * it substitutes non-direct buffers with direct buffers.
 690          */
 691         void prepareBuffers() {
 692             shadow = new ByteBuffer[numBufs];
 693             long address = writeBufferArray;
 694             for (int i=0; i<numBufs; i++) {
 695                 ByteBuffer src = bufs[i];
 696                 int pos = src.position();
 697                 int lim = src.limit();
 698                 assert (pos <= lim);
 699                 int rem = (pos <= lim ? lim - pos : 0);
 700                 long a;
 701                 if (!(src instanceof DirectBuffer)) {
 702                     // substitute with direct buffer
 703                     ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
 704                     bb.put(src);
 705                     bb.flip();
 706                     src.position(pos);  // leave heap buffer untouched for now
 707                     shadow[i] = bb;
 708                     a = ((DirectBuffer)bb).address();
 709                 } else {
 710                     shadow[i] = src;
 711                     a = ((DirectBuffer)src).address() + pos;
 712                 }
 713                 unsafe.putAddress(address + OFFSETOF_BUF, a);
 714                 unsafe.putInt(address + OFFSETOF_LEN, rem);
 715                 address += SIZEOF_WSABUF;
 716             }
 717         }
 718 
 719         /**
 720          * Invoked after a write has completed to update the buffer positions
 721          * and release any substituted buffers.
 722          */
 723         void updateBuffers(int bytesWritten) {
 724             // Notify the buffers how many bytes were taken
 725             for (int i=0; i<numBufs; i++) {
 726                 ByteBuffer nextBuffer = bufs[i];
 727                 int pos = nextBuffer.position();
 728                 int lim = nextBuffer.limit();
 729                 int len = (pos <= lim ? lim - pos : lim);
 730                 if (bytesWritten >= len) {
 731                     bytesWritten -= len;
 732                     int newPosition = pos + len;
 733                     try {
 734                         nextBuffer.position(newPosition);
 735                     } catch (IllegalArgumentException x) {
 736                         // position changed by someone else
 737                     }
 738                 } else { // Buffers not completely filled
 739                     if (bytesWritten > 0) {
 740                         assert(pos + bytesWritten < (long)Integer.MAX_VALUE);
 741                         int newPosition = pos + bytesWritten;
 742                         try {
 743                             nextBuffer.position(newPosition);
 744                         } catch (IllegalArgumentException x) {
 745                             // position changed by someone else
 746                         }
 747                     }
 748                     break;
 749                 }
 750             }
 751         }
 752 
 753         void releaseBuffers() {
 754             for (int i=0; i<numBufs; i++) {
 755                 if (!(bufs[i] instanceof DirectBuffer)) {
 756                     Util.releaseTemporaryDirectBuffer(shadow[i]);
 757                 }
 758             }
 759         }
 760 
             /**
              * Task to initiate the write. Returns without invoking the handler
              * when the I/O is pending. Every other outcome of write0 is turned
              * into a failure of the result, after which the handler is invoked.
              */
 761         @Override
 762         //@SuppressWarnings("unchecked")
 763         public void run() {
 764             long overlapped = 0L;
 765             boolean prepared = false;
 766             boolean pending = false;
 767             boolean shutdown = false;
 768 
 769             try {
 770                 begin();
 771 
 772                 // substitute non-direct buffers
 773                 prepareBuffers();
 774                 prepared = true;
 775 
 776                 // get an OVERLAPPED structure (from the cache or allocate)
 777                 overlapped = ioCache.add(result);
 778                 int n = write0(handle, numBufs, writeBufferArray, overlapped);
 779                 if (n == IOStatus.UNAVAILABLE) {
 780                     // I/O is pending
 781                     pending = true;
 782                     return;
 783                 }
 784                 if (n == IOStatus.EOF) {
 785                     // special case for shutdown output
                         // (the flag keeps this ClosedChannelException from being
                         // replaced by AsynchronousCloseException below)
 786                     shutdown = true;
 787                     throw new ClosedChannelException();
 788                 }
 789                 // write completed immediately
                     // (unexpected here; the error is caught below and reported as
                     // an IOException)
 790                 throw new InternalError("Write completed immediately");
 791             } catch (Throwable x) {
 792                 // write failed. Enable writing before releasing waiters.
 793                 enableWriting();
 794                 if (!shutdown && (x instanceof ClosedChannelException))
 795                     x = new AsynchronousCloseException();
 796                 if (!(x instanceof IOException))
 797                     x = new IOException(x);
 798                 result.setFailure(x);
 799             } finally {
 800                 // release resources if I/O not pending
 801                 if (!pending) {
 802                     if (overlapped != 0L)
 803                         ioCache.remove(overlapped);
 804                     if (prepared)
 805                         releaseBuffers();
 806                 }
 807                 end();
 808             }
 809 
 810             // invoke completion handler
 811             Invoker.invoke(result);
 812         }
 813 
 814         /**
 815          * Executed when the I/O has completed
 816          */
 817         @Override
 818         @SuppressWarnings("unchecked")
 819         public void completed(int bytesTransferred, boolean canInvokeDirect) {
 820             updateBuffers(bytesTransferred);
 821 
 822             // return direct buffer to cache if substituted
 823             releaseBuffers();
 824 
 825             // release waiters if not already released by timeout
 826             synchronized (result) {
 827                 if (result.isDone())
 828                     return;
 829                 enableWriting();
 830                 if (gatheringWrite) {
 831                     result.setResult((V)Long.valueOf(bytesTransferred));
 832                 } else {
 833                     result.setResult((V)Integer.valueOf(bytesTransferred));
 834                 }
 835             }
 836             if (canInvokeDirect) {
 837                 Invoker.invokeUnchecked(result);
 838             } else {
 839                 Invoker.invoke(result);
 840             }
 841         }
 842 
 843         @Override
 844         public void failed(int error, IOException x) {
 845             // return direct buffer to cache if substituted
 846             releaseBuffers();
 847 
 848             // release waiters if not already released by timeout
 849             if (!isOpen())
 850                 x = new AsynchronousCloseException();
 851 
 852             synchronized (result) {
 853                 if (result.isDone())
 854                     return;
 855                 enableWriting();
 856                 result.setFailure(x);
 857             }
 858             Invoker.invoke(result);
 859         }
 860 
 861         /**
 862          * Invoked if timeout expires before it is cancelled
 863          */
 864         void timeout() {
 865             // synchronize on result as the I/O could complete/fail
 866             synchronized (result) {
 867                 if (result.isDone())
 868                     return;
 869 
 870                 // kill further writing before releasing waiters
 871                 enableWriting(true);
 872                 result.setFailure(new InterruptedByTimeoutException());
 873             }
 874 
 875             // invoke handler without any locks
 876             Invoker.invoke(result);
 877         }
 878     }
 879 
 880     @Override
 881     <V extends Number,A> Future<V> implWrite(boolean gatheringWrite,
 882                                              ByteBuffer src,
 883                                              ByteBuffer[] srcs,
 884                                              long timeout,
 885                                              TimeUnit unit,
 886                                              A attachment,
 887                                              CompletionHandler<V,? super A> handler)
 888     {
 889         // setup task
 890         PendingFuture<V,A> result =
 891             new PendingFuture<V,A>(this, handler, attachment);
 892         ByteBuffer[] bufs;
 893         if (gatheringWrite) {
 894             bufs = srcs;
 895         } else {
 896             bufs = new ByteBuffer[1];
 897             bufs[0] = src;
 898         }
 899         final WriteTask<V,A> writeTask =
 900                 new WriteTask<V,A>(bufs, gatheringWrite, result);
 901         result.setContext(writeTask);
 902 
 903         // schedule timeout
 904         if (timeout > 0L) {
 905             Future<?> timeoutTask = iocp.schedule(new Runnable() {
 906                 public void run() {
 907                     writeTask.timeout();
 908                 }
 909             }, timeout, unit);
 910             result.setTimeoutTask(timeoutTask);
 911         }
 912 
 913         // initiate I/O (can only be done from thread in thread pool)
 914         // initiate I/O
 915         if (Iocp.supportsThreadAgnosticIo()) {
 916             writeTask.run();
 917         } else {
 918             Invoker.invokeOnThreadInThreadPool(this, writeTask);
 919         }
 920         return result;
 921     }
 922 
 923     // -- Native methods --
 924 
 925     private static native void initIDs();
 926 
 927     private static native int connect0(long socket, boolean preferIPv6,
 928         InetAddress remote, int remotePort, long overlapped) throws IOException;
 929 
 930     private static native void updateConnectContext(long socket) throws IOException;
 931 
 932     private static native int read0(long socket, int count, long addres, long overlapped)
 933         throws IOException;
 934 
 935     private static native int write0(long socket, int count, long address,
 936         long overlapped) throws IOException;
 937 
 938     private static native void shutdown0(long socket, int how) throws IOException;
 939 
 940     private static native void closesocket0(long socket) throws IOException;
 941 
    // Class initialization: IOUtil.load() runs first, then the native
    // initIDs() hook of this class.
    // NOTE(review): IOUtil.load() presumably loads the native library that
    // backs the native methods declared in this class, which is why it has to
    // precede initIDs() -- confirm against IOUtil.
    static {
        IOUtil.load();
        initIDs();
    }
 946 }