1 /*
   2  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package sun.nio.ch;
  27 
  28 import java.nio.channels.*;
  29 import java.util.concurrent.*;
  30 import java.nio.ByteBuffer;
  31 import java.nio.BufferOverflowException;
  32 import java.io.IOException;
  33 import java.io.FileDescriptor;
  34 import sun.misc.SharedSecrets;
  35 import sun.misc.JavaIOFileDescriptorAccess;
  36 
  37 /**
  38  * Windows implementation of AsynchronousFileChannel using overlapped I/O.
  39  */
  40 
  41 public class WindowsAsynchronousFileChannelImpl
  42     extends AsynchronousFileChannelImpl
  43     implements Iocp.OverlappedChannel, Groupable
  44 {
  45     private static final JavaIOFileDescriptorAccess fdAccess =
  46         SharedSecrets.getJavaIOFileDescriptorAccess();
  47 
  48     // error when EOF is detected asynchronously.
  49     private static final int ERROR_HANDLE_EOF = 38;
  50 
  51     // Lazy initialization of default I/O completion port
  52     private static class DefaultIocpHolder {
  53         static final Iocp defaultIocp = defaultIocp();
  54         private static Iocp defaultIocp() {
  55             try {
  56                 return new Iocp(null, ThreadPool.createDefault()).start();
  57             } catch (IOException ioe) {
  58                 InternalError e = new InternalError();
  59                 e.initCause(ioe);
  60                 throw e;
  61             }
  62         }
  63     }
  64 
  65     // Used for force/truncate/size methods
  66     private static final FileDispatcher nd = new FileDispatcherImpl();
  67 
  68     // The handle is extracted for use in native methods invoked from this class.
  69     private final long handle;
  70 
  71     // The key that identifies the channel's association with the I/O port
  72     private final int completionKey;
  73 
  74     // I/O completion port (group)
  75     private final Iocp iocp;
  76 
  77     private final boolean isDefaultIocp;
  78 
  79     // Caches OVERLAPPED structure for each outstanding I/O operation
  80     private final PendingIoCache ioCache;
  81 
  82 
  83     private WindowsAsynchronousFileChannelImpl(FileDescriptor fdObj,
  84                                                boolean reading,
  85                                                boolean writing,
  86                                                Iocp iocp,
  87                                                boolean isDefaultIocp)
  88         throws IOException
  89     {
  90         super(fdObj, reading, writing, iocp.executor());
  91         this.handle = fdAccess.getHandle(fdObj);
  92         this.iocp = iocp;
  93         this.isDefaultIocp = isDefaultIocp;
  94         this.ioCache = new PendingIoCache();
  95         this.completionKey = iocp.associate(this, handle);
  96     }
  97 
  98     public static AsynchronousFileChannel open(FileDescriptor fdo,
  99                                                boolean reading,
 100                                                boolean writing,
 101                                                ThreadPool pool)
 102         throws IOException
 103     {
 104         Iocp iocp;
 105         boolean isDefaultIocp;
 106         if (pool == null) {
 107             iocp = DefaultIocpHolder.defaultIocp;
 108             isDefaultIocp = true;
 109         } else {
 110             iocp = new Iocp(null, pool).start();
 111             isDefaultIocp = false;
 112         }
 113         try {
 114             return new
 115                 WindowsAsynchronousFileChannelImpl(fdo, reading, writing, iocp, isDefaultIocp);
 116         } catch (IOException x) {
 117             // error binding to port so need to close it (if created for this channel)
 118             if (!isDefaultIocp)
 119                 iocp.implClose();
 120             throw x;
 121         }
 122     }
 123 
 124     @Override
 125     public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) {
 126         return ioCache.remove(overlapped);
 127     }
 128 
 129     @Override
 130     public void close() throws IOException {
 131         closeLock.writeLock().lock();
 132         try {
 133             if (closed)
 134                 return;     // already closed
 135             closed = true;
 136         } finally {
 137             closeLock.writeLock().unlock();
 138         }
 139 
 140         // invalidate all locks held for this channel
 141         invalidateAllLocks();
 142 
 143         // close the file
 144         close0(handle);
 145 
 146         // waits until all I/O operations have completed
 147         ioCache.close();
 148 
 149         // disassociate from port
 150         iocp.disassociate(completionKey);
 151 
 152         // for the non-default group close the port
 153         if (!isDefaultIocp)
 154             iocp.detachFromThreadPool();
 155     }
 156 
 157     @Override
 158     public AsynchronousChannelGroupImpl group() {
 159         return iocp;
 160     }
 161 
 162     /**
 163      * Translates Throwable to IOException
 164      */
 165     private static IOException toIOException(Throwable x) {
 166         if (x instanceof IOException) {
 167             if (x instanceof ClosedChannelException)
 168                 x = new AsynchronousCloseException();
 169             return (IOException)x;
 170         }
 171         return new IOException(x);
 172     }
 173 
 174     @Override
 175     public long size() throws IOException {
 176         try {
 177             begin();
 178             return nd.size(fdObj);
 179         } finally {
 180             end();
 181         }
 182     }
 183 
 184     @Override
 185     public AsynchronousFileChannel truncate(long size) throws IOException {
 186         if (size < 0)
 187             throw new IllegalArgumentException("Negative size");
 188         if (!writing)
 189             throw new NonWritableChannelException();
 190         try {
 191             begin();
 192             if (size > nd.size(fdObj))
 193                 return this;
 194             nd.truncate(fdObj, size);
 195         } finally {
 196             end();
 197         }
 198         return this;
 199     }
 200 
 201     @Override
 202     public void force(boolean metaData) throws IOException {
 203         try {
 204             begin();
 205             nd.force(fdObj, metaData);
 206         } finally {
 207             end();
 208         }
 209     }
 210 
 211     // -- file locking --
 212 
 213     /**
 214      * Task that initiates locking operation and handles completion result.
 215      */
 216     private class LockTask<A> implements Runnable, Iocp.ResultHandler {
 217         private final long position;
 218         private final FileLockImpl fli;
 219         private final PendingFuture<FileLock,A> result;
 220 
 221         LockTask(long position,
 222                  FileLockImpl fli,
 223                  PendingFuture<FileLock,A> result)
 224         {
 225             this.position = position;
 226             this.fli = fli;
 227             this.result = result;
 228         }
 229 
 230         @Override
 231         public void run() {
 232             long overlapped = 0L;
 233             try {
 234                 begin();
 235 
 236                 // allocate OVERLAPPED structure
 237                 overlapped = ioCache.add(result);
 238 
 239                 // synchronize on result to avoid race with handler thread
 240                 // when lock is acquired immediately.
 241                 synchronized (result) {
 242                     int n = lockFile(handle, position, fli.size(), fli.isShared(),
 243                                      overlapped);
 244                     if (n == IOStatus.UNAVAILABLE) {
 245                         // I/O is pending
 246                         return;
 247                     }
 248                     // acquired lock immediately
 249                     result.setResult(fli);
 250                 }
 251 
 252             } catch (Throwable x) {
 253                 // lock failed or channel closed
 254                 removeFromFileLockTable(fli);
 255                 if (overlapped != 0L)
 256                     ioCache.remove(overlapped);
 257                 result.setFailure(toIOException(x));
 258             } finally {
 259                 end();
 260             }
 261 
 262             // invoke completion handler
 263             Invoker.invoke(result);
 264         }
 265 
 266         @Override
 267         public void completed(int bytesTransferred, boolean canInvokeDirect) {
 268             // release waiters and invoke completion handler
 269             result.setResult(fli);
 270             if (canInvokeDirect) {
 271                 Invoker.invokeUnchecked(result);
 272             } else {
 273                 Invoker.invoke(result);
 274             }
 275         }
 276 
 277         @Override
 278         public void failed(int error, IOException x) {
 279             // lock not acquired so remove from lock table
 280             removeFromFileLockTable(fli);
 281 
 282             // release waiters
 283             if (isOpen()) {
 284                 result.setFailure(x);
 285             } else {
 286                 result.setFailure(new AsynchronousCloseException());
 287             }
 288             Invoker.invoke(result);
 289         }
 290     }
 291 
 292     @Override
 293     <A> Future<FileLock> implLock(final long position,
 294                                   final long size,
 295                                   final boolean shared,
 296                                   A attachment,
 297                                   final CompletionHandler<FileLock,? super A> handler)
 298     {
 299         if (shared && !reading)
 300             throw new NonReadableChannelException();
 301         if (!shared && !writing)
 302             throw new NonWritableChannelException();
 303 
 304         // add to lock table
 305         FileLockImpl fli = addToFileLockTable(position, size, shared);
 306         if (fli == null) {
 307             Throwable exc = new ClosedChannelException();
 308             if (handler == null)
 309                 return CompletedFuture.withFailure(exc);
 310             Invoker.invoke(this, handler, attachment, null, exc);
 311             return null;
 312         }
 313 
 314         // create Future and task that will be invoked to acquire lock
 315         PendingFuture<FileLock,A> result =
 316             new PendingFuture<FileLock,A>(this, handler, attachment);
 317         LockTask lockTask = new LockTask<A>(position, fli, result);
 318         result.setContext(lockTask);
 319 
 320         // initiate I/O
 321         if (Iocp.supportsThreadAgnosticIo()) {
 322             lockTask.run();
 323         } else {
 324             boolean executed = false;
 325             try {
 326                 Invoker.invokeOnThreadInThreadPool(this, lockTask);
 327                 executed = true;
 328             } finally {
 329                 if (!executed) {
 330                     // rollback
 331                     removeFromFileLockTable(fli);
 332                 }
 333             }
 334         }
 335         return result;
 336     }
 337 
 338     static final int NO_LOCK = -1;       // Failed to lock
 339     static final int LOCKED = 0;         // Obtained requested lock
 340 
 341     @Override
 342     public FileLock tryLock(long position, long size, boolean shared)
 343         throws IOException
 344     {
 345         if (shared && !reading)
 346             throw new NonReadableChannelException();
 347         if (!shared && !writing)
 348             throw new NonWritableChannelException();
 349 
 350         // add to lock table
 351         final FileLockImpl fli = addToFileLockTable(position, size, shared);
 352         if (fli == null)
 353             throw new ClosedChannelException();
 354 
 355         boolean gotLock = false;
 356         try {
 357             begin();
 358             // try to acquire the lock
 359             int res = nd.lock(fdObj, false, position, size, shared);
 360             if (res == NO_LOCK)
 361                 return null;
 362             gotLock = true;
 363             return fli;
 364         } finally {
 365             if (!gotLock)
 366                 removeFromFileLockTable(fli);
 367             end();
 368         }
 369     }
 370 
 371     @Override
 372     protected void implRelease(FileLockImpl fli) throws IOException {
 373         nd.release(fdObj, fli.position(), fli.size());
 374     }
 375 
 376     /**
 377      * Task that initiates read operation and handles completion result.
 378      */
 379     private class ReadTask<A> implements Runnable, Iocp.ResultHandler {
 380         private final ByteBuffer dst;
 381         private final int pos, rem;     // buffer position/remaining
 382         private final long position;    // file position
 383         private final PendingFuture<Integer,A> result;
 384 
 385         // set to dst if direct; otherwise set to substituted direct buffer
 386         private volatile ByteBuffer buf;
 387 
 388         ReadTask(ByteBuffer dst,
 389                  int pos,
 390                  int rem,
 391                  long position,
 392                  PendingFuture<Integer,A> result)
 393         {
 394             this.dst = dst;
 395             this.pos = pos;
 396             this.rem = rem;
 397             this.position = position;
 398             this.result = result;
 399         }
 400 
 401         void releaseBufferIfSubstituted() {
 402             if (buf != dst)
 403                 Util.releaseTemporaryDirectBuffer(buf);
 404         }
 405 
 406         void updatePosition(int bytesTransferred) {
 407             // if the I/O succeeded then adjust buffer position
 408             if (bytesTransferred > 0) {
 409                 if (buf == dst) {
 410                     try {
 411                         dst.position(pos + bytesTransferred);
 412                     } catch (IllegalArgumentException x) {
 413                         // someone has changed the position; ignore
 414                     }
 415                 } else {
 416                     // had to substitute direct buffer
 417                     buf.position(bytesTransferred).flip();
 418                     try {
 419                         dst.put(buf);
 420                     } catch (BufferOverflowException x) {
 421                         // someone has changed the position; ignore
 422                     }
 423                 }
 424             }
 425         }
 426 
 427         @Override
 428         public void run() {
 429             int n = -1;
 430             long overlapped = 0L;
 431             long address;
 432 
 433             // Substitute a native buffer if not direct
 434             if (dst instanceof DirectBuffer) {
 435                 buf = dst;
 436                 address = ((DirectBuffer)dst).address() + pos;
 437             } else {
 438                 buf = Util.getTemporaryDirectBuffer(rem);
 439                 address = ((DirectBuffer)buf).address();
 440             }
 441 
 442             try {
 443                 begin();
 444 
 445                 // allocate OVERLAPPED
 446                 overlapped = ioCache.add(result);
 447 
 448                 // initiate read
 449                 n = readFile(handle, address, rem, position, overlapped);
 450                 if (n == IOStatus.UNAVAILABLE) {
 451                     // I/O is pending
 452                     return;
 453                 } else if (n == IOStatus.EOF) {
 454                     result.setResult(n);
 455                 } else {
 456                     throw new InternalError("Unexpected result: " + n);
 457                 }
 458 
 459             } catch (Throwable x) {
 460                 // failed to initiate read
 461                 result.setFailure(toIOException(x));
 462             } finally {
 463                 end();
 464             }
 465 
 466             // release resources
 467             if (overlapped != 0L)
 468                 ioCache.remove(overlapped);
 469             releaseBufferIfSubstituted();
 470 
 471             // invoke completion handler
 472             Invoker.invoke(result);
 473         }
 474 
 475         /**
 476          * Executed when the I/O has completed
 477          */
 478         @Override
 479         public void completed(int bytesTransferred, boolean canInvokeDirect) {
 480             updatePosition(bytesTransferred);
 481 
 482             // return direct buffer to cache if substituted
 483             releaseBufferIfSubstituted();
 484 
 485             // release waiters and invoke completion handler
 486             result.setResult(bytesTransferred);
 487             if (canInvokeDirect) {
 488                 Invoker.invokeUnchecked(result);
 489             } else {
 490                 Invoker.invoke(result);
 491             }
 492         }
 493 
 494         @Override
 495         public void failed(int error, IOException x) {
 496             // if EOF detected asynchronously then it is reported as error
 497             if (error == ERROR_HANDLE_EOF) {
 498                 completed(-1, false);
 499             } else {
 500                 // return direct buffer to cache if substituted
 501                 releaseBufferIfSubstituted();
 502 
 503                 // release waiters
 504                 if (isOpen()) {
 505                     result.setFailure(x);
 506                 } else {
 507                     result.setFailure(new AsynchronousCloseException());
 508                 }
 509                 Invoker.invoke(result);
 510             }
 511         }
 512     }
 513 
 514     @Override
 515     <A> Future<Integer> implRead(ByteBuffer dst,
 516                                  long position,
 517                                  A attachment,
 518                                  CompletionHandler<Integer,? super A> handler)
 519     {
 520         if (!reading)
 521             throw new NonReadableChannelException();
 522         if (position < 0)
 523             throw new IllegalArgumentException("Negative position");
 524         if (dst.isReadOnly())
 525             throw new IllegalArgumentException("Read-only buffer");
 526 
 527         // check if channel is closed
 528         if (!isOpen()) {
 529             Throwable exc = new ClosedChannelException();
 530             if (handler == null)
 531                 return CompletedFuture.withFailure(exc);
 532             Invoker.invoke(this, handler, attachment, null, exc);
 533             return null;
 534         }
 535 
 536         int pos = dst.position();
 537         int lim = dst.limit();
 538         assert (pos <= lim);
 539         int rem = (pos <= lim ? lim - pos : 0);
 540 
 541         // no space remaining
 542         if (rem == 0) {
 543             if (handler == null)
 544                 return CompletedFuture.withResult(0);
 545             Invoker.invoke(this, handler, attachment, 0, null);
 546             return null;
 547         }
 548 
 549         // create Future and task that initiates read
 550         PendingFuture<Integer,A> result =
 551             new PendingFuture<Integer,A>(this, handler, attachment);
 552         ReadTask readTask = new ReadTask<A>(dst, pos, rem, position, result);
 553         result.setContext(readTask);
 554 
 555         // initiate I/O
 556         if (Iocp.supportsThreadAgnosticIo()) {
 557             readTask.run();
 558         } else {
 559             Invoker.invokeOnThreadInThreadPool(this, readTask);
 560         }
 561         return result;
 562     }
 563 
 564     /**
 565      * Task that initiates write operation and handles completion result.
 566      */
 567     private class WriteTask<A> implements Runnable, Iocp.ResultHandler {
 568         private final ByteBuffer src;
 569         private final int pos, rem;     // buffer position/remaining
 570         private final long position;    // file position
 571         private final PendingFuture<Integer,A> result;
 572 
 573         // set to src if direct; otherwise set to substituted direct buffer
 574         private volatile ByteBuffer buf;
 575 
 576         WriteTask(ByteBuffer src,
 577                   int pos,
 578                   int rem,
 579                   long position,
 580                   PendingFuture<Integer,A> result)
 581         {
 582             this.src = src;
 583             this.pos = pos;
 584             this.rem = rem;
 585             this.position = position;
 586             this.result = result;
 587         }
 588 
 589         void releaseBufferIfSubstituted() {
 590             if (buf != src)
 591                 Util.releaseTemporaryDirectBuffer(buf);
 592         }
 593 
 594         void updatePosition(int bytesTransferred) {
 595             // if the I/O succeeded then adjust buffer position
 596             if (bytesTransferred > 0) {
 597                 try {
 598                     src.position(pos + bytesTransferred);
 599                 } catch (IllegalArgumentException x) {
 600                     // someone has changed the position
 601                 }
 602             }
 603         }
 604 
 605         @Override
 606         public void run() {
 607             int n = -1;
 608             long overlapped = 0L;
 609             long address;
 610 
 611             // Substitute a native buffer if not direct
 612             if (src instanceof DirectBuffer) {
 613                 buf = src;
 614                 address = ((DirectBuffer)src).address() + pos;
 615             } else {
 616                 buf = Util.getTemporaryDirectBuffer(rem);
 617                 buf.put(src);
 618                 buf.flip();
 619                 // temporarily restore position as we don't know how many bytes
 620                 // will be written
 621                 src.position(pos);
 622                 address = ((DirectBuffer)buf).address();
 623             }
 624 
 625             try {
 626                 begin();
 627 
 628                 // allocate an OVERLAPPED structure
 629                 overlapped = ioCache.add(result);
 630 
 631                 // initiate the write
 632                 n = writeFile(handle, address, rem, position, overlapped);
 633                 if (n == IOStatus.UNAVAILABLE) {
 634                     // I/O is pending
 635                     return;
 636                 } else {
 637                     throw new InternalError("Unexpected result: " + n);
 638                 }
 639 
 640             } catch (Throwable x) {
 641                 // failed to initiate read:
 642                 result.setFailure(toIOException(x));
 643 
 644                 // release resources
 645                 if (overlapped != 0L)
 646                     ioCache.remove(overlapped);
 647                 releaseBufferIfSubstituted();
 648 
 649             } finally {
 650                 end();
 651             }
 652 
 653             // invoke completion handler
 654             Invoker.invoke(result);
 655         }
 656 
 657         /**
 658          * Executed when the I/O has completed
 659          */
 660         @Override
 661         public void completed(int bytesTransferred, boolean canInvokeDirect) {
 662             updatePosition(bytesTransferred);
 663 
 664             // return direct buffer to cache if substituted
 665             releaseBufferIfSubstituted();
 666 
 667             // release waiters and invoke completion handler
 668             result.setResult(bytesTransferred);
 669             if (canInvokeDirect) {
 670                 Invoker.invokeUnchecked(result);
 671             } else {
 672                 Invoker.invoke(result);
 673             }
 674         }
 675 
 676         @Override
 677         public void failed(int error, IOException x) {
 678             // return direct buffer to cache if substituted
 679             releaseBufferIfSubstituted();
 680 
 681             // release waiters and invoker completion handler
 682             if (isOpen()) {
 683                 result.setFailure(x);
 684             } else {
 685                 result.setFailure(new AsynchronousCloseException());
 686             }
 687             Invoker.invoke(result);
 688         }
 689     }
 690 
 691     <A> Future<Integer> implWrite(ByteBuffer src,
 692                                   long position,
 693                                   A attachment,
 694                                   CompletionHandler<Integer,? super A> handler)
 695     {
 696         if (!writing)
 697             throw new NonWritableChannelException();
 698         if (position < 0)
 699             throw new IllegalArgumentException("Negative position");
 700 
 701         // check if channel is closed
 702         if (!isOpen()) {
 703            Throwable exc = new ClosedChannelException();
 704             if (handler == null)
 705                 return CompletedFuture.withFailure(exc);
 706             Invoker.invoke(this, handler, attachment, null, exc);
 707             return null;
 708         }
 709 
 710         int pos = src.position();
 711         int lim = src.limit();
 712         assert (pos <= lim);
 713         int rem = (pos <= lim ? lim - pos : 0);
 714 
 715         // nothing to write
 716         if (rem == 0) {
 717             if (handler == null)
 718                 return CompletedFuture.withResult(0);
 719             Invoker.invoke(this, handler, attachment, 0, null);
 720             return null;
 721         }
 722 
 723         // create Future and task to initiate write
 724         PendingFuture<Integer,A> result =
 725             new PendingFuture<Integer,A>(this, handler, attachment);
 726         WriteTask writeTask = new WriteTask<A>(src, pos, rem, position, result);
 727         result.setContext(writeTask);
 728 
 729         // initiate I/O
 730         if (Iocp.supportsThreadAgnosticIo()) {
 731             writeTask.run();
 732         } else {
 733             Invoker.invokeOnThreadInThreadPool(this, writeTask);
 734         }
 735         return result;
 736     }
 737 
 738     // -- Native methods --
 739 
 740     private static native int readFile(long handle, long address, int len,
 741         long offset, long overlapped) throws IOException;
 742 
 743     private static native int writeFile(long handle, long address, int len,
 744         long offset, long overlapped) throws IOException;
 745 
 746     private static native int lockFile(long handle, long position, long size,
 747         boolean shared, long overlapped) throws IOException;
 748 
 749     private static native void close0(long handle);
 750 
 751     static {
 752         Util.load();
 753     }
 754 }