1 /*
   2  * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package sun.nio.ch;
  27 
  28 import java.nio.channels.*;
  29 import java.util.concurrent.*;
  30 import java.nio.ByteBuffer;
  31 import java.nio.BufferOverflowException;
  32 import java.io.IOException;
  33 import java.io.FileDescriptor;
  34 import jdk.internal.misc.SharedSecrets;
  35 import jdk.internal.misc.JavaIOFileDescriptorAccess;
  36 
  37 /**
  38  * Windows implementation of AsynchronousFileChannel using overlapped I/O.
  39  */
  40 
  41 public class WindowsAsynchronousFileChannelImpl
  42     extends AsynchronousFileChannelImpl
  43     implements Iocp.OverlappedChannel, Groupable
  44 {
    // Accessor used by the constructor to extract the native handle from the
    // java.io.FileDescriptor that the channel is created with.
    private static final JavaIOFileDescriptorAccess fdAccess =
        SharedSecrets.getJavaIOFileDescriptorAccess();

    // Error code reported when EOF is detected asynchronously. ReadTask.failed
    // compares against it and turns the failure into a completed read of -1.
    private static final int ERROR_HANDLE_EOF = 38;
  50 
  51     // Lazy initialization of default I/O completion port
  52     private static class DefaultIocpHolder {
  53         static final Iocp defaultIocp = defaultIocp();
  54         private static Iocp defaultIocp() {
  55             try {
  56                 return new Iocp(null, ThreadPool.createDefault()).start();
  57             } catch (IOException ioe) {
  58                 throw new InternalError(ioe);
  59             }
  60         }
  61     }
  62 
    // Dispatcher used for the force/truncate/size methods, and also by
    // tryLock and implRelease to acquire and release file locks
    private static final FileDispatcher nd = new FileDispatcherImpl();

    // The handle is extracted for use in native methods invoked from this class.
    private final long handle;

    // The key that identifies the channel's association with the I/O port.
    // It is returned by iocp.associate in the constructor and handed back to
    // iocp.disassociate when the channel is closed.
    private final int completionKey;

    // I/O completion port (group)
    private final Iocp iocp;

    // true when the channel uses the shared default port (open was called
    // without a thread pool); false when the port was created for this channel,
    // in which case close() also detaches the port from its thread pool
    private final boolean isDefaultIocp;

    // Caches OVERLAPPED structure for each outstanding I/O operation
    private final PendingIoCache ioCache;
  79 
  80 
  81     private WindowsAsynchronousFileChannelImpl(FileDescriptor fdObj,
  82                                                boolean reading,
  83                                                boolean writing,
  84                                                Iocp iocp,
  85                                                boolean isDefaultIocp)
  86         throws IOException
  87     {
  88         super(fdObj, reading, writing, iocp.executor());
  89         this.handle = fdAccess.getHandle(fdObj);
  90         this.iocp = iocp;
  91         this.isDefaultIocp = isDefaultIocp;
  92         this.ioCache = new PendingIoCache();
  93         this.completionKey = iocp.associate(this, handle);
  94     }
  95 
  96     public static AsynchronousFileChannel open(FileDescriptor fdo,
  97                                                boolean reading,
  98                                                boolean writing,
  99                                                ThreadPool pool)
 100         throws IOException
 101     {
 102         Iocp iocp;
 103         boolean isDefaultIocp;
 104         if (pool == null) {
 105             iocp = DefaultIocpHolder.defaultIocp;
 106             isDefaultIocp = true;
 107         } else {
 108             iocp = new Iocp(null, pool).start();
 109             isDefaultIocp = false;
 110         }
 111         try {
 112             return new
 113                 WindowsAsynchronousFileChannelImpl(fdo, reading, writing, iocp, isDefaultIocp);
 114         } catch (IOException x) {
 115             // error binding to port so need to close it (if created for this channel)
 116             if (!isDefaultIocp)
 117                 iocp.implClose();
 118             throw x;
 119         }
 120     }
 121 
 122     @Override
 123     public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) {
 124         return ioCache.remove(overlapped);
 125     }
 126 
 127     @Override
 128     public void close() throws IOException {
 129         closeLock.writeLock().lock();
 130         try {
 131             if (closed)
 132                 return;     // already closed
 133             closed = true;
 134         } finally {
 135             closeLock.writeLock().unlock();
 136         }
 137 
 138         // invalidate all locks held for this channel
 139         invalidateAllLocks();
 140 
 141         // close the file
 142         close0(handle);
 143 
 144         // waits until all I/O operations have completed
 145         ioCache.close();
 146 
 147         // disassociate from port
 148         iocp.disassociate(completionKey);
 149 
 150         // for the non-default group close the port
 151         if (!isDefaultIocp)
 152             iocp.detachFromThreadPool();
 153     }
 154 
 155     @Override
 156     public AsynchronousChannelGroupImpl group() {
 157         return iocp;
 158     }
 159 
 160     /**
 161      * Translates Throwable to IOException
 162      */
 163     private static IOException toIOException(Throwable x) {
 164         if (x instanceof IOException) {
 165             if (x instanceof ClosedChannelException)
 166                 x = new AsynchronousCloseException();
 167             return (IOException)x;
 168         }
 169         return new IOException(x);
 170     }
 171 
 172     @Override
 173     public long size() throws IOException {
 174         try {
 175             begin();
 176             return nd.size(fdObj);
 177         } finally {
 178             end();
 179         }
 180     }
 181 
 182     @Override
 183     public AsynchronousFileChannel truncate(long size) throws IOException {
 184         if (size < 0)
 185             throw new IllegalArgumentException("Negative size");
 186         if (!writing)
 187             throw new NonWritableChannelException();
 188         try {
 189             begin();
 190             if (size > nd.size(fdObj))
 191                 return this;
 192             nd.truncate(fdObj, size);
 193         } finally {
 194             end();
 195         }
 196         return this;
 197     }
 198 
 199     @Override
 200     public void force(boolean metaData) throws IOException {
 201         try {
 202             begin();
 203             nd.force(fdObj, metaData);
 204         } finally {
 205             end();
 206         }
 207     }
 208 
 209     // -- file locking --
 210 
    /**
     * Task that initiates locking operation and handles completion result.
     *
     * <p> {@link #run} is invoked directly by implLock on the calling thread
     * to start the operation. When the lock cannot be acquired immediately the
     * outcome is delivered later through {@link #completed} or {@link #failed}.
     */
    private class LockTask<A> implements Runnable, Iocp.ResultHandler {
        private final long position;                    // start of the locked region
        private final FileLockImpl fli;                 // lock already entered in the lock table
        private final PendingFuture<FileLock,A> result; // future/handler to complete

        LockTask(long position,
                 FileLockImpl fli,
                 PendingFuture<FileLock,A> result)
        {
            this.position = position;
            this.fli = fli;
            this.result = result;
        }

        /**
         * Initiates the lock operation. Returns without invoking the handler
         * when the operation is pending; otherwise sets the result or failure
         * and invokes the completion handler.
         */
        @Override
        public void run() {
            long overlapped = 0L;
            boolean pending = false;
            try {
                begin();

                // allocate OVERLAPPED structure
                overlapped = ioCache.add(result);

                // synchronize on result to avoid race with handler thread
                // when lock is acquired immediately.
                synchronized (result) {
                    int n = lockFile(handle, position, fli.size(), fli.isShared(),
                                     overlapped);
                    if (n == IOStatus.UNAVAILABLE) {
                        // I/O is pending
                        // (the finally block below still runs end(), but keeps
                        // the OVERLAPPED entry in the cache)
                        pending = true;
                        return;
                    }
                    // acquired lock immediately
                    result.setResult(fli);
                }

            } catch (Throwable x) {
                // lock failed or channel closed
                // the lock was never obtained, so drop it from the lock table
                removeFromFileLockTable(fli);
                result.setFailure(toIOException(x));
            } finally {
                // the OVERLAPPED entry is only needed while the I/O is pending
                if (!pending && overlapped != 0L)
                    ioCache.remove(overlapped);
                end();
            }

            // invoke completion handler
            // (reached only when the operation finished or failed synchronously)
            Invoker.invoke(result);
        }

        /**
         * Handles successful completion of a pending lock operation.
         * bytesTransferred is not used for lock operations.
         */
        @Override
        public void completed(int bytesTransferred, boolean canInvokeDirect) {
            // release waiters and invoke completion handler
            result.setResult(fli);
            if (canInvokeDirect) {
                Invoker.invokeUnchecked(result);
            } else {
                Invoker.invoke(result);
            }
        }

        /**
         * Handles failure of a pending lock operation. The error code is not
         * inspected; the failure is reported as AsynchronousCloseException
         * when the channel is no longer open.
         */
        @Override
        public void failed(int error, IOException x) {
            // lock not acquired so remove from lock table
            removeFromFileLockTable(fli);

            // release waiters
            if (isOpen()) {
                result.setFailure(x);
            } else {
                result.setFailure(new AsynchronousCloseException());
            }
            Invoker.invoke(result);
        }
    }
 291 
 292     @Override
 293     <A> Future<FileLock> implLock(final long position,
 294                                   final long size,
 295                                   final boolean shared,
 296                                   A attachment,
 297                                   final CompletionHandler<FileLock,? super A> handler)
 298     {
 299         if (shared && !reading)
 300             throw new NonReadableChannelException();
 301         if (!shared && !writing)
 302             throw new NonWritableChannelException();
 303 
 304         // add to lock table
 305         FileLockImpl fli = addToFileLockTable(position, size, shared);
 306         if (fli == null) {
 307             Throwable exc = new ClosedChannelException();
 308             if (handler == null)
 309                 return CompletedFuture.withFailure(exc);
 310             Invoker.invoke(this, handler, attachment, null, exc);
 311             return null;
 312         }
 313 
 314         // create Future and task that will be invoked to acquire lock
 315         PendingFuture<FileLock,A> result =
 316             new PendingFuture<FileLock,A>(this, handler, attachment);
 317         LockTask<A> lockTask = new LockTask<A>(position, fli, result);
 318         result.setContext(lockTask);
 319 
 320         // initiate I/O
 321         lockTask.run();
 322         return result;
 323     }
 324 
    // Status values for lock attempts. tryLock compares the value returned by
    // the dispatcher's lock method against NO_LOCK; LOCKED is not referenced
    // elsewhere in this class.
    static final int NO_LOCK = -1;       // Failed to lock
    static final int LOCKED = 0;         // Obtained requested lock
 327 
 328     @Override
 329     public FileLock tryLock(long position, long size, boolean shared)
 330         throws IOException
 331     {
 332         if (shared && !reading)
 333             throw new NonReadableChannelException();
 334         if (!shared && !writing)
 335             throw new NonWritableChannelException();
 336 
 337         // add to lock table
 338         final FileLockImpl fli = addToFileLockTable(position, size, shared);
 339         if (fli == null)
 340             throw new ClosedChannelException();
 341 
 342         boolean gotLock = false;
 343         try {
 344             begin();
 345             // try to acquire the lock
 346             int res = nd.lock(fdObj, false, position, size, shared);
 347             if (res == NO_LOCK)
 348                 return null;
 349             gotLock = true;
 350             return fli;
 351         } finally {
 352             if (!gotLock)
 353                 removeFromFileLockTable(fli);
 354             end();
 355         }
 356     }
 357 
 358     @Override
 359     protected void implRelease(FileLockImpl fli) throws IOException {
 360         nd.release(fdObj, fli.position(), fli.size());
 361     }
 362 
 363     /**
 364      * Task that initiates read operation and handles completion result.
 365      */
 366     private class ReadTask<A> implements Runnable, Iocp.ResultHandler {
 367         private final ByteBuffer dst;
 368         private final int pos, rem;     // buffer position/remaining
 369         private final long position;    // file position
 370         private final PendingFuture<Integer,A> result;
 371 
 372         // set to dst if direct; otherwise set to substituted direct buffer
 373         private volatile ByteBuffer buf;
 374 
 375         ReadTask(ByteBuffer dst,
 376                  int pos,
 377                  int rem,
 378                  long position,
 379                  PendingFuture<Integer,A> result)
 380         {
 381             this.dst = dst;
 382             this.pos = pos;
 383             this.rem = rem;
 384             this.position = position;
 385             this.result = result;
 386         }
 387 
 388         void releaseBufferIfSubstituted() {
 389             if (buf != dst)
 390                 Util.releaseTemporaryDirectBuffer(buf);
 391         }
 392 
 393         void updatePosition(int bytesTransferred) {
 394             // if the I/O succeeded then adjust buffer position
 395             if (bytesTransferred > 0) {
 396                 if (buf == dst) {
 397                     try {
 398                         dst.position(pos + bytesTransferred);
 399                     } catch (IllegalArgumentException x) {
 400                         // someone has changed the position; ignore
 401                     }
 402                 } else {
 403                     // had to substitute direct buffer
 404                     buf.position(bytesTransferred).flip();
 405                     try {
 406                         dst.put(buf);
 407                     } catch (BufferOverflowException x) {
 408                         // someone has changed the position; ignore
 409                     }
 410                 }
 411             }
 412         }
 413 
 414         @Override
 415         public void run() {
 416             int n = -1;
 417             long overlapped = 0L;
 418             long address;
 419 
 420             // Substitute a native buffer if not direct
 421             if (dst instanceof DirectBuffer) {
 422                 buf = dst;
 423                 address = ((DirectBuffer)dst).address() + pos;
 424             } else {
 425                 buf = Util.getTemporaryDirectBuffer(rem);
 426                 address = ((DirectBuffer)buf).address();
 427             }
 428 
 429             boolean pending = false;
 430             try {
 431                 begin();
 432 
 433                 // allocate OVERLAPPED
 434                 overlapped = ioCache.add(result);
 435 
 436                 // initiate read
 437                 n = readFile(handle, address, rem, position, overlapped);
 438                 if (n == IOStatus.UNAVAILABLE) {
 439                     // I/O is pending
 440                     pending = true;
 441                     return;
 442                 } else if (n == IOStatus.EOF) {
 443                     result.setResult(n);
 444                 } else {
 445                     throw new InternalError("Unexpected result: " + n);
 446                 }
 447 
 448             } catch (Throwable x) {
 449                 // failed to initiate read
 450                 result.setFailure(toIOException(x));
 451             } finally {
 452                 if (!pending) {
 453                     // release resources
 454                     if (overlapped != 0L)
 455                         ioCache.remove(overlapped);
 456                     releaseBufferIfSubstituted();
 457                 }
 458                 end();
 459             }
 460 
 461             // invoke completion handler
 462             Invoker.invoke(result);
 463         }
 464 
 465         /**
 466          * Executed when the I/O has completed
 467          */
 468         @Override
 469         public void completed(int bytesTransferred, boolean canInvokeDirect) {
 470             updatePosition(bytesTransferred);
 471 
 472             // return direct buffer to cache if substituted
 473             releaseBufferIfSubstituted();
 474 
 475             // release waiters and invoke completion handler
 476             result.setResult(bytesTransferred);
 477             if (canInvokeDirect) {
 478                 Invoker.invokeUnchecked(result);
 479             } else {
 480                 Invoker.invoke(result);
 481             }
 482         }
 483 
 484         @Override
 485         public void failed(int error, IOException x) {
 486             // if EOF detected asynchronously then it is reported as error
 487             if (error == ERROR_HANDLE_EOF) {
 488                 completed(-1, false);
 489             } else {
 490                 // return direct buffer to cache if substituted
 491                 releaseBufferIfSubstituted();
 492 
 493                 // release waiters
 494                 if (isOpen()) {
 495                     result.setFailure(x);
 496                 } else {
 497                     result.setFailure(new AsynchronousCloseException());
 498                 }
 499                 Invoker.invoke(result);
 500             }
 501         }
 502     }
 503 
 504     @Override
 505     <A> Future<Integer> implRead(ByteBuffer dst,
 506                                  long position,
 507                                  A attachment,
 508                                  CompletionHandler<Integer,? super A> handler)
 509     {
 510         if (!reading)
 511             throw new NonReadableChannelException();
 512         if (position < 0)
 513             throw new IllegalArgumentException("Negative position");
 514         if (dst.isReadOnly())
 515             throw new IllegalArgumentException("Read-only buffer");
 516 
 517         // check if channel is closed
 518         if (!isOpen()) {
 519             Throwable exc = new ClosedChannelException();
 520             if (handler == null)
 521                 return CompletedFuture.withFailure(exc);
 522             Invoker.invoke(this, handler, attachment, null, exc);
 523             return null;
 524         }
 525 
 526         int pos = dst.position();
 527         int lim = dst.limit();
 528         assert (pos <= lim);
 529         int rem = (pos <= lim ? lim - pos : 0);
 530 
 531         // no space remaining
 532         if (rem == 0) {
 533             if (handler == null)
 534                 return CompletedFuture.withResult(0);
 535             Invoker.invoke(this, handler, attachment, 0, null);
 536             return null;
 537         }
 538 
 539         // create Future and task that initiates read
 540         PendingFuture<Integer,A> result =
 541             new PendingFuture<Integer,A>(this, handler, attachment);
 542         ReadTask<A> readTask = new ReadTask<A>(dst, pos, rem, position, result);
 543         result.setContext(readTask);
 544 
 545         // initiate I/O
 546         readTask.run();
 547         return result;
 548     }
 549 
 550     /**
 551      * Task that initiates write operation and handles completion result.
 552      */
 553     private class WriteTask<A> implements Runnable, Iocp.ResultHandler {
 554         private final ByteBuffer src;
 555         private final int pos, rem;     // buffer position/remaining
 556         private final long position;    // file position
 557         private final PendingFuture<Integer,A> result;
 558 
 559         // set to src if direct; otherwise set to substituted direct buffer
 560         private volatile ByteBuffer buf;
 561 
 562         WriteTask(ByteBuffer src,
 563                   int pos,
 564                   int rem,
 565                   long position,
 566                   PendingFuture<Integer,A> result)
 567         {
 568             this.src = src;
 569             this.pos = pos;
 570             this.rem = rem;
 571             this.position = position;
 572             this.result = result;
 573         }
 574 
 575         void releaseBufferIfSubstituted() {
 576             if (buf != src)
 577                 Util.releaseTemporaryDirectBuffer(buf);
 578         }
 579 
 580         void updatePosition(int bytesTransferred) {
 581             // if the I/O succeeded then adjust buffer position
 582             if (bytesTransferred > 0) {
 583                 try {
 584                     src.position(pos + bytesTransferred);
 585                 } catch (IllegalArgumentException x) {
 586                     // someone has changed the position
 587                 }
 588             }
 589         }
 590 
 591         @Override
 592         public void run() {
 593             int n = -1;
 594             long overlapped = 0L;
 595             long address;
 596 
 597             // Substitute a native buffer if not direct
 598             if (src instanceof DirectBuffer) {
 599                 buf = src;
 600                 address = ((DirectBuffer)src).address() + pos;
 601             } else {
 602                 buf = Util.getTemporaryDirectBuffer(rem);
 603                 buf.put(src);
 604                 buf.flip();
 605                 // temporarily restore position as we don't know how many bytes
 606                 // will be written
 607                 src.position(pos);
 608                 address = ((DirectBuffer)buf).address();
 609             }
 610 
 611             try {
 612                 begin();
 613 
 614                 // allocate an OVERLAPPED structure
 615                 overlapped = ioCache.add(result);
 616 
 617                 // initiate the write
 618                 n = writeFile(handle, address, rem, position, overlapped);
 619                 if (n == IOStatus.UNAVAILABLE) {
 620                     // I/O is pending
 621                     return;
 622                 } else {
 623                     throw new InternalError("Unexpected result: " + n);
 624                 }
 625 
 626             } catch (Throwable x) {
 627                 // failed to initiate read:
 628                 result.setFailure(toIOException(x));
 629 
 630                 // release resources
 631                 if (overlapped != 0L)
 632                     ioCache.remove(overlapped);
 633                 releaseBufferIfSubstituted();
 634 
 635             } finally {
 636                 end();
 637             }
 638 
 639             // invoke completion handler
 640             Invoker.invoke(result);
 641         }
 642 
 643         /**
 644          * Executed when the I/O has completed
 645          */
 646         @Override
 647         public void completed(int bytesTransferred, boolean canInvokeDirect) {
 648             updatePosition(bytesTransferred);
 649 
 650             // return direct buffer to cache if substituted
 651             releaseBufferIfSubstituted();
 652 
 653             // release waiters and invoke completion handler
 654             result.setResult(bytesTransferred);
 655             if (canInvokeDirect) {
 656                 Invoker.invokeUnchecked(result);
 657             } else {
 658                 Invoker.invoke(result);
 659             }
 660         }
 661 
 662         @Override
 663         public void failed(int error, IOException x) {
 664             // return direct buffer to cache if substituted
 665             releaseBufferIfSubstituted();
 666 
 667             // release waiters and invoker completion handler
 668             if (isOpen()) {
 669                 result.setFailure(x);
 670             } else {
 671                 result.setFailure(new AsynchronousCloseException());
 672             }
 673             Invoker.invoke(result);
 674         }
 675     }
 676 
 677     <A> Future<Integer> implWrite(ByteBuffer src,
 678                                   long position,
 679                                   A attachment,
 680                                   CompletionHandler<Integer,? super A> handler)
 681     {
 682         if (!writing)
 683             throw new NonWritableChannelException();
 684         if (position < 0)
 685             throw new IllegalArgumentException("Negative position");
 686 
 687         // check if channel is closed
 688         if (!isOpen()) {
 689            Throwable exc = new ClosedChannelException();
 690             if (handler == null)
 691                 return CompletedFuture.withFailure(exc);
 692             Invoker.invoke(this, handler, attachment, null, exc);
 693             return null;
 694         }
 695 
 696         int pos = src.position();
 697         int lim = src.limit();
 698         assert (pos <= lim);
 699         int rem = (pos <= lim ? lim - pos : 0);
 700 
 701         // nothing to write
 702         if (rem == 0) {
 703             if (handler == null)
 704                 return CompletedFuture.withResult(0);
 705             Invoker.invoke(this, handler, attachment, 0, null);
 706             return null;
 707         }
 708 
 709         // create Future and task to initiate write
 710         PendingFuture<Integer,A> result =
 711             new PendingFuture<Integer,A>(this, handler, attachment);
 712         WriteTask<A> writeTask = new WriteTask<A>(src, pos, rem, position, result);
 713         result.setContext(writeTask);
 714 
 715         // initiate I/O
 716         writeTask.run();
 717         return result;
 718     }
 719 
    // -- Native methods --
    //
    // In the calls made from this class: handle is the channel's native file
    // handle, overlapped is the value returned by ioCache.add for the
    // operation, address is the native address of the direct buffer to
    // transfer from/to, len is the number of bytes remaining in the buffer and
    // offset is the file position. Callers treat a return value of
    // IOStatus.UNAVAILABLE as "I/O is pending".

    // Initiates a read. Besides IOStatus.UNAVAILABLE, ReadTask.run accepts
    // IOStatus.EOF; it treats any other return value as unexpected.
    private static native int readFile(long handle, long address, int len,
        long offset, long overlapped) throws IOException;

    // Initiates a write. WriteTask.run treats any return value other than
    // IOStatus.UNAVAILABLE as unexpected.
    private static native int writeFile(long handle, long address, int len,
        long offset, long overlapped) throws IOException;

    // Initiates a lock of the given region. LockTask.run treats any return
    // value other than IOStatus.UNAVAILABLE as "lock acquired immediately".
    private static native int lockFile(long handle, long position, long size,
        boolean shared, long overlapped) throws IOException;

    // Closes the file handle; invoked from close().
    private static native void close0(long handle);

    // NOTE(review): presumably loads the native library that backs the native
    // methods above - confirm against IOUtil.
    static {
        IOUtil.load();
    }
 736 }