458 } 459 } 460 461 @Override 462 @SuppressWarnings("unchecked") 463 public void run() { 464 long overlapped = 0L; 465 boolean prepared = false; 466 boolean pending = false; 467 468 try { 469 begin(); 470 471 // substitute non-direct buffers 472 prepareBuffers(); 473 prepared = true; 474 475 // get an OVERLAPPED structure (from the cache or allocate) 476 overlapped = ioCache.add(result); 477 478 // synchronize on result to allow this thread handle the case 479 // where the read completes immediately. 480 synchronized (result) { 481 int n = read0(handle, numBufs, readBufferArray, overlapped); 482 if (n == IOStatus.UNAVAILABLE) { 483 // I/O is pending 484 pending = true; 485 return; 486 } 487 // read completed immediately: 488 // 1. update buffer position 489 // 2. reset read flag 490 // 3. release waiters 491 if (n == 0) { 492 n = -1; 493 } else { 494 updateBuffers(n); 495 } 496 enableReading(); 497 498 if (scatteringRead) { 499 result.setResult((V)Long.valueOf(n)); 500 } else { 501 result.setResult((V)Integer.valueOf(n)); 502 } 503 } 504 } catch (Throwable x) { 505 // failed to initiate read: 506 // 1. reset read flag 507 // 2. free resources 508 // 3. release waiters 509 enableReading(); 510 if (overlapped != 0L) 511 ioCache.remove(overlapped); 512 if (x instanceof ClosedChannelException) 513 x = new AsynchronousCloseException(); 514 if (!(x instanceof IOException)) 515 x = new IOException(x); 516 result.setFailure(x); 517 } finally { 518 if (prepared && !pending) { 519 // return direct buffer(s) to cache if substituted 520 releaseBuffers(); 521 } 522 end(); 523 } 524 525 // invoke completion handler 526 Invoker.invoke(result.handler(), result); 527 } 528 529 /** 530 * Executed when the I/O has completed 531 */ 532 @Override 533 @SuppressWarnings("unchecked") 534 public void completed(int bytesTransferred) { 535 if (bytesTransferred == 0) { 536 bytesTransferred = -1; // EOF 537 } else { 538 updateBuffers(bytesTransferred); 539 } 704 } catch (IllegalArgumentException x) { 705 // position changed by someone else 706 } 707 } 708 break; 709 } 710 } 711 } 712 713 void releaseBuffers() { 714 for (int i=0; i<numBufs; i++) { 715 if (!(bufs[i] instanceof DirectBuffer)) { 716 Util.releaseTemporaryDirectBuffer(shadow[i]); 717 } 718 } 719 } 720 721 @Override 722 @SuppressWarnings("unchecked") 723 public void run() { 724 int n = -1; 725 long overlapped = 0L; 726 boolean prepared = false; 727 boolean pending = false; 728 boolean shutdown = false; 729 730 try { 731 begin(); 732 733 // substitute non-direct buffers 734 prepareBuffers(); 735 prepared = true; 736 737 // get an OVERLAPPED structure (from the cache or allocate) 738 overlapped = ioCache.add(result); 739 740 // synchronize on result to allow this thread handle the case 741 // where the read completes immediately. 742 synchronized (result) { 743 n = write0(handle, numBufs, writeBufferArray, overlapped); 744 if (n == IOStatus.UNAVAILABLE) { 745 // I/O is pending 746 pending = true; 747 return; 748 } 749 750 enableWriting(); 751 752 if (n == IOStatus.EOF) { 753 // special case for shutdown output 754 shutdown = true; 755 throw new ClosedChannelException(); 756 } 757 758 // write completed immediately: 759 // 1. enable writing 760 // 2. update buffer position 761 // 3. release waiters 762 updateBuffers(n); 763 764 // result is a Long or Integer 765 if (gatheringWrite) { 766 result.setResult((V)Long.valueOf(n)); 767 } else { 768 result.setResult((V)Integer.valueOf(n)); 769 } 770 } 771 } catch (Throwable x) { 772 enableWriting(); 773 774 // failed to initiate read: 775 if (!shutdown && (x instanceof ClosedChannelException)) 776 x = new AsynchronousCloseException(); 777 if (!(x instanceof IOException)) 778 x = new IOException(x); 779 result.setFailure(x); 780 781 // release resources 782 if (overlapped != 0L) 783 ioCache.remove(overlapped); 784 785 } finally { 786 if (prepared && !pending) { 787 // return direct buffer(s) to cache if substituted 788 releaseBuffers(); 789 } 790 end(); 791 } 792 793 // invoke completion handler 794 Invoker.invoke(result.handler(), result); 795 } 796 797 /** 798 * Executed when the I/O has completed 799 */ 800 @Override 801 @SuppressWarnings("unchecked") 802 public void completed(int bytesTransferred) { 803 updateBuffers(bytesTransferred); 804 805 // return direct buffer to cache if substituted 806 releaseBuffers(); 807 | 458 } 459 } 460 461 @Override 462 @SuppressWarnings("unchecked") 463 public void run() { 464 long overlapped = 0L; 465 boolean prepared = false; 466 boolean pending = false; 467 468 try { 469 begin(); 470 471 // substitute non-direct buffers 472 prepareBuffers(); 473 prepared = true; 474 475 // get an OVERLAPPED structure (from the cache or allocate) 476 overlapped = ioCache.add(result); 477 478 // initiate read 479 int n = read0(handle, numBufs, readBufferArray, overlapped); 480 if (n == IOStatus.UNAVAILABLE) { 481 // I/O is pending 482 pending = true; 483 return; 484 } 485 if (n == IOStatus.EOF) { 486 // input shutdown 487 enableReading(); 488 if (scatteringRead) { 489 result.setResult((V)Long.valueOf(-1L)); 490 } else { 491 result.setResult((V)Integer.valueOf(-1)); 492 } 493 } else { 494 throw new InternalError("Read completed immediately"); 495 } 496 } catch (Throwable x) { 497 // failed to initiate read 498 // reset read flag before releasing waiters 499 enableReading(); 500 if (x instanceof ClosedChannelException) 501 x = new AsynchronousCloseException(); 502 if (!(x instanceof IOException)) 503 x = new IOException(x); 504 result.setFailure(x); 505 } finally { 506 // release resources if I/O not pending 507 if (!pending) { 508 if (overlapped != 0L) 509 ioCache.remove(overlapped); 510 if (prepared) 511 releaseBuffers(); 512 } 513 end(); 514 } 515 516 // invoke completion handler 517 Invoker.invoke(result.handler(), result); 518 } 519 520 /** 521 * Executed when the I/O has completed 522 */ 523 @Override 524 @SuppressWarnings("unchecked") 525 public void completed(int bytesTransferred) { 526 if (bytesTransferred == 0) { 527 bytesTransferred = -1; // EOF 528 } else { 529 updateBuffers(bytesTransferred); 530 } 695 } catch (IllegalArgumentException x) { 696 // position changed by someone else 697 } 698 } 699 break; 700 } 701 } 702 } 703 704 void releaseBuffers() { 705 for (int i=0; i<numBufs; i++) { 706 if (!(bufs[i] instanceof DirectBuffer)) { 707 Util.releaseTemporaryDirectBuffer(shadow[i]); 708 } 709 } 710 } 711 712 @Override 713 @SuppressWarnings("unchecked") 714 public void run() { 715 long overlapped = 0L; 716 boolean prepared = false; 717 boolean pending = false; 718 boolean shutdown = false; 719 720 try { 721 begin(); 722 723 // substitute non-direct buffers 724 prepareBuffers(); 725 prepared = true; 726 727 // get an OVERLAPPED structure (from the cache or allocate) 728 overlapped = ioCache.add(result); 729 int n = write0(handle, numBufs, writeBufferArray, overlapped); 730 if (n == IOStatus.UNAVAILABLE) { 731 // I/O is pending 732 pending = true; 733 return; 734 } 735 if (n == IOStatus.EOF) { 736 // special case for shutdown output 737 shutdown = true; 738 throw new ClosedChannelException(); 739 } 740 // write completed immediately 741 throw new InternalError("Write completed immediately"); 742 } catch (Throwable x) { 743 // write failed. Enable writing before releasing waiters. 744 enableWriting(); 745 if (!shutdown && (x instanceof ClosedChannelException)) 746 x = new AsynchronousCloseException(); 747 if (!(x instanceof IOException)) 748 x = new IOException(x); 749 result.setFailure(x); 750 } finally { 751 // release resources if I/O not pending 752 if (!pending) { 753 if (overlapped != 0L) 754 ioCache.remove(overlapped); 755 if (prepared) 756 releaseBuffers(); 757 } 758 end(); 759 } 760 761 // invoke completion handler 762 Invoker.invoke(result.handler(), result); 763 } 764 765 /** 766 * Executed when the I/O has completed 767 */ 768 @Override 769 @SuppressWarnings("unchecked") 770 public void completed(int bytesTransferred) { 771 updateBuffers(bytesTransferred); 772 773 // return direct buffer to cache if substituted 774 releaseBuffers(); 775 |