1 /* 2 * Copyright (c) 2008, 2019, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package sun.nio.ch; 27 28 import java.nio.channels.*; 29 import java.util.concurrent.*; 30 import java.nio.ByteBuffer; 31 import java.nio.BufferOverflowException; 32 import java.io.IOException; 33 import java.io.FileDescriptor; 34 import jdk.internal.misc.SharedSecrets; 35 import jdk.internal.misc.JavaIOFileDescriptorAccess; 36 37 /** 38 * Windows implementation of AsynchronousFileChannel using overlapped I/O. 39 */ 40 41 public class WindowsAsynchronousFileChannelImpl 42 extends AsynchronousFileChannelImpl 43 implements Iocp.OverlappedChannel, Groupable 44 { 45 private static final JavaIOFileDescriptorAccess fdAccess = 46 SharedSecrets.getJavaIOFileDescriptorAccess(); 47 48 // error when EOF is detected asynchronously. 49 private static final int ERROR_HANDLE_EOF = 38; 50 51 // Lazy initialization of default I/O completion port 52 private static class DefaultIocpHolder { 53 static final Iocp defaultIocp = defaultIocp(); 54 private static Iocp defaultIocp() { 55 try { 56 return new Iocp(null, ThreadPool.createDefault()).start(); 57 } catch (IOException ioe) { 58 throw new InternalError(ioe); 59 } 60 } 61 } 62 63 // Used for force/truncate/size methods 64 private static final FileDispatcher nd = new FileDispatcherImpl(); 65 66 // The handle is extracted for use in native methods invoked from this class. 67 private final long handle; 68 69 // The key that identifies the channel's association with the I/O port 70 private final int completionKey; 71 72 // I/O completion port (group) 73 private final Iocp iocp; 74 75 private final boolean isDefaultIocp; 76 77 // Caches OVERLAPPED structure for each outstanding I/O operation 78 private final PendingIoCache ioCache; 79 80 81 private WindowsAsynchronousFileChannelImpl(FileDescriptor fdObj, 82 boolean reading, 83 boolean writing, 84 Iocp iocp, 85 boolean isDefaultIocp) 86 throws IOException 87 { 88 super(fdObj, reading, writing, iocp.executor()); 89 this.handle = fdAccess.getHandle(fdObj); 90 this.iocp = iocp; 91 this.isDefaultIocp = isDefaultIocp; 92 this.ioCache = new PendingIoCache(); 93 this.completionKey = iocp.associate(this, handle); 94 } 95 96 public static AsynchronousFileChannel open(FileDescriptor fdo, 97 boolean reading, 98 boolean writing, 99 ThreadPool pool) 100 throws IOException 101 { 102 Iocp iocp; 103 boolean isDefaultIocp; 104 if (pool == null) { 105 iocp = DefaultIocpHolder.defaultIocp; 106 isDefaultIocp = true; 107 } else { 108 iocp = new Iocp(null, pool).start(); 109 isDefaultIocp = false; 110 } 111 try { 112 return new 113 WindowsAsynchronousFileChannelImpl(fdo, reading, writing, iocp, isDefaultIocp); 114 } catch (IOException x) { 115 // error binding to port so need to close it (if created for this channel) 116 if (!isDefaultIocp) 117 iocp.implClose(); 118 throw x; 119 } 120 } 121 122 @Override 123 public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) { 124 return ioCache.remove(overlapped); 125 } 126 127 @Override 128 public void close() throws IOException { 129 closeLock.writeLock().lock(); 130 try { 131 if (closed) 132 return; // already closed 133 closed = true; 134 } finally { 135 closeLock.writeLock().unlock(); 136 } 137 138 // invalidate all locks held for this channel 139 invalidateAllLocks(); 140 141 // close the file 142 nd.close(fdObj); 143 144 // waits until all I/O operations have completed 145 ioCache.close(); 146 147 // disassociate from port 148 iocp.disassociate(completionKey); 149 150 // for the non-default group close the port 151 if (!isDefaultIocp) 152 iocp.detachFromThreadPool(); 153 } 154 155 @Override 156 public AsynchronousChannelGroupImpl group() { 157 return iocp; 158 } 159 160 /** 161 * Translates Throwable to IOException 162 */ 163 private static IOException toIOException(Throwable x) { 164 if (x instanceof IOException) { 165 if (x instanceof ClosedChannelException) 166 x = new AsynchronousCloseException(); 167 return (IOException)x; 168 } 169 return new IOException(x); 170 } 171 172 @Override 173 public long size() throws IOException { 174 try { 175 begin(); 176 return nd.size(fdObj); 177 } finally { 178 end(); 179 } 180 } 181 182 @Override 183 public AsynchronousFileChannel truncate(long size) throws IOException { 184 if (size < 0) 185 throw new IllegalArgumentException("Negative size"); 186 if (!writing) 187 throw new NonWritableChannelException(); 188 try { 189 begin(); 190 if (size > nd.size(fdObj)) 191 return this; 192 nd.truncate(fdObj, size); 193 } finally { 194 end(); 195 } 196 return this; 197 } 198 199 @Override 200 public void force(boolean metaData) throws IOException { 201 try { 202 begin(); 203 nd.force(fdObj, metaData); 204 } finally { 205 end(); 206 } 207 } 208 209 // -- file locking -- 210 211 /** 212 * Task that initiates locking operation and handles completion result. 213 */ 214 private class LockTask<A> implements Runnable, Iocp.ResultHandler { 215 private final long position; 216 private final FileLockImpl fli; 217 private final PendingFuture<FileLock,A> result; 218 219 LockTask(long position, 220 FileLockImpl fli, 221 PendingFuture<FileLock,A> result) 222 { 223 this.position = position; 224 this.fli = fli; 225 this.result = result; 226 } 227 228 @Override 229 public void run() { 230 long overlapped = 0L; 231 try { 232 begin(); 233 234 // allocate OVERLAPPED structure 235 overlapped = ioCache.add(result); 236 237 // synchronize on result to avoid race with handler thread 238 // when lock is acquired immediately. 239 synchronized (result) { 240 int n = lockFile(handle, position, fli.size(), fli.isShared(), 241 overlapped); 242 if (n == IOStatus.UNAVAILABLE) { 243 // I/O is pending 244 return; 245 } 246 // acquired lock immediately 247 result.setResult(fli); 248 } 249 250 } catch (Throwable x) { 251 // lock failed or channel closed 252 removeFromFileLockTable(fli); 253 result.setFailure(toIOException(x)); 254 if (overlapped != 0L) 255 ioCache.remove(overlapped); 256 } finally { 257 end(); 258 } 259 260 // invoke completion handler 261 Invoker.invoke(result); 262 } 263 264 @Override 265 public void completed(int bytesTransferred, boolean canInvokeDirect) { 266 // release waiters and invoke completion handler 267 result.setResult(fli); 268 if (canInvokeDirect) { 269 Invoker.invokeUnchecked(result); 270 } else { 271 Invoker.invoke(result); 272 } 273 } 274 275 @Override 276 public void failed(int error, IOException x) { 277 // lock not acquired so remove from lock table 278 removeFromFileLockTable(fli); 279 280 // release waiters 281 if (isOpen()) { 282 result.setFailure(x); 283 } else { 284 result.setFailure(new AsynchronousCloseException()); 285 } 286 Invoker.invoke(result); 287 } 288 } 289 290 @Override 291 <A> Future<FileLock> implLock(final long position, 292 final long size, 293 final boolean shared, 294 A attachment, 295 final CompletionHandler<FileLock,? super A> handler) 296 { 297 if (shared && !reading) 298 throw new NonReadableChannelException(); 299 if (!shared && !writing) 300 throw new NonWritableChannelException(); 301 302 // add to lock table 303 FileLockImpl fli = addToFileLockTable(position, size, shared); 304 if (fli == null) { 305 Throwable exc = new ClosedChannelException(); 306 if (handler == null) 307 return CompletedFuture.withFailure(exc); 308 Invoker.invoke(this, handler, attachment, null, exc); 309 return null; 310 } 311 312 // create Future and task that will be invoked to acquire lock 313 PendingFuture<FileLock,A> result = 314 new PendingFuture<FileLock,A>(this, handler, attachment); 315 LockTask<A> lockTask = new LockTask<A>(position, fli, result); 316 result.setContext(lockTask); 317 318 // initiate I/O 319 lockTask.run(); 320 return result; 321 } 322 323 static final int NO_LOCK = -1; // Failed to lock 324 static final int LOCKED = 0; // Obtained requested lock 325 326 @Override 327 public FileLock tryLock(long position, long size, boolean shared) 328 throws IOException 329 { 330 if (shared && !reading) 331 throw new NonReadableChannelException(); 332 if (!shared && !writing) 333 throw new NonWritableChannelException(); 334 335 // add to lock table 336 final FileLockImpl fli = addToFileLockTable(position, size, shared); 337 if (fli == null) 338 throw new ClosedChannelException(); 339 340 boolean gotLock = false; 341 try { 342 begin(); 343 // try to acquire the lock 344 int res = nd.lock(fdObj, false, position, size, shared); 345 if (res == NO_LOCK) 346 return null; 347 gotLock = true; 348 return fli; 349 } finally { 350 if (!gotLock) 351 removeFromFileLockTable(fli); 352 end(); 353 } 354 } 355 356 @Override 357 protected void implRelease(FileLockImpl fli) throws IOException { 358 nd.release(fdObj, fli.position(), fli.size()); 359 } 360 361 /** 362 * Task that initiates read operation and handles completion result. 363 */ 364 private class ReadTask<A> implements Runnable, Iocp.ResultHandler { 365 private final ByteBuffer dst; 366 private final int pos, rem; // buffer position/remaining 367 private final long position; // file position 368 private final PendingFuture<Integer,A> result; 369 370 // set to dst if direct; otherwise set to substituted direct buffer 371 private volatile ByteBuffer buf; 372 373 ReadTask(ByteBuffer dst, 374 int pos, 375 int rem, 376 long position, 377 PendingFuture<Integer,A> result) 378 { 379 this.dst = dst; 380 this.pos = pos; 381 this.rem = rem; 382 this.position = position; 383 this.result = result; 384 } 385 386 void releaseBufferIfSubstituted() { 387 if (buf != dst) 388 Util.releaseTemporaryDirectBuffer(buf); 389 } 390 391 void updatePosition(int bytesTransferred) { 392 // if the I/O succeeded then adjust buffer position 393 if (bytesTransferred > 0) { 394 if (buf == dst) { 395 try { 396 dst.position(pos + bytesTransferred); 397 } catch (IllegalArgumentException x) { 398 // someone has changed the position; ignore 399 } 400 } else { 401 // had to substitute direct buffer 402 buf.position(bytesTransferred).flip(); 403 try { 404 dst.put(buf); 405 } catch (BufferOverflowException x) { 406 // someone has changed the position; ignore 407 } 408 } 409 } 410 } 411 412 @Override 413 public void run() { 414 int n = -1; 415 long overlapped = 0L; 416 long address; 417 418 // Substitute a native buffer if not direct 419 if (dst instanceof DirectBuffer) { 420 buf = dst; 421 address = ((DirectBuffer)dst).address() + pos; 422 } else { 423 buf = Util.getTemporaryDirectBuffer(rem); 424 address = ((DirectBuffer)buf).address(); 425 } 426 427 boolean pending = false; 428 try { 429 begin(); 430 431 // allocate OVERLAPPED 432 overlapped = ioCache.add(result); 433 434 // initiate read 435 n = readFile(handle, address, rem, position, overlapped); 436 if (n == IOStatus.UNAVAILABLE) { 437 // I/O is pending 438 pending = true; 439 return; 440 } else if (n == IOStatus.EOF) { 441 result.setResult(n); 442 } else { 443 throw new InternalError("Unexpected result: " + n); 444 } 445 446 } catch (Throwable x) { 447 // failed to initiate read 448 result.setFailure(toIOException(x)); 449 if (overlapped != 0L) 450 ioCache.remove(overlapped); 451 } finally { 452 if (!pending) 453 // release resources 454 releaseBufferIfSubstituted(); 455 end(); 456 } 457 458 // invoke completion handler 459 Invoker.invoke(result); 460 } 461 462 /** 463 * Executed when the I/O has completed 464 */ 465 @Override 466 public void completed(int bytesTransferred, boolean canInvokeDirect) { 467 updatePosition(bytesTransferred); 468 469 // return direct buffer to cache if substituted 470 releaseBufferIfSubstituted(); 471 472 // release waiters and invoke completion handler 473 result.setResult(bytesTransferred); 474 if (canInvokeDirect) { 475 Invoker.invokeUnchecked(result); 476 } else { 477 Invoker.invoke(result); 478 } 479 } 480 481 @Override 482 public void failed(int error, IOException x) { 483 // if EOF detected asynchronously then it is reported as error 484 if (error == ERROR_HANDLE_EOF) { 485 completed(-1, false); 486 } else { 487 // return direct buffer to cache if substituted 488 releaseBufferIfSubstituted(); 489 490 // release waiters 491 if (isOpen()) { 492 result.setFailure(x); 493 } else { 494 result.setFailure(new AsynchronousCloseException()); 495 } 496 Invoker.invoke(result); 497 } 498 } 499 } 500 501 @Override 502 <A> Future<Integer> implRead(ByteBuffer dst, 503 long position, 504 A attachment, 505 CompletionHandler<Integer,? super A> handler) 506 { 507 if (!reading) 508 throw new NonReadableChannelException(); 509 if (position < 0) 510 throw new IllegalArgumentException("Negative position"); 511 if (dst.isReadOnly()) 512 throw new IllegalArgumentException("Read-only buffer"); 513 514 // check if channel is closed 515 if (!isOpen()) { 516 Throwable exc = new ClosedChannelException(); 517 if (handler == null) 518 return CompletedFuture.withFailure(exc); 519 Invoker.invoke(this, handler, attachment, null, exc); 520 return null; 521 } 522 523 int pos = dst.position(); 524 int lim = dst.limit(); 525 assert (pos <= lim); 526 int rem = (pos <= lim ? lim - pos : 0); 527 528 // no space remaining 529 if (rem == 0) { 530 if (handler == null) 531 return CompletedFuture.withResult(0); 532 Invoker.invoke(this, handler, attachment, 0, null); 533 return null; 534 } 535 536 // create Future and task that initiates read 537 PendingFuture<Integer,A> result = 538 new PendingFuture<Integer,A>(this, handler, attachment); 539 ReadTask<A> readTask = new ReadTask<A>(dst, pos, rem, position, result); 540 result.setContext(readTask); 541 542 // initiate I/O 543 readTask.run(); 544 return result; 545 } 546 547 /** 548 * Task that initiates write operation and handles completion result. 549 */ 550 private class WriteTask<A> implements Runnable, Iocp.ResultHandler { 551 private final ByteBuffer src; 552 private final int pos, rem; // buffer position/remaining 553 private final long position; // file position 554 private final PendingFuture<Integer,A> result; 555 556 // set to src if direct; otherwise set to substituted direct buffer 557 private volatile ByteBuffer buf; 558 559 WriteTask(ByteBuffer src, 560 int pos, 561 int rem, 562 long position, 563 PendingFuture<Integer,A> result) 564 { 565 this.src = src; 566 this.pos = pos; 567 this.rem = rem; 568 this.position = position; 569 this.result = result; 570 } 571 572 void releaseBufferIfSubstituted() { 573 if (buf != src) 574 Util.releaseTemporaryDirectBuffer(buf); 575 } 576 577 void updatePosition(int bytesTransferred) { 578 // if the I/O succeeded then adjust buffer position 579 if (bytesTransferred > 0) { 580 try { 581 src.position(pos + bytesTransferred); 582 } catch (IllegalArgumentException x) { 583 // someone has changed the position 584 } 585 } 586 } 587 588 @Override 589 public void run() { 590 int n = -1; 591 long overlapped = 0L; 592 long address; 593 594 // Substitute a native buffer if not direct 595 if (src instanceof DirectBuffer) { 596 buf = src; 597 address = ((DirectBuffer)src).address() + pos; 598 } else { 599 buf = Util.getTemporaryDirectBuffer(rem); 600 buf.put(src); 601 buf.flip(); 602 // temporarily restore position as we don't know how many bytes 603 // will be written 604 src.position(pos); 605 address = ((DirectBuffer)buf).address(); 606 } 607 608 try { 609 begin(); 610 611 // allocate an OVERLAPPED structure 612 overlapped = ioCache.add(result); 613 614 // initiate the write 615 n = writeFile(handle, address, rem, position, overlapped); 616 if (n == IOStatus.UNAVAILABLE) { 617 // I/O is pending 618 return; 619 } else { 620 throw new InternalError("Unexpected result: " + n); 621 } 622 623 } catch (Throwable x) { 624 // failed to initiate read: 625 result.setFailure(toIOException(x)); 626 627 // release resources 628 releaseBufferIfSubstituted(); 629 if (overlapped != 0L) 630 ioCache.remove(overlapped); 631 632 } finally { 633 end(); 634 } 635 636 // invoke completion handler 637 Invoker.invoke(result); 638 } 639 640 /** 641 * Executed when the I/O has completed 642 */ 643 @Override 644 public void completed(int bytesTransferred, boolean canInvokeDirect) { 645 updatePosition(bytesTransferred); 646 647 // return direct buffer to cache if substituted 648 releaseBufferIfSubstituted(); 649 650 // release waiters and invoke completion handler 651 result.setResult(bytesTransferred); 652 if (canInvokeDirect) { 653 Invoker.invokeUnchecked(result); 654 } else { 655 Invoker.invoke(result); 656 } 657 } 658 659 @Override 660 public void failed(int error, IOException x) { 661 // return direct buffer to cache if substituted 662 releaseBufferIfSubstituted(); 663 664 // release waiters and invoker completion handler 665 if (isOpen()) { 666 result.setFailure(x); 667 } else { 668 result.setFailure(new AsynchronousCloseException()); 669 } 670 Invoker.invoke(result); 671 } 672 } 673 674 <A> Future<Integer> implWrite(ByteBuffer src, 675 long position, 676 A attachment, 677 CompletionHandler<Integer,? super A> handler) 678 { 679 if (!writing) 680 throw new NonWritableChannelException(); 681 if (position < 0) 682 throw new IllegalArgumentException("Negative position"); 683 684 // check if channel is closed 685 if (!isOpen()) { 686 Throwable exc = new ClosedChannelException(); 687 if (handler == null) 688 return CompletedFuture.withFailure(exc); 689 Invoker.invoke(this, handler, attachment, null, exc); 690 return null; 691 } 692 693 int pos = src.position(); 694 int lim = src.limit(); 695 assert (pos <= lim); 696 int rem = (pos <= lim ? lim - pos : 0); 697 698 // nothing to write 699 if (rem == 0) { 700 if (handler == null) 701 return CompletedFuture.withResult(0); 702 Invoker.invoke(this, handler, attachment, 0, null); 703 return null; 704 } 705 706 // create Future and task to initiate write 707 PendingFuture<Integer,A> result = 708 new PendingFuture<Integer,A>(this, handler, attachment); 709 WriteTask<A> writeTask = new WriteTask<A>(src, pos, rem, position, result); 710 result.setContext(writeTask); 711 712 // initiate I/O 713 writeTask.run(); 714 return result; 715 } 716 717 // -- Native methods -- 718 719 private static native int readFile(long handle, long address, int len, 720 long offset, long overlapped) throws IOException; 721 722 private static native int writeFile(long handle, long address, int len, 723 long offset, long overlapped) throws IOException; 724 725 private static native int lockFile(long handle, long position, long size, 726 boolean shared, long overlapped) throws IOException; 727 728 static { 729 IOUtil.load(); 730 } 731 }