1 /* 2 * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package sun.nio.ch; 27 28 import java.nio.channels.*; 29 import java.util.concurrent.*; 30 import java.nio.ByteBuffer; 31 import java.nio.BufferOverflowException; 32 import java.io.IOException; 33 import java.io.FileDescriptor; 34 import jdk.internal.misc.SharedSecrets; 35 import jdk.internal.misc.JavaIOFileDescriptorAccess; 36 37 /** 38 * Windows implementation of AsynchronousFileChannel using overlapped I/O. 39 */ 40 41 public class WindowsAsynchronousFileChannelImpl 42 extends AsynchronousFileChannelImpl 43 implements Iocp.OverlappedChannel, Groupable 44 { 45 private static final JavaIOFileDescriptorAccess fdAccess = 46 SharedSecrets.getJavaIOFileDescriptorAccess(); 47 48 // error when EOF is detected asynchronously. 49 private static final int ERROR_HANDLE_EOF = 38; 50 51 // Lazy initialization of default I/O completion port 52 private static class DefaultIocpHolder { 53 static final Iocp defaultIocp = defaultIocp(); 54 private static Iocp defaultIocp() { 55 try { 56 return new Iocp(null, ThreadPool.createDefault()).start(); 57 } catch (IOException ioe) { 58 throw new InternalError(ioe); 59 } 60 } 61 } 62 63 // Used for force/truncate/size methods 64 private static final FileDispatcher nd = new FileDispatcherImpl(); 65 66 // The handle is extracted for use in native methods invoked from this class. 67 private final long handle; 68 69 // The key that identifies the channel's association with the I/O port 70 private final int completionKey; 71 72 // I/O completion port (group) 73 private final Iocp iocp; 74 75 private final boolean isDefaultIocp; 76 77 // Caches OVERLAPPED structure for each outstanding I/O operation 78 private final PendingIoCache ioCache; 79 80 81 private WindowsAsynchronousFileChannelImpl(FileDescriptor fdObj, 82 boolean reading, 83 boolean writing, 84 Iocp iocp, 85 boolean isDefaultIocp) 86 throws IOException 87 { 88 super(fdObj, reading, writing, iocp.executor()); 89 this.handle = fdAccess.getHandle(fdObj); 90 this.iocp = iocp; 91 this.isDefaultIocp = isDefaultIocp; 92 this.ioCache = new PendingIoCache(); 93 this.completionKey = iocp.associate(this, handle); 94 } 95 96 public static AsynchronousFileChannel open(FileDescriptor fdo, 97 boolean reading, 98 boolean writing, 99 ThreadPool pool) 100 throws IOException 101 { 102 Iocp iocp; 103 boolean isDefaultIocp; 104 if (pool == null) { 105 iocp = DefaultIocpHolder.defaultIocp; 106 isDefaultIocp = true; 107 } else { 108 iocp = new Iocp(null, pool).start(); 109 isDefaultIocp = false; 110 } 111 try { 112 return new 113 WindowsAsynchronousFileChannelImpl(fdo, reading, writing, iocp, isDefaultIocp); 114 } catch (IOException x) { 115 // error binding to port so need to close it (if created for this channel) 116 if (!isDefaultIocp) 117 iocp.implClose(); 118 throw x; 119 } 120 } 121 122 @Override 123 public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) { 124 return ioCache.remove(overlapped); 125 } 126 127 @Override 128 public void close() throws IOException { 129 closeLock.writeLock().lock(); 130 try { 131 if (closed) 132 return; // already closed 133 closed = true; 134 } finally { 135 closeLock.writeLock().unlock(); 136 } 137 138 // invalidate all locks held for this channel 139 invalidateAllLocks(); 140 141 // close the file 142 close0(handle); 143 144 // waits until all I/O operations have completed 145 ioCache.close(); 146 147 // disassociate from port 148 iocp.disassociate(completionKey); 149 150 // for the non-default group close the port 151 if (!isDefaultIocp) 152 iocp.detachFromThreadPool(); 153 } 154 155 @Override 156 public AsynchronousChannelGroupImpl group() { 157 return iocp; 158 } 159 160 /** 161 * Translates Throwable to IOException 162 */ 163 private static IOException toIOException(Throwable x) { 164 if (x instanceof IOException) { 165 if (x instanceof ClosedChannelException) 166 x = new AsynchronousCloseException(); 167 return (IOException)x; 168 } 169 return new IOException(x); 170 } 171 172 @Override 173 public long size() throws IOException { 174 try { 175 begin(); 176 return nd.size(fdObj); 177 } finally { 178 end(); 179 } 180 } 181 182 @Override 183 public AsynchronousFileChannel truncate(long size) throws IOException { 184 if (size < 0) 185 throw new IllegalArgumentException("Negative size"); 186 if (!writing) 187 throw new NonWritableChannelException(); 188 try { 189 begin(); 190 if (size > nd.size(fdObj)) 191 return this; 192 nd.truncate(fdObj, size); 193 } finally { 194 end(); 195 } 196 return this; 197 } 198 199 @Override 200 public void force(boolean metaData) throws IOException { 201 try { 202 begin(); 203 nd.force(fdObj, metaData); 204 } finally { 205 end(); 206 } 207 } 208 209 // -- file locking -- 210 211 /** 212 * Task that initiates locking operation and handles completion result. 213 */ 214 private class LockTask<A> implements Runnable, Iocp.ResultHandler { 215 private final long position; 216 private final FileLockImpl fli; 217 private final PendingFuture<FileLock,A> result; 218 219 LockTask(long position, 220 FileLockImpl fli, 221 PendingFuture<FileLock,A> result) 222 { 223 this.position = position; 224 this.fli = fli; 225 this.result = result; 226 } 227 228 @Override 229 public void run() { 230 long overlapped = 0L; 231 boolean pending = false; 232 try { 233 begin(); 234 235 // allocate OVERLAPPED structure 236 overlapped = ioCache.add(result); 237 238 // synchronize on result to avoid race with handler thread 239 // when lock is acquired immediately. 240 synchronized (result) { 241 int n = lockFile(handle, position, fli.size(), fli.isShared(), 242 overlapped); 243 if (n == IOStatus.UNAVAILABLE) { 244 // I/O is pending 245 pending = true; 246 return; 247 } 248 // acquired lock immediately 249 result.setResult(fli); 250 } 251 252 } catch (Throwable x) { 253 // lock failed or channel closed 254 removeFromFileLockTable(fli); 255 result.setFailure(toIOException(x)); 256 } finally { 257 if (!pending && overlapped != 0L) 258 ioCache.remove(overlapped); 259 end(); 260 } 261 262 // invoke completion handler 263 Invoker.invoke(result); 264 } 265 266 @Override 267 public void completed(int bytesTransferred, boolean canInvokeDirect) { 268 // release waiters and invoke completion handler 269 result.setResult(fli); 270 if (canInvokeDirect) { 271 Invoker.invokeUnchecked(result); 272 } else { 273 Invoker.invoke(result); 274 } 275 } 276 277 @Override 278 public void failed(int error, IOException x) { 279 // lock not acquired so remove from lock table 280 removeFromFileLockTable(fli); 281 282 // release waiters 283 if (isOpen()) { 284 result.setFailure(x); 285 } else { 286 result.setFailure(new AsynchronousCloseException()); 287 } 288 Invoker.invoke(result); 289 } 290 } 291 292 @Override 293 <A> Future<FileLock> implLock(final long position, 294 final long size, 295 final boolean shared, 296 A attachment, 297 final CompletionHandler<FileLock,? super A> handler) 298 { 299 if (shared && !reading) 300 throw new NonReadableChannelException(); 301 if (!shared && !writing) 302 throw new NonWritableChannelException(); 303 304 // add to lock table 305 FileLockImpl fli = addToFileLockTable(position, size, shared); 306 if (fli == null) { 307 Throwable exc = new ClosedChannelException(); 308 if (handler == null) 309 return CompletedFuture.withFailure(exc); 310 Invoker.invoke(this, handler, attachment, null, exc); 311 return null; 312 } 313 314 // create Future and task that will be invoked to acquire lock 315 PendingFuture<FileLock,A> result = 316 new PendingFuture<FileLock,A>(this, handler, attachment); 317 LockTask<A> lockTask = new LockTask<A>(position, fli, result); 318 result.setContext(lockTask); 319 320 // initiate I/O 321 lockTask.run(); 322 return result; 323 } 324 325 static final int NO_LOCK = -1; // Failed to lock 326 static final int LOCKED = 0; // Obtained requested lock 327 328 @Override 329 public FileLock tryLock(long position, long size, boolean shared) 330 throws IOException 331 { 332 if (shared && !reading) 333 throw new NonReadableChannelException(); 334 if (!shared && !writing) 335 throw new NonWritableChannelException(); 336 337 // add to lock table 338 final FileLockImpl fli = addToFileLockTable(position, size, shared); 339 if (fli == null) 340 throw new ClosedChannelException(); 341 342 boolean gotLock = false; 343 try { 344 begin(); 345 // try to acquire the lock 346 int res = nd.lock(fdObj, false, position, size, shared); 347 if (res == NO_LOCK) 348 return null; 349 gotLock = true; 350 return fli; 351 } finally { 352 if (!gotLock) 353 removeFromFileLockTable(fli); 354 end(); 355 } 356 } 357 358 @Override 359 protected void implRelease(FileLockImpl fli) throws IOException { 360 nd.release(fdObj, fli.position(), fli.size()); 361 } 362 363 /** 364 * Task that initiates read operation and handles completion result. 365 */ 366 private class ReadTask<A> implements Runnable, Iocp.ResultHandler { 367 private final ByteBuffer dst; 368 private final int pos, rem; // buffer position/remaining 369 private final long position; // file position 370 private final PendingFuture<Integer,A> result; 371 372 // set to dst if direct; otherwise set to substituted direct buffer 373 private volatile ByteBuffer buf; 374 375 ReadTask(ByteBuffer dst, 376 int pos, 377 int rem, 378 long position, 379 PendingFuture<Integer,A> result) 380 { 381 this.dst = dst; 382 this.pos = pos; 383 this.rem = rem; 384 this.position = position; 385 this.result = result; 386 } 387 388 void releaseBufferIfSubstituted() { 389 if (buf != dst) 390 Util.releaseTemporaryDirectBuffer(buf); 391 } 392 393 void updatePosition(int bytesTransferred) { 394 // if the I/O succeeded then adjust buffer position 395 if (bytesTransferred > 0) { 396 if (buf == dst) { 397 try { 398 dst.position(pos + bytesTransferred); 399 } catch (IllegalArgumentException x) { 400 // someone has changed the position; ignore 401 } 402 } else { 403 // had to substitute direct buffer 404 buf.position(bytesTransferred).flip(); 405 try { 406 dst.put(buf); 407 } catch (BufferOverflowException x) { 408 // someone has changed the position; ignore 409 } 410 } 411 } 412 } 413 414 @Override 415 public void run() { 416 int n = -1; 417 long overlapped = 0L; 418 long address; 419 420 // Substitute a native buffer if not direct 421 if (dst instanceof DirectBuffer) { 422 buf = dst; 423 address = ((DirectBuffer)dst).address() + pos; 424 } else { 425 buf = Util.getTemporaryDirectBuffer(rem); 426 address = ((DirectBuffer)buf).address(); 427 } 428 429 boolean pending = false; 430 try { 431 begin(); 432 433 // allocate OVERLAPPED 434 overlapped = ioCache.add(result); 435 436 // initiate read 437 n = readFile(handle, address, rem, position, overlapped); 438 if (n == IOStatus.UNAVAILABLE) { 439 // I/O is pending 440 pending = true; 441 return; 442 } else if (n == IOStatus.EOF) { 443 result.setResult(n); 444 } else { 445 throw new InternalError("Unexpected result: " + n); 446 } 447 448 } catch (Throwable x) { 449 // failed to initiate read 450 result.setFailure(toIOException(x)); 451 } finally { 452 if (!pending) { 453 // release resources 454 if (overlapped != 0L) 455 ioCache.remove(overlapped); 456 releaseBufferIfSubstituted(); 457 } 458 end(); 459 } 460 461 // invoke completion handler 462 Invoker.invoke(result); 463 } 464 465 /** 466 * Executed when the I/O has completed 467 */ 468 @Override 469 public void completed(int bytesTransferred, boolean canInvokeDirect) { 470 updatePosition(bytesTransferred); 471 472 // return direct buffer to cache if substituted 473 releaseBufferIfSubstituted(); 474 475 // release waiters and invoke completion handler 476 result.setResult(bytesTransferred); 477 if (canInvokeDirect) { 478 Invoker.invokeUnchecked(result); 479 } else { 480 Invoker.invoke(result); 481 } 482 } 483 484 @Override 485 public void failed(int error, IOException x) { 486 // if EOF detected asynchronously then it is reported as error 487 if (error == ERROR_HANDLE_EOF) { 488 completed(-1, false); 489 } else { 490 // return direct buffer to cache if substituted 491 releaseBufferIfSubstituted(); 492 493 // release waiters 494 if (isOpen()) { 495 result.setFailure(x); 496 } else { 497 result.setFailure(new AsynchronousCloseException()); 498 } 499 Invoker.invoke(result); 500 } 501 } 502 } 503 504 @Override 505 <A> Future<Integer> implRead(ByteBuffer dst, 506 long position, 507 A attachment, 508 CompletionHandler<Integer,? super A> handler) 509 { 510 if (!reading) 511 throw new NonReadableChannelException(); 512 if (position < 0) 513 throw new IllegalArgumentException("Negative position"); 514 if (dst.isReadOnly()) 515 throw new IllegalArgumentException("Read-only buffer"); 516 517 // check if channel is closed 518 if (!isOpen()) { 519 Throwable exc = new ClosedChannelException(); 520 if (handler == null) 521 return CompletedFuture.withFailure(exc); 522 Invoker.invoke(this, handler, attachment, null, exc); 523 return null; 524 } 525 526 int pos = dst.position(); 527 int lim = dst.limit(); 528 assert (pos <= lim); 529 int rem = (pos <= lim ? lim - pos : 0); 530 531 // no space remaining 532 if (rem == 0) { 533 if (handler == null) 534 return CompletedFuture.withResult(0); 535 Invoker.invoke(this, handler, attachment, 0, null); 536 return null; 537 } 538 539 // create Future and task that initiates read 540 PendingFuture<Integer,A> result = 541 new PendingFuture<Integer,A>(this, handler, attachment); 542 ReadTask<A> readTask = new ReadTask<A>(dst, pos, rem, position, result); 543 result.setContext(readTask); 544 545 // initiate I/O 546 readTask.run(); 547 return result; 548 } 549 550 /** 551 * Task that initiates write operation and handles completion result. 552 */ 553 private class WriteTask<A> implements Runnable, Iocp.ResultHandler { 554 private final ByteBuffer src; 555 private final int pos, rem; // buffer position/remaining 556 private final long position; // file position 557 private final PendingFuture<Integer,A> result; 558 559 // set to src if direct; otherwise set to substituted direct buffer 560 private volatile ByteBuffer buf; 561 562 WriteTask(ByteBuffer src, 563 int pos, 564 int rem, 565 long position, 566 PendingFuture<Integer,A> result) 567 { 568 this.src = src; 569 this.pos = pos; 570 this.rem = rem; 571 this.position = position; 572 this.result = result; 573 } 574 575 void releaseBufferIfSubstituted() { 576 if (buf != src) 577 Util.releaseTemporaryDirectBuffer(buf); 578 } 579 580 void updatePosition(int bytesTransferred) { 581 // if the I/O succeeded then adjust buffer position 582 if (bytesTransferred > 0) { 583 try { 584 src.position(pos + bytesTransferred); 585 } catch (IllegalArgumentException x) { 586 // someone has changed the position 587 } 588 } 589 } 590 591 @Override 592 public void run() { 593 int n = -1; 594 long overlapped = 0L; 595 long address; 596 597 // Substitute a native buffer if not direct 598 if (src instanceof DirectBuffer) { 599 buf = src; 600 address = ((DirectBuffer)src).address() + pos; 601 } else { 602 buf = Util.getTemporaryDirectBuffer(rem); 603 buf.put(src); 604 buf.flip(); 605 // temporarily restore position as we don't know how many bytes 606 // will be written 607 src.position(pos); 608 address = ((DirectBuffer)buf).address(); 609 } 610 611 try { 612 begin(); 613 614 // allocate an OVERLAPPED structure 615 overlapped = ioCache.add(result); 616 617 // initiate the write 618 n = writeFile(handle, address, rem, position, overlapped); 619 if (n == IOStatus.UNAVAILABLE) { 620 // I/O is pending 621 return; 622 } else { 623 throw new InternalError("Unexpected result: " + n); 624 } 625 626 } catch (Throwable x) { 627 // failed to initiate read: 628 result.setFailure(toIOException(x)); 629 630 // release resources 631 if (overlapped != 0L) 632 ioCache.remove(overlapped); 633 releaseBufferIfSubstituted(); 634 635 } finally { 636 end(); 637 } 638 639 // invoke completion handler 640 Invoker.invoke(result); 641 } 642 643 /** 644 * Executed when the I/O has completed 645 */ 646 @Override 647 public void completed(int bytesTransferred, boolean canInvokeDirect) { 648 updatePosition(bytesTransferred); 649 650 // return direct buffer to cache if substituted 651 releaseBufferIfSubstituted(); 652 653 // release waiters and invoke completion handler 654 result.setResult(bytesTransferred); 655 if (canInvokeDirect) { 656 Invoker.invokeUnchecked(result); 657 } else { 658 Invoker.invoke(result); 659 } 660 } 661 662 @Override 663 public void failed(int error, IOException x) { 664 // return direct buffer to cache if substituted 665 releaseBufferIfSubstituted(); 666 667 // release waiters and invoker completion handler 668 if (isOpen()) { 669 result.setFailure(x); 670 } else { 671 result.setFailure(new AsynchronousCloseException()); 672 } 673 Invoker.invoke(result); 674 } 675 } 676 677 <A> Future<Integer> implWrite(ByteBuffer src, 678 long position, 679 A attachment, 680 CompletionHandler<Integer,? super A> handler) 681 { 682 if (!writing) 683 throw new NonWritableChannelException(); 684 if (position < 0) 685 throw new IllegalArgumentException("Negative position"); 686 687 // check if channel is closed 688 if (!isOpen()) { 689 Throwable exc = new ClosedChannelException(); 690 if (handler == null) 691 return CompletedFuture.withFailure(exc); 692 Invoker.invoke(this, handler, attachment, null, exc); 693 return null; 694 } 695 696 int pos = src.position(); 697 int lim = src.limit(); 698 assert (pos <= lim); 699 int rem = (pos <= lim ? lim - pos : 0); 700 701 // nothing to write 702 if (rem == 0) { 703 if (handler == null) 704 return CompletedFuture.withResult(0); 705 Invoker.invoke(this, handler, attachment, 0, null); 706 return null; 707 } 708 709 // create Future and task to initiate write 710 PendingFuture<Integer,A> result = 711 new PendingFuture<Integer,A>(this, handler, attachment); 712 WriteTask<A> writeTask = new WriteTask<A>(src, pos, rem, position, result); 713 result.setContext(writeTask); 714 715 // initiate I/O 716 writeTask.run(); 717 return result; 718 } 719 720 // -- Native methods -- 721 722 private static native int readFile(long handle, long address, int len, 723 long offset, long overlapped) throws IOException; 724 725 private static native int writeFile(long handle, long address, int len, 726 long offset, long overlapped) throws IOException; 727 728 private static native int lockFile(long handle, long position, long size, 729 boolean shared, long overlapped) throws IOException; 730 731 private static native void close0(long handle); 732 733 static { 734 IOUtil.load(); 735 } 736 }