1 /*
   2  * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package sun.nio.ch;
  27 
  28 import java.nio.channels.*;
  29 import java.util.concurrent.*;
  30 import java.nio.ByteBuffer;
  31 import java.nio.BufferOverflowException;
  32 import java.io.IOException;
  33 import java.io.FileDescriptor;
  34 import sun.misc.SharedSecrets;
  35 import sun.misc.JavaIOFileDescriptorAccess;
  36 
  37 /**
  38  * Windows implementation of AsynchronousFileChannel using overlapped I/O.
  39  */
  40 
  41 public class WindowsAsynchronousFileChannelImpl
  42     extends AsynchronousFileChannelImpl
  43     implements Iocp.OverlappedChannel, Groupable
  44 {
  45     private static final JavaIOFileDescriptorAccess fdAccess =
  46         SharedSecrets.getJavaIOFileDescriptorAccess();
  47 
  48     // error when EOF is detected asynchronously.
  49     private static final int ERROR_HANDLE_EOF = 38;
  50 
  51     // Lazy initialization of default I/O completion port
  52     private static class DefaultIocpHolder {
  53         static final Iocp defaultIocp = defaultIocp();
  54         private static Iocp defaultIocp() {
  55             try {
  56                 return new Iocp(null, ThreadPool.createDefault()).start();
  57             } catch (IOException ioe) {
  58                 throw new InternalError(ioe);
  59             }
  60         }
  61     }
  62 
  63     // Used for force/truncate/size methods
  64     private static final FileDispatcher nd = new FileDispatcherImpl();
  65 
  66     // The handle is extracted for use in native methods invoked from this class.
  67     private final long handle;
  68 
  69     // The key that identifies the channel's association with the I/O port
  70     private final int completionKey;
  71 
  72     // I/O completion port (group)
  73     private final Iocp iocp;
  74 
  75     private final boolean isDefaultIocp;
  76 
  77     // Caches OVERLAPPED structure for each outstanding I/O operation
  78     private final PendingIoCache ioCache;
  79 
  80 
  81     private WindowsAsynchronousFileChannelImpl(FileDescriptor fdObj,
  82                                                boolean reading,
  83                                                boolean writing,
  84                                                Iocp iocp,
  85                                                boolean isDefaultIocp)
  86         throws IOException
  87     {
  88         super(fdObj, reading, writing, iocp.executor());
  89         this.handle = fdAccess.getHandle(fdObj);
  90         this.iocp = iocp;
  91         this.isDefaultIocp = isDefaultIocp;
  92         this.ioCache = new PendingIoCache();
  93         this.completionKey = iocp.associate(this, handle);
  94     }
  95 
  96     public static AsynchronousFileChannel open(FileDescriptor fdo,
  97                                                boolean reading,
  98                                                boolean writing,
  99                                                ThreadPool pool)
 100         throws IOException
 101     {
 102         Iocp iocp;
 103         boolean isDefaultIocp;
 104         if (pool == null) {
 105             iocp = DefaultIocpHolder.defaultIocp;
 106             isDefaultIocp = true;
 107         } else {
 108             iocp = new Iocp(null, pool).start();
 109             isDefaultIocp = false;
 110         }
 111         try {
 112             return new
 113                 WindowsAsynchronousFileChannelImpl(fdo, reading, writing, iocp, isDefaultIocp);
 114         } catch (IOException x) {
 115             // error binding to port so need to close it (if created for this channel)
 116             if (!isDefaultIocp)
 117                 iocp.implClose();
 118             throw x;
 119         }
 120     }
 121 
 122     @Override
 123     public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) {
 124         return ioCache.remove(overlapped);
 125     }
 126 
 127     @Override
 128     public void close() throws IOException {
 129         closeLock.writeLock().lock();
 130         try {
 131             if (closed)
 132                 return;     // already closed
 133             closed = true;
 134         } finally {
 135             closeLock.writeLock().unlock();
 136         }
 137 
 138         // invalidate all locks held for this channel
 139         invalidateAllLocks();
 140 
 141         // close the file
 142         close0(handle);
 143 
 144         // waits until all I/O operations have completed
 145         ioCache.close();
 146 
 147         // disassociate from port
 148         iocp.disassociate(completionKey);
 149 
 150         // for the non-default group close the port
 151         if (!isDefaultIocp)
 152             iocp.detachFromThreadPool();
 153     }
 154 
 155     @Override
 156     public AsynchronousChannelGroupImpl group() {
 157         return iocp;
 158     }
 159 
 160     /**
 161      * Translates Throwable to IOException
 162      */
 163     private static IOException toIOException(Throwable x) {
 164         if (x instanceof IOException) {
 165             if (x instanceof ClosedChannelException)
 166                 x = new AsynchronousCloseException();
 167             return (IOException)x;
 168         }
 169         return new IOException(x);
 170     }
 171 
 172     @Override
 173     public long size() throws IOException {
 174         try {
 175             begin();
 176             return nd.size(fdObj);
 177         } finally {
 178             end();
 179         }
 180     }
 181 
 182     @Override
 183     public AsynchronousFileChannel truncate(long size) throws IOException {
 184         if (size < 0)
 185             throw new IllegalArgumentException("Negative size");
 186         if (!writing)
 187             throw new NonWritableChannelException();
 188         try {
 189             begin();
 190             if (size > nd.size(fdObj))
 191                 return this;
 192             nd.truncate(fdObj, size);
 193         } finally {
 194             end();
 195         }
 196         return this;
 197     }
 198 
 199     @Override
 200     public void force(boolean metaData) throws IOException {
 201         try {
 202             begin();
 203             nd.force(fdObj, metaData);
 204         } finally {
 205             end();
 206         }
 207     }
 208 
 209     // -- file locking --
 210 
 211     /**
 212      * Task that initiates locking operation and handles completion result.
 213      */
 214     private class LockTask<A> implements Runnable, Iocp.ResultHandler {
 215         private final long position;
 216         private final FileLockImpl fli;
 217         private final PendingFuture<FileLock,A> result;
 218 
 219         LockTask(long position,
 220                  FileLockImpl fli,
 221                  PendingFuture<FileLock,A> result)
 222         {
 223             this.position = position;
 224             this.fli = fli;
 225             this.result = result;
 226         }
 227 
 228         @Override
 229         public void run() {
 230             long overlapped = 0L;
 231             boolean pending = false;
 232             try {
 233                 begin();
 234 
 235                 // allocate OVERLAPPED structure
 236                 overlapped = ioCache.add(result);
 237 
 238                 // synchronize on result to avoid race with handler thread
 239                 // when lock is acquired immediately.
 240                 synchronized (result) {
 241                     int n = lockFile(handle, position, fli.size(), fli.isShared(),
 242                                      overlapped);
 243                     if (n == IOStatus.UNAVAILABLE) {
 244                         // I/O is pending
 245                         pending = true;
 246                         return;
 247                     }
 248                     // acquired lock immediately
 249                     result.setResult(fli);
 250                 }
 251 
 252             } catch (Throwable x) {
 253                 // lock failed or channel closed
 254                 removeFromFileLockTable(fli);
 255                 result.setFailure(toIOException(x));
 256             } finally {
 257                 if (!pending && overlapped != 0L)
 258                     ioCache.remove(overlapped);
 259                 end();
 260             }
 261 
 262             // invoke completion handler
 263             Invoker.invoke(result);
 264         }
 265 
 266         @Override
 267         public void completed(int bytesTransferred, boolean canInvokeDirect) {
 268             // release waiters and invoke completion handler
 269             result.setResult(fli);
 270             if (canInvokeDirect) {
 271                 Invoker.invokeUnchecked(result);
 272             } else {
 273                 Invoker.invoke(result);
 274             }
 275         }
 276 
 277         @Override
 278         public void failed(int error, IOException x) {
 279             // lock not acquired so remove from lock table
 280             removeFromFileLockTable(fli);
 281 
 282             // release waiters
 283             if (isOpen()) {
 284                 result.setFailure(x);
 285             } else {
 286                 result.setFailure(new AsynchronousCloseException());
 287             }
 288             Invoker.invoke(result);
 289         }
 290     }
 291 
 292     @Override
 293     <A> Future<FileLock> implLock(final long position,
 294                                   final long size,
 295                                   final boolean shared,
 296                                   A attachment,
 297                                   final CompletionHandler<FileLock,? super A> handler)
 298     {
 299         if (shared && !reading)
 300             throw new NonReadableChannelException();
 301         if (!shared && !writing)
 302             throw new NonWritableChannelException();
 303 
 304         // add to lock table
 305         FileLockImpl fli = addToFileLockTable(position, size, shared);
 306         if (fli == null) {
 307             Throwable exc = new ClosedChannelException();
 308             if (handler == null)
 309                 return CompletedFuture.withFailure(exc);
 310             Invoker.invoke(this, handler, attachment, null, exc);
 311             return null;
 312         }
 313 
 314         // create Future and task that will be invoked to acquire lock
 315         PendingFuture<FileLock,A> result =
 316             new PendingFuture<FileLock,A>(this, handler, attachment);
 317         LockTask<A> lockTask = new LockTask<A>(position, fli, result);
 318         result.setContext(lockTask);
 319 
 320         // initiate I/O
 321         if (Iocp.supportsThreadAgnosticIo()) {
 322             lockTask.run();
 323         } else {
 324             boolean executed = false;
 325             try {
 326                 Invoker.invokeOnThreadInThreadPool(this, lockTask);
 327                 executed = true;
 328             } finally {
 329                 if (!executed) {
 330                     // rollback
 331                     removeFromFileLockTable(fli);
 332                 }
 333             }
 334         }
 335         return result;
 336     }
 337 
 338     static final int NO_LOCK = -1;       // Failed to lock
 339     static final int LOCKED = 0;         // Obtained requested lock
 340 
 341     @Override
 342     public FileLock tryLock(long position, long size, boolean shared)
 343         throws IOException
 344     {
 345         if (shared && !reading)
 346             throw new NonReadableChannelException();
 347         if (!shared && !writing)
 348             throw new NonWritableChannelException();
 349 
 350         // add to lock table
 351         final FileLockImpl fli = addToFileLockTable(position, size, shared);
 352         if (fli == null)
 353             throw new ClosedChannelException();
 354 
 355         boolean gotLock = false;
 356         try {
 357             begin();
 358             // try to acquire the lock
 359             int res = nd.lock(fdObj, false, position, size, shared);
 360             if (res == NO_LOCK)
 361                 return null;
 362             gotLock = true;
 363             return fli;
 364         } finally {
 365             if (!gotLock)
 366                 removeFromFileLockTable(fli);
 367             end();
 368         }
 369     }
 370 
 371     @Override
 372     protected void implRelease(FileLockImpl fli) throws IOException {
 373         nd.release(fdObj, fli.position(), fli.size());
 374     }
 375 
 376     /**
 377      * Task that initiates read operation and handles completion result.
 378      */
 379     private class ReadTask<A> implements Runnable, Iocp.ResultHandler {
 380         private final ByteBuffer dst;
 381         private final int pos, rem;     // buffer position/remaining
 382         private final long position;    // file position
 383         private final PendingFuture<Integer,A> result;
 384 
 385         // set to dst if direct; otherwise set to substituted direct buffer
 386         private volatile ByteBuffer buf;
 387 
 388         ReadTask(ByteBuffer dst,
 389                  int pos,
 390                  int rem,
 391                  long position,
 392                  PendingFuture<Integer,A> result)
 393         {
 394             this.dst = dst;
 395             this.pos = pos;
 396             this.rem = rem;
 397             this.position = position;
 398             this.result = result;
 399         }
 400 
 401         void releaseBufferIfSubstituted() {
 402             if (buf != dst)
 403                 Util.releaseTemporaryDirectBuffer(buf);
 404         }
 405 
 406         void updatePosition(int bytesTransferred) {
 407             // if the I/O succeeded then adjust buffer position
 408             if (bytesTransferred > 0) {
 409                 if (buf == dst) {
 410                     try {
 411                         dst.position(pos + bytesTransferred);
 412                     } catch (IllegalArgumentException x) {
 413                         // someone has changed the position; ignore
 414                     }
 415                 } else {
 416                     // had to substitute direct buffer
 417                     buf.position(bytesTransferred).flip();
 418                     try {
 419                         dst.put(buf);
 420                     } catch (BufferOverflowException x) {
 421                         // someone has changed the position; ignore
 422                     }
 423                 }
 424             }
 425         }
 426 
 427         @Override
 428         public void run() {
 429             int n = -1;
 430             long overlapped = 0L;
 431             long address;
 432 
 433             // Substitute a native buffer if not direct
 434             if (dst instanceof DirectBuffer) {
 435                 buf = dst;
 436                 address = ((DirectBuffer)dst).address() + pos;
 437             } else {
 438                 buf = Util.getTemporaryDirectBuffer(rem);
 439                 address = ((DirectBuffer)buf).address();
 440             }
 441 
 442             boolean pending = false;
 443             try {
 444                 begin();
 445 
 446                 // allocate OVERLAPPED
 447                 overlapped = ioCache.add(result);
 448 
 449                 // initiate read
 450                 n = readFile(handle, address, rem, position, overlapped);
 451                 if (n == IOStatus.UNAVAILABLE) {
 452                     // I/O is pending
 453                     pending = true;
 454                     return;
 455                 } else if (n == IOStatus.EOF) {
 456                     result.setResult(n);
 457                 } else {
 458                     throw new InternalError("Unexpected result: " + n);
 459                 }
 460 
 461             } catch (Throwable x) {
 462                 // failed to initiate read
 463                 result.setFailure(toIOException(x));
 464             } finally {
 465                 if (!pending) {
 466                     // release resources
 467                     if (overlapped != 0L)
 468                         ioCache.remove(overlapped);
 469                     releaseBufferIfSubstituted();
 470                 }
 471                 end();
 472             }
 473 
 474             // invoke completion handler
 475             Invoker.invoke(result);
 476         }
 477 
 478         /**
 479          * Executed when the I/O has completed
 480          */
 481         @Override
 482         public void completed(int bytesTransferred, boolean canInvokeDirect) {
 483             updatePosition(bytesTransferred);
 484 
 485             // return direct buffer to cache if substituted
 486             releaseBufferIfSubstituted();
 487 
 488             // release waiters and invoke completion handler
 489             result.setResult(bytesTransferred);
 490             if (canInvokeDirect) {
 491                 Invoker.invokeUnchecked(result);
 492             } else {
 493                 Invoker.invoke(result);
 494             }
 495         }
 496 
 497         @Override
 498         public void failed(int error, IOException x) {
 499             // if EOF detected asynchronously then it is reported as error
 500             if (error == ERROR_HANDLE_EOF) {
 501                 completed(-1, false);
 502             } else {
 503                 // return direct buffer to cache if substituted
 504                 releaseBufferIfSubstituted();
 505 
 506                 // release waiters
 507                 if (isOpen()) {
 508                     result.setFailure(x);
 509                 } else {
 510                     result.setFailure(new AsynchronousCloseException());
 511                 }
 512                 Invoker.invoke(result);
 513             }
 514         }
 515     }
 516 
 517     @Override
 518     <A> Future<Integer> implRead(ByteBuffer dst,
 519                                  long position,
 520                                  A attachment,
 521                                  CompletionHandler<Integer,? super A> handler)
 522     {
 523         if (!reading)
 524             throw new NonReadableChannelException();
 525         if (position < 0)
 526             throw new IllegalArgumentException("Negative position");
 527         if (dst.isReadOnly())
 528             throw new IllegalArgumentException("Read-only buffer");
 529 
 530         // check if channel is closed
 531         if (!isOpen()) {
 532             Throwable exc = new ClosedChannelException();
 533             if (handler == null)
 534                 return CompletedFuture.withFailure(exc);
 535             Invoker.invoke(this, handler, attachment, null, exc);
 536             return null;
 537         }
 538 
 539         int pos = dst.position();
 540         int lim = dst.limit();
 541         assert (pos <= lim);
 542         int rem = (pos <= lim ? lim - pos : 0);
 543 
 544         // no space remaining
 545         if (rem == 0) {
 546             if (handler == null)
 547                 return CompletedFuture.withResult(0);
 548             Invoker.invoke(this, handler, attachment, 0, null);
 549             return null;
 550         }
 551 
 552         // create Future and task that initiates read
 553         PendingFuture<Integer,A> result =
 554             new PendingFuture<Integer,A>(this, handler, attachment);
 555         ReadTask<A> readTask = new ReadTask<A>(dst, pos, rem, position, result);
 556         result.setContext(readTask);
 557 
 558         // initiate I/O
 559         if (Iocp.supportsThreadAgnosticIo()) {
 560             readTask.run();
 561         } else {
 562             Invoker.invokeOnThreadInThreadPool(this, readTask);
 563         }
 564         return result;
 565     }
 566 
 567     /**
 568      * Task that initiates write operation and handles completion result.
 569      */
 570     private class WriteTask<A> implements Runnable, Iocp.ResultHandler {
 571         private final ByteBuffer src;
 572         private final int pos, rem;     // buffer position/remaining
 573         private final long position;    // file position
 574         private final PendingFuture<Integer,A> result;
 575 
 576         // set to src if direct; otherwise set to substituted direct buffer
 577         private volatile ByteBuffer buf;
 578 
 579         WriteTask(ByteBuffer src,
 580                   int pos,
 581                   int rem,
 582                   long position,
 583                   PendingFuture<Integer,A> result)
 584         {
 585             this.src = src;
 586             this.pos = pos;
 587             this.rem = rem;
 588             this.position = position;
 589             this.result = result;
 590         }
 591 
 592         void releaseBufferIfSubstituted() {
 593             if (buf != src)
 594                 Util.releaseTemporaryDirectBuffer(buf);
 595         }
 596 
 597         void updatePosition(int bytesTransferred) {
 598             // if the I/O succeeded then adjust buffer position
 599             if (bytesTransferred > 0) {
 600                 try {
 601                     src.position(pos + bytesTransferred);
 602                 } catch (IllegalArgumentException x) {
 603                     // someone has changed the position
 604                 }
 605             }
 606         }
 607 
 608         @Override
 609         public void run() {
 610             int n = -1;
 611             long overlapped = 0L;
 612             long address;
 613 
 614             // Substitute a native buffer if not direct
 615             if (src instanceof DirectBuffer) {
 616                 buf = src;
 617                 address = ((DirectBuffer)src).address() + pos;
 618             } else {
 619                 buf = Util.getTemporaryDirectBuffer(rem);
 620                 buf.put(src);
 621                 buf.flip();
 622                 // temporarily restore position as we don't know how many bytes
 623                 // will be written
 624                 src.position(pos);
 625                 address = ((DirectBuffer)buf).address();
 626             }
 627 
 628             try {
 629                 begin();
 630 
 631                 // allocate an OVERLAPPED structure
 632                 overlapped = ioCache.add(result);
 633 
 634                 // initiate the write
 635                 n = writeFile(handle, address, rem, position, overlapped);
 636                 if (n == IOStatus.UNAVAILABLE) {
 637                     // I/O is pending
 638                     return;
 639                 } else {
 640                     throw new InternalError("Unexpected result: " + n);
 641                 }
 642 
 643             } catch (Throwable x) {
 644                 // failed to initiate read:
 645                 result.setFailure(toIOException(x));
 646 
 647                 // release resources
 648                 if (overlapped != 0L)
 649                     ioCache.remove(overlapped);
 650                 releaseBufferIfSubstituted();
 651 
 652             } finally {
 653                 end();
 654             }
 655 
 656             // invoke completion handler
 657             Invoker.invoke(result);
 658         }
 659 
 660         /**
 661          * Executed when the I/O has completed
 662          */
 663         @Override
 664         public void completed(int bytesTransferred, boolean canInvokeDirect) {
 665             updatePosition(bytesTransferred);
 666 
 667             // return direct buffer to cache if substituted
 668             releaseBufferIfSubstituted();
 669 
 670             // release waiters and invoke completion handler
 671             result.setResult(bytesTransferred);
 672             if (canInvokeDirect) {
 673                 Invoker.invokeUnchecked(result);
 674             } else {
 675                 Invoker.invoke(result);
 676             }
 677         }
 678 
 679         @Override
 680         public void failed(int error, IOException x) {
 681             // return direct buffer to cache if substituted
 682             releaseBufferIfSubstituted();
 683 
 684             // release waiters and invoker completion handler
 685             if (isOpen()) {
 686                 result.setFailure(x);
 687             } else {
 688                 result.setFailure(new AsynchronousCloseException());
 689             }
 690             Invoker.invoke(result);
 691         }
 692     }
 693 
 694     <A> Future<Integer> implWrite(ByteBuffer src,
 695                                   long position,
 696                                   A attachment,
 697                                   CompletionHandler<Integer,? super A> handler)
 698     {
 699         if (!writing)
 700             throw new NonWritableChannelException();
 701         if (position < 0)
 702             throw new IllegalArgumentException("Negative position");
 703 
 704         // check if channel is closed
 705         if (!isOpen()) {
 706            Throwable exc = new ClosedChannelException();
 707             if (handler == null)
 708                 return CompletedFuture.withFailure(exc);
 709             Invoker.invoke(this, handler, attachment, null, exc);
 710             return null;
 711         }
 712 
 713         int pos = src.position();
 714         int lim = src.limit();
 715         assert (pos <= lim);
 716         int rem = (pos <= lim ? lim - pos : 0);
 717 
 718         // nothing to write
 719         if (rem == 0) {
 720             if (handler == null)
 721                 return CompletedFuture.withResult(0);
 722             Invoker.invoke(this, handler, attachment, 0, null);
 723             return null;
 724         }
 725 
 726         // create Future and task to initiate write
 727         PendingFuture<Integer,A> result =
 728             new PendingFuture<Integer,A>(this, handler, attachment);
 729         WriteTask<A> writeTask = new WriteTask<A>(src, pos, rem, position, result);
 730         result.setContext(writeTask);
 731 
 732         // initiate I/O
 733         if (Iocp.supportsThreadAgnosticIo()) {
 734             writeTask.run();
 735         } else {
 736             Invoker.invokeOnThreadInThreadPool(this, writeTask);
 737         }
 738         return result;
 739     }
 740 
 741     // -- Native methods --
 742 
 743     private static native int readFile(long handle, long address, int len,
 744         long offset, long overlapped) throws IOException;
 745 
 746     private static native int writeFile(long handle, long address, int len,
 747         long offset, long overlapped) throws IOException;
 748 
 749     private static native int lockFile(long handle, long position, long size,
 750         boolean shared, long overlapped) throws IOException;
 751 
 752     private static native void close0(long handle);
 753 
 754     static {
 755         IOUtil.load();
 756     }
 757 }