1 /* 2 * Copyright (c) 2008, 2011, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* @test 25 * @bug 4607272 6842687 6878369 6944810 7023403 26 * @summary Unit test for AsynchronousSocketChannel 27 * @run main Basic -skipSlowConnectTest 28 * @key randomness 29 */ 30 31 import java.nio.ByteBuffer; 32 import java.nio.channels.*; 33 import static java.net.StandardSocketOptions.*; 34 import java.net.*; 35 import java.util.Random; 36 import java.util.concurrent.*; 37 import java.util.concurrent.atomic.*; 38 import java.io.Closeable; 39 import java.io.IOException; 40 41 public class Basic { 42 static final Random rand = new Random(); 43 44 static boolean skipSlowConnectTest = false; 45 46 public static void main(String[] args) throws Exception { 47 for (String arg: args) { 48 switch (arg) { 49 case "-skipSlowConnectTest" : 50 skipSlowConnectTest = true; 51 break; 52 default: 53 throw new RuntimeException("Unrecognized argument: " + arg); 54 } 55 } 56 57 testBind(); 58 testSocketOptions(); 59 testConnect(); 60 testCloseWhenPending(); 61 testCancel(); 62 testRead1(); 63 testRead2(); 64 testRead3(); 65 testWrite1(); 66 testWrite2(); 67 // skip timeout tests until 7052549 is fixed 68 if (!System.getProperty("os.name").startsWith("Windows")) 69 testTimeout(); 70 testShutdown(); 71 } 72 73 static class Server implements Closeable { 74 private final ServerSocketChannel ssc; 75 private final InetSocketAddress address; 76 77 Server() throws IOException { 78 ssc = ServerSocketChannel.open().bind(new InetSocketAddress(0)); 79 80 InetAddress lh = InetAddress.getLocalHost(); 81 int port = ((InetSocketAddress)(ssc.getLocalAddress())).getPort(); 82 address = new InetSocketAddress(lh, port); 83 } 84 85 InetSocketAddress address() { 86 return address; 87 } 88 89 SocketChannel accept() throws IOException { 90 return ssc.accept(); 91 } 92 93 public void close() throws IOException { 94 ssc.close(); 95 } 96 97 } 98 99 static void testBind() throws Exception { 100 System.out.println("-- bind --"); 101 102 try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) { 103 if (ch.getLocalAddress() != null) 104 throw new RuntimeException("Local address should be 'null'"); 105 ch.bind(new InetSocketAddress(0)); 106 107 // check local address after binding 108 InetSocketAddress local = (InetSocketAddress)ch.getLocalAddress(); 109 if (local.getPort() == 0) 110 throw new RuntimeException("Unexpected port"); 111 if (!local.getAddress().isAnyLocalAddress()) 112 throw new RuntimeException("Not bound to a wildcard address"); 113 114 // try to re-bind 115 try { 116 ch.bind(new InetSocketAddress(0)); 117 throw new RuntimeException("AlreadyBoundException expected"); 118 } catch (AlreadyBoundException x) { 119 } 120 } 121 122 // check ClosedChannelException 123 AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 124 ch.close(); 125 try { 126 ch.bind(new InetSocketAddress(0)); 127 throw new RuntimeException("ClosedChannelException expected"); 128 } catch (ClosedChannelException x) { 129 } 130 } 131 132 static void testSocketOptions() throws Exception { 133 System.out.println("-- socket options --"); 134 135 try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) { 136 ch.setOption(SO_RCVBUF, 128*1024) 137 .setOption(SO_SNDBUF, 128*1024) 138 .setOption(SO_REUSEADDR, true); 139 140 // check SO_SNDBUF/SO_RCVBUF limits 141 int before, after; 142 before = ch.getOption(SO_SNDBUF); 143 after = ch.setOption(SO_SNDBUF, Integer.MAX_VALUE).getOption(SO_SNDBUF); 144 if (after < before) 145 throw new RuntimeException("setOption caused SO_SNDBUF to decrease"); 146 before = ch.getOption(SO_RCVBUF); 147 after = ch.setOption(SO_RCVBUF, Integer.MAX_VALUE).getOption(SO_RCVBUF); 148 if (after < before) 149 throw new RuntimeException("setOption caused SO_RCVBUF to decrease"); 150 151 ch.bind(new InetSocketAddress(0)); 152 153 // default values 154 if (ch.getOption(SO_KEEPALIVE)) 155 throw new RuntimeException("Default of SO_KEEPALIVE should be 'false'"); 156 if (ch.getOption(TCP_NODELAY)) 157 throw new RuntimeException("Default of TCP_NODELAY should be 'false'"); 158 159 // set and check 160 if (!ch.setOption(SO_KEEPALIVE, true).getOption(SO_KEEPALIVE)) 161 throw new RuntimeException("SO_KEEPALIVE did not change"); 162 if (!ch.setOption(TCP_NODELAY, true).getOption(TCP_NODELAY)) 163 throw new RuntimeException("SO_KEEPALIVE did not change"); 164 165 // read others (can't check as actual value is implementation dependent) 166 ch.getOption(SO_RCVBUF); 167 ch.getOption(SO_SNDBUF); 168 } 169 } 170 171 static void testConnect() throws Exception { 172 System.out.println("-- connect --"); 173 174 SocketAddress address; 175 176 try (Server server = new Server()) { 177 address = server.address(); 178 179 // connect to server and check local/remote addresses 180 try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) { 181 ch.connect(address).get(); 182 // check local address 183 if (ch.getLocalAddress() == null) 184 throw new RuntimeException("Not bound to local address"); 185 186 // check remote address 187 InetSocketAddress remote = (InetSocketAddress)ch.getRemoteAddress(); 188 if (remote.getPort() != server.address().getPort()) 189 throw new RuntimeException("Connected to unexpected port"); 190 if (!remote.getAddress().equals(server.address().getAddress())) 191 throw new RuntimeException("Connected to unexpected address"); 192 193 // try to connect again 194 try { 195 ch.connect(server.address()).get(); 196 throw new RuntimeException("AlreadyConnectedException expected"); 197 } catch (AlreadyConnectedException x) { 198 } 199 200 // clean-up 201 server.accept().close(); 202 } 203 204 // check that connect fails with ClosedChannelException 205 AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 206 ch.close(); 207 try { 208 ch.connect(server.address()).get(); 209 throw new RuntimeException("ExecutionException expected"); 210 } catch (ExecutionException x) { 211 if (!(x.getCause() instanceof ClosedChannelException)) 212 throw new RuntimeException("Cause of ClosedChannelException expected"); 213 } 214 final AtomicReference<Throwable> connectException = new AtomicReference<>(); 215 ch.connect(server.address(), (Void)null, new CompletionHandler<Void,Void>() { 216 public void completed(Void result, Void att) { 217 } 218 public void failed(Throwable exc, Void att) { 219 connectException.set(exc); 220 } 221 }); 222 while (connectException.get() == null) { 223 Thread.sleep(100); 224 } 225 if (!(connectException.get() instanceof ClosedChannelException)) 226 throw new RuntimeException("ClosedChannelException expected"); 227 } 228 229 // test that failure to connect closes the channel 230 try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) { 231 try { 232 ch.connect(address).get(); 233 } catch (ExecutionException x) { 234 // failed to establish connection 235 if (ch.isOpen()) 236 throw new RuntimeException("Channel should be closed"); 237 } 238 } 239 240 // repeat test by connecting to a (probably) non-existent host. This 241 // improves the chance that the connect will not fail immediately. 242 if (!skipSlowConnectTest) { 243 try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) { 244 try { 245 ch.connect(genSocketAddress()).get(); 246 } catch (ExecutionException x) { 247 // failed to establish connection 248 if (ch.isOpen()) 249 throw new RuntimeException("Channel should be closed"); 250 } 251 } 252 } 253 } 254 255 static void testCloseWhenPending() throws Exception { 256 System.out.println("-- asynchronous close when connecting --"); 257 258 AsynchronousSocketChannel ch; 259 260 // asynchronous close while connecting 261 ch = AsynchronousSocketChannel.open(); 262 Future<Void> connectResult = ch.connect(genSocketAddress()); 263 264 // give time to initiate the connect (SYN) 265 Thread.sleep(50); 266 267 // close 268 ch.close(); 269 270 // check that exception is thrown in timely manner 271 try { 272 connectResult.get(5, TimeUnit.SECONDS); 273 } catch (TimeoutException x) { 274 throw new RuntimeException("AsynchronousCloseException not thrown"); 275 } catch (ExecutionException x) { 276 // expected 277 } 278 279 System.out.println("-- asynchronous close when reading --"); 280 281 try (Server server = new Server()) { 282 ch = AsynchronousSocketChannel.open(); 283 ch.connect(server.address()).get(); 284 285 ByteBuffer dst = ByteBuffer.allocateDirect(100); 286 Future<Integer> result = ch.read(dst); 287 288 // attempt a second read - should fail with ReadPendingException 289 ByteBuffer buf = ByteBuffer.allocateDirect(100); 290 try { 291 ch.read(buf); 292 throw new RuntimeException("ReadPendingException expected"); 293 } catch (ReadPendingException x) { 294 } 295 296 // close channel (should cause initial read to complete) 297 ch.close(); 298 server.accept().close(); 299 300 // check that AsynchronousCloseException is thrown 301 try { 302 result.get(); 303 throw new RuntimeException("Should not read"); 304 } catch (ExecutionException x) { 305 if (!(x.getCause() instanceof AsynchronousCloseException)) 306 throw new RuntimeException(x); 307 } 308 309 System.out.println("-- asynchronous close when writing --"); 310 311 ch = AsynchronousSocketChannel.open(); 312 ch.connect(server.address()).get(); 313 314 final AtomicReference<Throwable> writeException = 315 new AtomicReference<Throwable>(); 316 317 // write bytes to fill socket buffer 318 ch.write(genBuffer(), ch, new CompletionHandler<Integer,AsynchronousSocketChannel>() { 319 public void completed(Integer result, AsynchronousSocketChannel ch) { 320 ch.write(genBuffer(), ch, this); 321 } 322 public void failed(Throwable x, AsynchronousSocketChannel ch) { 323 writeException.set(x); 324 } 325 }); 326 327 // give time for socket buffer to fill up. 328 Thread.sleep(5*1000); 329 330 // attempt a concurrent write - should fail with WritePendingException 331 try { 332 ch.write(genBuffer()); 333 throw new RuntimeException("WritePendingException expected"); 334 } catch (WritePendingException x) { 335 } 336 337 // close channel - should cause initial write to complete 338 ch.close(); 339 server.accept().close(); 340 341 // wait for exception 342 while (writeException.get() == null) { 343 Thread.sleep(100); 344 } 345 if (!(writeException.get() instanceof AsynchronousCloseException)) 346 throw new RuntimeException("AsynchronousCloseException expected"); 347 } 348 } 349 350 static void testCancel() throws Exception { 351 System.out.println("-- cancel --"); 352 353 try (Server server = new Server()) { 354 for (int i=0; i<2; i++) { 355 boolean mayInterruptIfRunning = (i == 0) ? false : true; 356 357 // establish loopback connection 358 AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 359 ch.connect(server.address()).get(); 360 SocketChannel peer = server.accept(); 361 362 // start read operation 363 ByteBuffer buf = ByteBuffer.allocate(1); 364 Future<Integer> res = ch.read(buf); 365 366 // cancel operation 367 boolean cancelled = res.cancel(mayInterruptIfRunning); 368 369 // check post-conditions 370 if (!res.isDone()) 371 throw new RuntimeException("isDone should return true"); 372 if (res.isCancelled() != cancelled) 373 throw new RuntimeException("isCancelled not consistent"); 374 try { 375 res.get(); 376 throw new RuntimeException("CancellationException expected"); 377 } catch (CancellationException x) { 378 } 379 try { 380 res.get(1, TimeUnit.SECONDS); 381 throw new RuntimeException("CancellationException expected"); 382 } catch (CancellationException x) { 383 } 384 385 // check that the cancel doesn't impact writing to the channel 386 if (!mayInterruptIfRunning) { 387 buf = ByteBuffer.wrap("a".getBytes()); 388 ch.write(buf).get(); 389 } 390 391 ch.close(); 392 peer.close(); 393 } 394 } 395 } 396 397 static void testRead1() throws Exception { 398 System.out.println("-- read (1) --"); 399 400 try (Server server = new Server()) { 401 final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 402 ch.connect(server.address()).get(); 403 404 // read with 0 bytes remaining should complete immediately 405 ByteBuffer buf = ByteBuffer.allocate(1); 406 buf.put((byte)0); 407 int n = ch.read(buf).get(); 408 if (n != 0) 409 throw new RuntimeException("0 expected"); 410 411 // write bytes and close connection 412 ByteBuffer src = genBuffer(); 413 try (SocketChannel sc = server.accept()) { 414 sc.setOption(SO_SNDBUF, src.remaining()); 415 while (src.hasRemaining()) 416 sc.write(src); 417 } 418 419 // reads should complete immediately 420 final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100); 421 final CountDownLatch latch = new CountDownLatch(1); 422 ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() { 423 public void completed(Integer result, Void att) { 424 int n = result; 425 if (n > 0) { 426 ch.read(dst, (Void)null, this); 427 } else { 428 latch.countDown(); 429 } 430 } 431 public void failed(Throwable exc, Void att) { 432 } 433 }); 434 435 latch.await(); 436 437 // check buffers 438 src.flip(); 439 dst.flip(); 440 if (!src.equals(dst)) { 441 throw new RuntimeException("Contents differ"); 442 } 443 444 // close channel 445 ch.close(); 446 447 // check read fails with ClosedChannelException 448 try { 449 ch.read(dst).get(); 450 throw new RuntimeException("ExecutionException expected"); 451 } catch (ExecutionException x) { 452 if (!(x.getCause() instanceof ClosedChannelException)) 453 throw new RuntimeException("Cause of ClosedChannelException expected"); 454 } 455 } 456 } 457 458 static void testRead2() throws Exception { 459 System.out.println("-- read (2) --"); 460 461 try (Server server = new Server()) { 462 final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 463 ch.connect(server.address()).get(); 464 SocketChannel sc = server.accept(); 465 466 ByteBuffer src = genBuffer(); 467 468 // read until the buffer is full 469 final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity()); 470 final CountDownLatch latch = new CountDownLatch(1); 471 ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() { 472 public void completed(Integer result, Void att) { 473 if (dst.hasRemaining()) { 474 ch.read(dst, (Void)null, this); 475 } else { 476 latch.countDown(); 477 } 478 } 479 public void failed(Throwable exc, Void att) { 480 } 481 }); 482 483 // trickle the writing 484 do { 485 int rem = src.remaining(); 486 int size = (rem <= 100) ? rem : 50 + rand.nextInt(rem - 100); 487 ByteBuffer buf = ByteBuffer.allocate(size); 488 for (int i=0; i<size; i++) 489 buf.put(src.get()); 490 buf.flip(); 491 Thread.sleep(50 + rand.nextInt(1500)); 492 while (buf.hasRemaining()) 493 sc.write(buf); 494 } while (src.hasRemaining()); 495 496 // wait until ascynrhonous reading has completed 497 latch.await(); 498 499 // check buffers 500 src.flip(); 501 dst.flip(); 502 if (!src.equals(dst)) { 503 throw new RuntimeException("Contents differ"); 504 } 505 506 sc.close(); 507 ch.close(); 508 } 509 } 510 511 // exercise scattering read 512 static void testRead3() throws Exception { 513 System.out.println("-- read (3) --"); 514 515 try (Server server = new Server()) { 516 final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 517 ch.connect(server.address()).get(); 518 SocketChannel sc = server.accept(); 519 520 ByteBuffer[] dsts = new ByteBuffer[3]; 521 for (int i=0; i<dsts.length; i++) { 522 dsts[i] = ByteBuffer.allocateDirect(100); 523 } 524 525 // scattering read that completes ascynhronously 526 final CountDownLatch l1 = new CountDownLatch(1); 527 ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null, 528 new CompletionHandler<Long,Void>() { 529 public void completed(Long result, Void att) { 530 long n = result; 531 if (n <= 0) 532 throw new RuntimeException("No bytes read"); 533 l1.countDown(); 534 } 535 public void failed(Throwable exc, Void att) { 536 } 537 }); 538 539 // write some bytes 540 sc.write(genBuffer()); 541 542 // read should now complete 543 l1.await(); 544 545 // write more bytes 546 sc.write(genBuffer()); 547 548 // read should complete immediately 549 for (int i=0; i<dsts.length; i++) { 550 dsts[i].rewind(); 551 } 552 553 final CountDownLatch l2 = new CountDownLatch(1); 554 ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null, 555 new CompletionHandler<Long,Void>() { 556 public void completed(Long result, Void att) { 557 long n = result; 558 if (n <= 0) 559 throw new RuntimeException("No bytes read"); 560 l2.countDown(); 561 } 562 public void failed(Throwable exc, Void att) { 563 } 564 }); 565 l2.await(); 566 567 ch.close(); 568 sc.close(); 569 } 570 } 571 572 static void testWrite1() throws Exception { 573 System.out.println("-- write (1) --"); 574 575 try (Server server = new Server()) { 576 final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 577 ch.connect(server.address()).get(); 578 SocketChannel sc = server.accept(); 579 580 // write with 0 bytes remaining should complete immediately 581 ByteBuffer buf = ByteBuffer.allocate(1); 582 buf.put((byte)0); 583 int n = ch.write(buf).get(); 584 if (n != 0) 585 throw new RuntimeException("0 expected"); 586 587 // write all bytes and close connection when done 588 final ByteBuffer src = genBuffer(); 589 ch.write(src, (Void)null, new CompletionHandler<Integer,Void>() { 590 public void completed(Integer result, Void att) { 591 if (src.hasRemaining()) { 592 ch.write(src, (Void)null, this); 593 } else { 594 try { 595 ch.close(); 596 } catch (IOException ignore) { } 597 } 598 } 599 public void failed(Throwable exc, Void att) { 600 } 601 }); 602 603 // read to EOF or buffer full 604 ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100); 605 do { 606 n = sc.read(dst); 607 } while (n > 0); 608 sc.close(); 609 610 // check buffers 611 src.flip(); 612 dst.flip(); 613 if (!src.equals(dst)) { 614 throw new RuntimeException("Contents differ"); 615 } 616 617 // check write fails with ClosedChannelException 618 try { 619 ch.read(dst).get(); 620 throw new RuntimeException("ExecutionException expected"); 621 } catch (ExecutionException x) { 622 if (!(x.getCause() instanceof ClosedChannelException)) 623 throw new RuntimeException("Cause of ClosedChannelException expected"); 624 } 625 } 626 } 627 628 // exercise gathering write 629 static void testWrite2() throws Exception { 630 System.out.println("-- write (2) --"); 631 632 try (Server server = new Server()) { 633 final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 634 ch.connect(server.address()).get(); 635 SocketChannel sc = server.accept(); 636 637 // number of bytes written 638 final AtomicLong bytesWritten = new AtomicLong(0); 639 640 // write buffers (should complete immediately) 641 ByteBuffer[] srcs = genBuffers(1); 642 final CountDownLatch l1 = new CountDownLatch(1); 643 ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null, 644 new CompletionHandler<Long,Void>() { 645 public void completed(Long result, Void att) { 646 long n = result; 647 if (n <= 0) 648 throw new RuntimeException("No bytes read"); 649 bytesWritten.addAndGet(n); 650 l1.countDown(); 651 } 652 public void failed(Throwable exc, Void att) { 653 } 654 }); 655 l1.await(); 656 657 // set to true to signal that no more buffers should be written 658 final AtomicBoolean continueWriting = new AtomicBoolean(true); 659 660 // write until socket buffer is full so as to create the conditions 661 // for when a write does not complete immediately 662 srcs = genBuffers(1); 663 ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null, 664 new CompletionHandler<Long,Void>() { 665 public void completed(Long result, Void att) { 666 long n = result; 667 if (n <= 0) 668 throw new RuntimeException("No bytes written"); 669 bytesWritten.addAndGet(n); 670 if (continueWriting.get()) { 671 ByteBuffer[] srcs = genBuffers(8); 672 ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, 673 (Void)null, this); 674 } 675 } 676 public void failed(Throwable exc, Void att) { 677 } 678 }); 679 680 // give time for socket buffer to fill up. 681 Thread.sleep(5*1000); 682 683 // signal handler to stop further writing 684 continueWriting.set(false); 685 686 // read until done 687 ByteBuffer buf = ByteBuffer.allocateDirect(4096); 688 long total = 0L; 689 do { 690 int n = sc.read(buf); 691 if (n <= 0) 692 throw new RuntimeException("No bytes read"); 693 buf.rewind(); 694 total += n; 695 } while (total < bytesWritten.get()); 696 697 ch.close(); 698 sc.close(); 699 } 700 } 701 702 static void testShutdown() throws Exception { 703 System.out.println("-- shutdown--"); 704 705 try (Server server = new Server(); 706 AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) 707 { 708 ch.connect(server.address()).get(); 709 try (SocketChannel peer = server.accept()) { 710 ByteBuffer buf = ByteBuffer.allocateDirect(1000); 711 int n; 712 713 // check read 714 ch.shutdownInput(); 715 n = ch.read(buf).get(); 716 if (n != -1) 717 throw new RuntimeException("-1 expected"); 718 // check full with full buffer 719 buf.put(new byte[100]); 720 n = ch.read(buf).get(); 721 if (n != -1) 722 throw new RuntimeException("-1 expected"); 723 724 // check write 725 ch.shutdownOutput(); 726 try { 727 ch.write(buf).get(); 728 throw new RuntimeException("ClosedChannelException expected"); 729 } catch (ExecutionException x) { 730 if (!(x.getCause() instanceof ClosedChannelException)) 731 throw new RuntimeException("ClosedChannelException expected"); 732 } 733 } 734 } 735 } 736 737 static void testTimeout() throws Exception { 738 System.out.println("-- timeouts --"); 739 testTimeout(Integer.MIN_VALUE, TimeUnit.SECONDS); 740 testTimeout(-1L, TimeUnit.SECONDS); 741 testTimeout(0L, TimeUnit.SECONDS); 742 testTimeout(2L, TimeUnit.SECONDS); 743 } 744 745 static void testTimeout(final long timeout, final TimeUnit unit) throws Exception { 746 try (Server server = new Server()) { 747 AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 748 ch.connect(server.address()).get(); 749 750 ByteBuffer dst = ByteBuffer.allocate(512); 751 752 final AtomicReference<Throwable> readException = new AtomicReference<Throwable>(); 753 754 // this read should timeout if value is > 0 755 ch.read(dst, timeout, unit, null, new CompletionHandler<Integer,Void>() { 756 public void completed(Integer result, Void att) { 757 readException.set(new RuntimeException("Should not complete")); 758 } 759 public void failed(Throwable exc, Void att) { 760 readException.set(exc); 761 } 762 }); 763 if (timeout > 0L) { 764 // wait for exception 765 while (readException.get() == null) { 766 Thread.sleep(100); 767 } 768 if (!(readException.get() instanceof InterruptedByTimeoutException)) 769 throw new RuntimeException("InterruptedByTimeoutException expected"); 770 771 // after a timeout then further reading should throw unspecified runtime exception 772 boolean exceptionThrown = false; 773 try { 774 ch.read(dst); 775 } catch (RuntimeException x) { 776 exceptionThrown = true; 777 } 778 if (!exceptionThrown) 779 throw new RuntimeException("RuntimeException expected after timeout."); 780 } else { 781 Thread.sleep(1000); 782 Throwable exc = readException.get(); 783 if (exc != null) 784 throw new RuntimeException(exc); 785 } 786 787 final AtomicReference<Throwable> writeException = new AtomicReference<Throwable>(); 788 789 // write bytes to fill socket buffer 790 ch.write(genBuffer(), timeout, unit, ch, 791 new CompletionHandler<Integer,AsynchronousSocketChannel>() 792 { 793 public void completed(Integer result, AsynchronousSocketChannel ch) { 794 ch.write(genBuffer(), timeout, unit, ch, this); 795 } 796 public void failed(Throwable exc, AsynchronousSocketChannel ch) { 797 writeException.set(exc); 798 } 799 }); 800 if (timeout > 0) { 801 // wait for exception 802 while (writeException.get() == null) { 803 Thread.sleep(100); 804 } 805 if (!(writeException.get() instanceof InterruptedByTimeoutException)) 806 throw new RuntimeException("InterruptedByTimeoutException expected"); 807 808 // after a timeout then further writing should throw unspecified runtime exception 809 boolean exceptionThrown = false; 810 try { 811 ch.write(genBuffer()); 812 } catch (RuntimeException x) { 813 exceptionThrown = true; 814 } 815 if (!exceptionThrown) 816 throw new RuntimeException("RuntimeException expected after timeout."); 817 } else { 818 Thread.sleep(1000); 819 Throwable exc = writeException.get(); 820 if (exc != null) 821 throw new RuntimeException(exc); 822 } 823 824 // clean-up 825 server.accept().close(); 826 ch.close(); 827 } 828 } 829 830 // returns ByteBuffer with random bytes 831 static ByteBuffer genBuffer() { 832 int size = 1024 + rand.nextInt(16000); 833 byte[] buf = new byte[size]; 834 rand.nextBytes(buf); 835 boolean useDirect = rand.nextBoolean(); 836 if (useDirect) { 837 ByteBuffer bb = ByteBuffer.allocateDirect(buf.length); 838 bb.put(buf); 839 bb.flip(); 840 return bb; 841 } else { 842 return ByteBuffer.wrap(buf); 843 } 844 } 845 846 // return ByteBuffer[] with random bytes 847 static ByteBuffer[] genBuffers(int max) { 848 int len = 1; 849 if (max > 1) 850 len += rand.nextInt(max); 851 ByteBuffer[] bufs = new ByteBuffer[len]; 852 for (int i=0; i<len; i++) 853 bufs[i] = genBuffer(); 854 return bufs; 855 } 856 857 // return random SocketAddress 858 static SocketAddress genSocketAddress() { 859 StringBuilder sb = new StringBuilder("10."); 860 sb.append(rand.nextInt(256)); 861 sb.append('.'); 862 sb.append(rand.nextInt(256)); 863 sb.append('.'); 864 sb.append(rand.nextInt(256)); 865 InetAddress rh; 866 try { 867 rh = InetAddress.getByName(sb.toString()); 868 } catch (UnknownHostException x) { 869 throw new InternalError("Should not happen"); 870 } 871 return new InetSocketAddress(rh, rand.nextInt(65535)+1); 872 } 873 }