1 /* 2 * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package sun.nio.ch; 27 28 import java.nio.channels.*; 29 import java.util.concurrent.*; 30 import java.nio.ByteBuffer; 31 import java.nio.BufferOverflowException; 32 import java.io.IOException; 33 import java.io.FileDescriptor; 34 import sun.misc.SharedSecrets; 35 import sun.misc.JavaIOFileDescriptorAccess; 36 37 /** 38 * Windows implementation of AsynchronousFileChannel using overlapped I/O. 39 */ 40 41 public class WindowsAsynchronousFileChannelImpl 42 extends AsynchronousFileChannelImpl 43 implements Iocp.OverlappedChannel, Groupable 44 { 45 private static final JavaIOFileDescriptorAccess fdAccess = 46 SharedSecrets.getJavaIOFileDescriptorAccess(); 47 48 // error when EOF is detected asynchronously. 49 private static final int ERROR_HANDLE_EOF = 38; 50 51 // Lazy initialization of default I/O completion port 52 private static class DefaultIocpHolder { 53 static final Iocp defaultIocp = defaultIocp(); 54 private static Iocp defaultIocp() { 55 try { 56 return new Iocp(null, ThreadPool.createDefault()).start(); 57 } catch (IOException ioe) { 58 throw new InternalError(ioe); 59 } 60 } 61 } 62 63 // Used for force/truncate/size methods 64 private static final FileDispatcher nd = new FileDispatcherImpl(); 65 66 // The handle is extracted for use in native methods invoked from this class. 67 private final long handle; 68 69 // The key that identifies the channel's association with the I/O port 70 private final int completionKey; 71 72 // I/O completion port (group) 73 private final Iocp iocp; 74 75 private final boolean isDefaultIocp; 76 77 // Caches OVERLAPPED structure for each outstanding I/O operation 78 private final PendingIoCache ioCache; 79 80 81 private WindowsAsynchronousFileChannelImpl(FileDescriptor fdObj, 82 boolean reading, 83 boolean writing, 84 Iocp iocp, 85 boolean isDefaultIocp) 86 throws IOException 87 { 88 super(fdObj, reading, writing, iocp.executor()); 89 this.handle = fdAccess.getHandle(fdObj); 90 this.iocp = iocp; 91 this.isDefaultIocp = isDefaultIocp; 92 this.ioCache = new PendingIoCache(); 93 this.completionKey = iocp.associate(this, handle); 94 } 95 96 public static AsynchronousFileChannel open(FileDescriptor fdo, 97 boolean reading, 98 boolean writing, 99 ThreadPool pool) 100 throws IOException 101 { 102 Iocp iocp; 103 boolean isDefaultIocp; 104 if (pool == null) { 105 iocp = DefaultIocpHolder.defaultIocp; 106 isDefaultIocp = true; 107 } else { 108 iocp = new Iocp(null, pool).start(); 109 isDefaultIocp = false; 110 } 111 try { 112 return new 113 WindowsAsynchronousFileChannelImpl(fdo, reading, writing, iocp, isDefaultIocp); 114 } catch (IOException x) { 115 // error binding to port so need to close it (if created for this channel) 116 if (!isDefaultIocp) 117 iocp.implClose(); 118 throw x; 119 } 120 } 121 122 @Override 123 public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) { 124 return ioCache.remove(overlapped); 125 } 126 127 @Override 128 public void close() throws IOException { 129 closeLock.writeLock().lock(); 130 try { 131 if (closed) 132 return; // already closed 133 closed = true; 134 } finally { 135 closeLock.writeLock().unlock(); 136 } 137 138 // invalidate all locks held for this channel 139 invalidateAllLocks(); 140 141 // close the file 142 close0(handle); 143 144 // waits until all I/O operations have completed 145 ioCache.close(); 146 147 // disassociate from port 148 iocp.disassociate(completionKey); 149 150 // for the non-default group close the port 151 if (!isDefaultIocp) 152 iocp.detachFromThreadPool(); 153 } 154 155 @Override 156 public AsynchronousChannelGroupImpl group() { 157 return iocp; 158 } 159 160 /** 161 * Translates Throwable to IOException 162 */ 163 private static IOException toIOException(Throwable x) { 164 if (x instanceof IOException) { 165 if (x instanceof ClosedChannelException) 166 x = new AsynchronousCloseException(); 167 return (IOException)x; 168 } 169 return new IOException(x); 170 } 171 172 @Override 173 public long size() throws IOException { 174 try { 175 begin(); 176 return nd.size(fdObj); 177 } finally { 178 end(); 179 } 180 } 181 182 @Override 183 public AsynchronousFileChannel truncate(long size) throws IOException { 184 if (size < 0) 185 throw new IllegalArgumentException("Negative size"); 186 if (!writing) 187 throw new NonWritableChannelException(); 188 try { 189 begin(); 190 if (size > nd.size(fdObj)) 191 return this; 192 nd.truncate(fdObj, size); 193 } finally { 194 end(); 195 } 196 return this; 197 } 198 199 @Override 200 public void force(boolean metaData) throws IOException { 201 try { 202 begin(); 203 nd.force(fdObj, metaData); 204 } finally { 205 end(); 206 } 207 } 208 209 // -- file locking -- 210 211 /** 212 * Task that initiates locking operation and handles completion result. 213 */ 214 private class LockTask<A> implements Runnable, Iocp.ResultHandler { 215 private final long position; 216 private final FileLockImpl fli; 217 private final PendingFuture<FileLock,A> result; 218 219 LockTask(long position, 220 FileLockImpl fli, 221 PendingFuture<FileLock,A> result) 222 { 223 this.position = position; 224 this.fli = fli; 225 this.result = result; 226 } 227 228 @Override 229 public void run() { 230 long overlapped = 0L; 231 boolean pending = false; 232 try { 233 begin(); 234 235 // allocate OVERLAPPED structure 236 overlapped = ioCache.add(result); 237 238 // synchronize on result to avoid race with handler thread 239 // when lock is acquired immediately. 240 synchronized (result) { 241 int n = lockFile(handle, position, fli.size(), fli.isShared(), 242 overlapped); 243 if (n == IOStatus.UNAVAILABLE) { 244 // I/O is pending 245 pending = true; 246 return; 247 } 248 // acquired lock immediately 249 result.setResult(fli); 250 } 251 252 } catch (Throwable x) { 253 // lock failed or channel closed 254 removeFromFileLockTable(fli); 255 result.setFailure(toIOException(x)); 256 } finally { 257 if (!pending && overlapped != 0L) 258 ioCache.remove(overlapped); 259 end(); 260 } 261 262 // invoke completion handler 263 Invoker.invoke(result); 264 } 265 266 @Override 267 public void completed(int bytesTransferred, boolean canInvokeDirect) { 268 // release waiters and invoke completion handler 269 result.setResult(fli); 270 if (canInvokeDirect) { 271 Invoker.invokeUnchecked(result); 272 } else { 273 Invoker.invoke(result); 274 } 275 } 276 277 @Override 278 public void failed(int error, IOException x) { 279 // lock not acquired so remove from lock table 280 removeFromFileLockTable(fli); 281 282 // release waiters 283 if (isOpen()) { 284 result.setFailure(x); 285 } else { 286 result.setFailure(new AsynchronousCloseException()); 287 } 288 Invoker.invoke(result); 289 } 290 } 291 292 @Override 293 <A> Future<FileLock> implLock(final long position, 294 final long size, 295 final boolean shared, 296 A attachment, 297 final CompletionHandler<FileLock,? super A> handler) 298 { 299 if (shared && !reading) 300 throw new NonReadableChannelException(); 301 if (!shared && !writing) 302 throw new NonWritableChannelException(); 303 304 // add to lock table 305 FileLockImpl fli = addToFileLockTable(position, size, shared); 306 if (fli == null) { 307 Throwable exc = new ClosedChannelException(); 308 if (handler == null) 309 return CompletedFuture.withFailure(exc); 310 Invoker.invoke(this, handler, attachment, null, exc); 311 return null; 312 } 313 314 // create Future and task that will be invoked to acquire lock 315 PendingFuture<FileLock,A> result = 316 new PendingFuture<FileLock,A>(this, handler, attachment); 317 LockTask<A> lockTask = new LockTask<A>(position, fli, result); 318 result.setContext(lockTask); 319 320 // initiate I/O 321 if (Iocp.supportsThreadAgnosticIo()) { 322 lockTask.run(); 323 } else { 324 boolean executed = false; 325 try { 326 Invoker.invokeOnThreadInThreadPool(this, lockTask); 327 executed = true; 328 } finally { 329 if (!executed) { 330 // rollback 331 removeFromFileLockTable(fli); 332 } 333 } 334 } 335 return result; 336 } 337 338 static final int NO_LOCK = -1; // Failed to lock 339 static final int LOCKED = 0; // Obtained requested lock 340 341 @Override 342 public FileLock tryLock(long position, long size, boolean shared) 343 throws IOException 344 { 345 if (shared && !reading) 346 throw new NonReadableChannelException(); 347 if (!shared && !writing) 348 throw new NonWritableChannelException(); 349 350 // add to lock table 351 final FileLockImpl fli = addToFileLockTable(position, size, shared); 352 if (fli == null) 353 throw new ClosedChannelException(); 354 355 boolean gotLock = false; 356 try { 357 begin(); 358 // try to acquire the lock 359 int res = nd.lock(fdObj, false, position, size, shared); 360 if (res == NO_LOCK) 361 return null; 362 gotLock = true; 363 return fli; 364 } finally { 365 if (!gotLock) 366 removeFromFileLockTable(fli); 367 end(); 368 } 369 } 370 371 @Override 372 protected void implRelease(FileLockImpl fli) throws IOException { 373 nd.release(fdObj, fli.position(), fli.size()); 374 } 375 376 /** 377 * Task that initiates read operation and handles completion result. 378 */ 379 private class ReadTask<A> implements Runnable, Iocp.ResultHandler { 380 private final ByteBuffer dst; 381 private final int pos, rem; // buffer position/remaining 382 private final long position; // file position 383 private final PendingFuture<Integer,A> result; 384 385 // set to dst if direct; otherwise set to substituted direct buffer 386 private volatile ByteBuffer buf; 387 388 ReadTask(ByteBuffer dst, 389 int pos, 390 int rem, 391 long position, 392 PendingFuture<Integer,A> result) 393 { 394 this.dst = dst; 395 this.pos = pos; 396 this.rem = rem; 397 this.position = position; 398 this.result = result; 399 } 400 401 void releaseBufferIfSubstituted() { 402 if (buf != dst) 403 Util.releaseTemporaryDirectBuffer(buf); 404 } 405 406 void updatePosition(int bytesTransferred) { 407 // if the I/O succeeded then adjust buffer position 408 if (bytesTransferred > 0) { 409 if (buf == dst) { 410 try { 411 dst.position(pos + bytesTransferred); 412 } catch (IllegalArgumentException x) { 413 // someone has changed the position; ignore 414 } 415 } else { 416 // had to substitute direct buffer 417 buf.position(bytesTransferred).flip(); 418 try { 419 dst.put(buf); 420 } catch (BufferOverflowException x) { 421 // someone has changed the position; ignore 422 } 423 } 424 } 425 } 426 427 @Override 428 public void run() { 429 int n = -1; 430 long overlapped = 0L; 431 long address; 432 433 // Substitute a native buffer if not direct 434 if (dst instanceof DirectBuffer) { 435 buf = dst; 436 address = ((DirectBuffer)dst).address() + pos; 437 } else { 438 buf = Util.getTemporaryDirectBuffer(rem); 439 address = ((DirectBuffer)buf).address(); 440 } 441 442 boolean pending = false; 443 try { 444 begin(); 445 446 // allocate OVERLAPPED 447 overlapped = ioCache.add(result); 448 449 // initiate read 450 n = readFile(handle, address, rem, position, overlapped); 451 if (n == IOStatus.UNAVAILABLE) { 452 // I/O is pending 453 pending = true; 454 return; 455 } else if (n == IOStatus.EOF) { 456 result.setResult(n); 457 } else { 458 throw new InternalError("Unexpected result: " + n); 459 } 460 461 } catch (Throwable x) { 462 // failed to initiate read 463 result.setFailure(toIOException(x)); 464 } finally { 465 if (!pending) { 466 // release resources 467 if (overlapped != 0L) 468 ioCache.remove(overlapped); 469 releaseBufferIfSubstituted(); 470 } 471 end(); 472 } 473 474 // invoke completion handler 475 Invoker.invoke(result); 476 } 477 478 /** 479 * Executed when the I/O has completed 480 */ 481 @Override 482 public void completed(int bytesTransferred, boolean canInvokeDirect) { 483 updatePosition(bytesTransferred); 484 485 // return direct buffer to cache if substituted 486 releaseBufferIfSubstituted(); 487 488 // release waiters and invoke completion handler 489 result.setResult(bytesTransferred); 490 if (canInvokeDirect) { 491 Invoker.invokeUnchecked(result); 492 } else { 493 Invoker.invoke(result); 494 } 495 } 496 497 @Override 498 public void failed(int error, IOException x) { 499 // if EOF detected asynchronously then it is reported as error 500 if (error == ERROR_HANDLE_EOF) { 501 completed(-1, false); 502 } else { 503 // return direct buffer to cache if substituted 504 releaseBufferIfSubstituted(); 505 506 // release waiters 507 if (isOpen()) { 508 result.setFailure(x); 509 } else { 510 result.setFailure(new AsynchronousCloseException()); 511 } 512 Invoker.invoke(result); 513 } 514 } 515 } 516 517 @Override 518 <A> Future<Integer> implRead(ByteBuffer dst, 519 long position, 520 A attachment, 521 CompletionHandler<Integer,? super A> handler) 522 { 523 if (!reading) 524 throw new NonReadableChannelException(); 525 if (position < 0) 526 throw new IllegalArgumentException("Negative position"); 527 if (dst.isReadOnly()) 528 throw new IllegalArgumentException("Read-only buffer"); 529 530 // check if channel is closed 531 if (!isOpen()) { 532 Throwable exc = new ClosedChannelException(); 533 if (handler == null) 534 return CompletedFuture.withFailure(exc); 535 Invoker.invoke(this, handler, attachment, null, exc); 536 return null; 537 } 538 539 int pos = dst.position(); 540 int lim = dst.limit(); 541 assert (pos <= lim); 542 int rem = (pos <= lim ? lim - pos : 0); 543 544 // no space remaining 545 if (rem == 0) { 546 if (handler == null) 547 return CompletedFuture.withResult(0); 548 Invoker.invoke(this, handler, attachment, 0, null); 549 return null; 550 } 551 552 // create Future and task that initiates read 553 PendingFuture<Integer,A> result = 554 new PendingFuture<Integer,A>(this, handler, attachment); 555 ReadTask<A> readTask = new ReadTask<A>(dst, pos, rem, position, result); 556 result.setContext(readTask); 557 558 // initiate I/O 559 if (Iocp.supportsThreadAgnosticIo()) { 560 readTask.run(); 561 } else { 562 Invoker.invokeOnThreadInThreadPool(this, readTask); 563 } 564 return result; 565 } 566 567 /** 568 * Task that initiates write operation and handles completion result. 569 */ 570 private class WriteTask<A> implements Runnable, Iocp.ResultHandler { 571 private final ByteBuffer src; 572 private final int pos, rem; // buffer position/remaining 573 private final long position; // file position 574 private final PendingFuture<Integer,A> result; 575 576 // set to src if direct; otherwise set to substituted direct buffer 577 private volatile ByteBuffer buf; 578 579 WriteTask(ByteBuffer src, 580 int pos, 581 int rem, 582 long position, 583 PendingFuture<Integer,A> result) 584 { 585 this.src = src; 586 this.pos = pos; 587 this.rem = rem; 588 this.position = position; 589 this.result = result; 590 } 591 592 void releaseBufferIfSubstituted() { 593 if (buf != src) 594 Util.releaseTemporaryDirectBuffer(buf); 595 } 596 597 void updatePosition(int bytesTransferred) { 598 // if the I/O succeeded then adjust buffer position 599 if (bytesTransferred > 0) { 600 try { 601 src.position(pos + bytesTransferred); 602 } catch (IllegalArgumentException x) { 603 // someone has changed the position 604 } 605 } 606 } 607 608 @Override 609 public void run() { 610 int n = -1; 611 long overlapped = 0L; 612 long address; 613 614 // Substitute a native buffer if not direct 615 if (src instanceof DirectBuffer) { 616 buf = src; 617 address = ((DirectBuffer)src).address() + pos; 618 } else { 619 buf = Util.getTemporaryDirectBuffer(rem); 620 buf.put(src); 621 buf.flip(); 622 // temporarily restore position as we don't know how many bytes 623 // will be written 624 src.position(pos); 625 address = ((DirectBuffer)buf).address(); 626 } 627 628 try { 629 begin(); 630 631 // allocate an OVERLAPPED structure 632 overlapped = ioCache.add(result); 633 634 // initiate the write 635 n = writeFile(handle, address, rem, position, overlapped); 636 if (n == IOStatus.UNAVAILABLE) { 637 // I/O is pending 638 return; 639 } else { 640 throw new InternalError("Unexpected result: " + n); 641 } 642 643 } catch (Throwable x) { 644 // failed to initiate read: 645 result.setFailure(toIOException(x)); 646 647 // release resources 648 if (overlapped != 0L) 649 ioCache.remove(overlapped); 650 releaseBufferIfSubstituted(); 651 652 } finally { 653 end(); 654 } 655 656 // invoke completion handler 657 Invoker.invoke(result); 658 } 659 660 /** 661 * Executed when the I/O has completed 662 */ 663 @Override 664 public void completed(int bytesTransferred, boolean canInvokeDirect) { 665 updatePosition(bytesTransferred); 666 667 // return direct buffer to cache if substituted 668 releaseBufferIfSubstituted(); 669 670 // release waiters and invoke completion handler 671 result.setResult(bytesTransferred); 672 if (canInvokeDirect) { 673 Invoker.invokeUnchecked(result); 674 } else { 675 Invoker.invoke(result); 676 } 677 } 678 679 @Override 680 public void failed(int error, IOException x) { 681 // return direct buffer to cache if substituted 682 releaseBufferIfSubstituted(); 683 684 // release waiters and invoker completion handler 685 if (isOpen()) { 686 result.setFailure(x); 687 } else { 688 result.setFailure(new AsynchronousCloseException()); 689 } 690 Invoker.invoke(result); 691 } 692 } 693 694 <A> Future<Integer> implWrite(ByteBuffer src, 695 long position, 696 A attachment, 697 CompletionHandler<Integer,? super A> handler) 698 { 699 if (!writing) 700 throw new NonWritableChannelException(); 701 if (position < 0) 702 throw new IllegalArgumentException("Negative position"); 703 704 // check if channel is closed 705 if (!isOpen()) { 706 Throwable exc = new ClosedChannelException(); 707 if (handler == null) 708 return CompletedFuture.withFailure(exc); 709 Invoker.invoke(this, handler, attachment, null, exc); 710 return null; 711 } 712 713 int pos = src.position(); 714 int lim = src.limit(); 715 assert (pos <= lim); 716 int rem = (pos <= lim ? lim - pos : 0); 717 718 // nothing to write 719 if (rem == 0) { 720 if (handler == null) 721 return CompletedFuture.withResult(0); 722 Invoker.invoke(this, handler, attachment, 0, null); 723 return null; 724 } 725 726 // create Future and task to initiate write 727 PendingFuture<Integer,A> result = 728 new PendingFuture<Integer,A>(this, handler, attachment); 729 WriteTask<A> writeTask = new WriteTask<A>(src, pos, rem, position, result); 730 result.setContext(writeTask); 731 732 // initiate I/O 733 if (Iocp.supportsThreadAgnosticIo()) { 734 writeTask.run(); 735 } else { 736 Invoker.invokeOnThreadInThreadPool(this, writeTask); 737 } 738 return result; 739 } 740 741 // -- Native methods -- 742 743 private static native int readFile(long handle, long address, int len, 744 long offset, long overlapped) throws IOException; 745 746 private static native int writeFile(long handle, long address, int len, 747 long offset, long overlapped) throws IOException; 748 749 private static native int lockFile(long handle, long position, long size, 750 boolean shared, long overlapped) throws IOException; 751 752 private static native void close0(long handle); 753 754 static { 755 IOUtil.load(); 756 } 757 }