1 /* 2 * Copyright (c) 2008, 2016, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* @test 25 * @bug 4607272 6842687 6878369 6944810 7023403 26 * @summary Unit test for AsynchronousSocketChannel 27 * @run main Basic -skipSlowConnectTest 28 * @key randomness intermittent 29 */ 30 31 import java.nio.ByteBuffer; 32 import java.nio.channels.*; 33 import static java.net.StandardSocketOptions.*; 34 import java.net.*; 35 import java.util.Random; 36 import java.util.concurrent.*; 37 import java.util.concurrent.atomic.*; 38 import java.io.Closeable; 39 import java.io.IOException; 40 import java.util.Set; 41 42 public class Basic { 43 static final Random rand = new Random(); 44 45 static boolean skipSlowConnectTest = false; 46 47 public static void main(String[] args) throws Exception { 48 for (String arg: args) { 49 switch (arg) { 50 case "-skipSlowConnectTest" : 51 skipSlowConnectTest = true; 52 break; 53 default: 54 throw new RuntimeException("Unrecognized argument: " + arg); 55 } 56 } 57 58 testBind(); 59 testSocketOptions(); 60 testConnect(); 61 testCloseWhenPending(); 62 testCancel(); 63 testRead1(); 64 testRead2(); 65 testRead3(); 66 testWrite1(); 67 testWrite2(); 68 // skip timeout tests until 7052549 is fixed 69 if (!System.getProperty("os.name").startsWith("Windows")) 70 testTimeout(); 71 testShutdown(); 72 } 73 74 static class Server implements Closeable { 75 private final ServerSocketChannel ssc; 76 private final InetSocketAddress address; 77 78 Server() throws IOException { 79 ssc = ServerSocketChannel.open().bind(new InetSocketAddress(0)); 80 81 InetAddress lh = InetAddress.getLocalHost(); 82 int port = ((InetSocketAddress)(ssc.getLocalAddress())).getPort(); 83 address = new InetSocketAddress(lh, port); 84 } 85 86 InetSocketAddress address() { 87 return address; 88 } 89 90 SocketChannel accept() throws IOException { 91 return ssc.accept(); 92 } 93 94 public void close() throws IOException { 95 ssc.close(); 96 } 97 98 } 99 100 static void testBind() throws Exception { 101 System.out.println("-- bind --"); 102 103 try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) { 104 if (ch.getLocalAddress() != null) 105 throw new RuntimeException("Local address should be 'null'"); 106 ch.bind(new InetSocketAddress(0)); 107 108 // check local address after binding 109 InetSocketAddress local = (InetSocketAddress)ch.getLocalAddress(); 110 if (local.getPort() == 0) 111 throw new RuntimeException("Unexpected port"); 112 if (!local.getAddress().isAnyLocalAddress()) 113 throw new RuntimeException("Not bound to a wildcard address"); 114 115 // try to re-bind 116 try { 117 ch.bind(new InetSocketAddress(0)); 118 throw new RuntimeException("AlreadyBoundException expected"); 119 } catch (AlreadyBoundException x) { 120 } 121 } 122 123 // check ClosedChannelException 124 AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 125 ch.close(); 126 try { 127 ch.bind(new InetSocketAddress(0)); 128 throw new RuntimeException("ClosedChannelException expected"); 129 } catch (ClosedChannelException x) { 130 } 131 } 132 133 static void testSocketOptions() throws Exception { 134 System.out.println("-- socket options --"); 135 136 try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) { 137 ch.setOption(SO_RCVBUF, 128*1024) 138 .setOption(SO_SNDBUF, 128*1024) 139 .setOption(SO_REUSEADDR, true); 140 141 // check SO_SNDBUF/SO_RCVBUF limits 142 int before, after; 143 before = ch.getOption(SO_SNDBUF); 144 after = ch.setOption(SO_SNDBUF, Integer.MAX_VALUE).getOption(SO_SNDBUF); 145 if (after < before) 146 throw new RuntimeException("setOption caused SO_SNDBUF to decrease"); 147 before = ch.getOption(SO_RCVBUF); 148 after = ch.setOption(SO_RCVBUF, Integer.MAX_VALUE).getOption(SO_RCVBUF); 149 if (after < before) 150 throw new RuntimeException("setOption caused SO_RCVBUF to decrease"); 151 152 ch.bind(new InetSocketAddress(0)); 153 154 // default values 155 if (ch.getOption(SO_KEEPALIVE)) 156 throw new RuntimeException("Default of SO_KEEPALIVE should be 'false'"); 157 if (ch.getOption(TCP_NODELAY)) 158 throw new RuntimeException("Default of TCP_NODELAY should be 'false'"); 159 160 // set and check 161 if (!ch.setOption(SO_KEEPALIVE, true).getOption(SO_KEEPALIVE)) 162 throw new RuntimeException("SO_KEEPALIVE did not change"); 163 if (!ch.setOption(TCP_NODELAY, true).getOption(TCP_NODELAY)) 164 throw new RuntimeException("SO_KEEPALIVE did not change"); 165 166 // read others (can't check as actual value is implementation dependent) 167 ch.getOption(SO_RCVBUF); 168 ch.getOption(SO_SNDBUF); 169 170 Set<SocketOption<?>> options = ch.supportedOptions(); 171 boolean reuseport = options.contains(SO_REUSEPORT); 172 if (reuseport) { 173 if (ch.getOption(SO_REUSEPORT)) 174 throw new RuntimeException("Default of SO_REUSEPORT should be 'false'"); 175 if (!ch.setOption(SO_REUSEPORT, true).getOption(SO_REUSEPORT)) 176 throw new RuntimeException("SO_REUSEPORT did not change"); 177 } 178 } 179 } 180 181 static void testConnect() throws Exception { 182 System.out.println("-- connect --"); 183 184 SocketAddress address; 185 186 try (Server server = new Server()) { 187 address = server.address(); 188 189 // connect to server and check local/remote addresses 190 try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) { 191 ch.connect(address).get(); 192 // check local address 193 if (ch.getLocalAddress() == null) 194 throw new RuntimeException("Not bound to local address"); 195 196 // check remote address 197 InetSocketAddress remote = (InetSocketAddress)ch.getRemoteAddress(); 198 if (remote.getPort() != server.address().getPort()) 199 throw new RuntimeException("Connected to unexpected port"); 200 if (!remote.getAddress().equals(server.address().getAddress())) 201 throw new RuntimeException("Connected to unexpected address"); 202 203 // try to connect again 204 try { 205 ch.connect(server.address()).get(); 206 throw new RuntimeException("AlreadyConnectedException expected"); 207 } catch (AlreadyConnectedException x) { 208 } 209 210 // clean-up 211 server.accept().close(); 212 } 213 214 // check that connect fails with ClosedChannelException 215 AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 216 ch.close(); 217 try { 218 ch.connect(server.address()).get(); 219 throw new RuntimeException("ExecutionException expected"); 220 } catch (ExecutionException x) { 221 if (!(x.getCause() instanceof ClosedChannelException)) 222 throw new RuntimeException("Cause of ClosedChannelException expected", 223 x.getCause()); 224 } 225 final AtomicReference<Throwable> connectException = new AtomicReference<>(); 226 ch.connect(server.address(), (Void)null, new CompletionHandler<Void,Void>() { 227 public void completed(Void result, Void att) { 228 } 229 public void failed(Throwable exc, Void att) { 230 connectException.set(exc); 231 } 232 }); 233 while (connectException.get() == null) { 234 Thread.sleep(100); 235 } 236 if (!(connectException.get() instanceof ClosedChannelException)) 237 throw new RuntimeException("ClosedChannelException expected", 238 connectException.get()); 239 } 240 241 // test that failure to connect closes the channel 242 try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) { 243 try { 244 ch.connect(address).get(); 245 } catch (ExecutionException x) { 246 // failed to establish connection 247 if (ch.isOpen()) 248 throw new RuntimeException("Channel should be closed"); 249 } 250 } 251 252 // repeat test by connecting to a (probably) non-existent host. This 253 // improves the chance that the connect will not fail immediately. 254 if (!skipSlowConnectTest) { 255 try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) { 256 try { 257 ch.connect(genSocketAddress()).get(); 258 } catch (ExecutionException x) { 259 // failed to establish connection 260 if (ch.isOpen()) 261 throw new RuntimeException("Channel should be closed"); 262 } 263 } 264 } 265 } 266 267 static void testCloseWhenPending() throws Exception { 268 System.out.println("-- asynchronous close when connecting --"); 269 270 AsynchronousSocketChannel ch; 271 272 // asynchronous close while connecting 273 ch = AsynchronousSocketChannel.open(); 274 Future<Void> connectResult = ch.connect(genSocketAddress()); 275 276 // give time to initiate the connect (SYN) 277 Thread.sleep(50); 278 279 // close 280 ch.close(); 281 282 // check that exception is thrown in timely manner 283 try { 284 connectResult.get(5, TimeUnit.SECONDS); 285 } catch (TimeoutException x) { 286 throw new RuntimeException("AsynchronousCloseException not thrown"); 287 } catch (ExecutionException x) { 288 // expected 289 } 290 291 System.out.println("-- asynchronous close when reading --"); 292 293 try (Server server = new Server()) { 294 ch = AsynchronousSocketChannel.open(); 295 ch.connect(server.address()).get(); 296 297 ByteBuffer dst = ByteBuffer.allocateDirect(100); 298 Future<Integer> result = ch.read(dst); 299 300 // attempt a second read - should fail with ReadPendingException 301 ByteBuffer buf = ByteBuffer.allocateDirect(100); 302 try { 303 ch.read(buf); 304 throw new RuntimeException("ReadPendingException expected"); 305 } catch (ReadPendingException x) { 306 } 307 308 // close channel (should cause initial read to complete) 309 ch.close(); 310 server.accept().close(); 311 312 // check that AsynchronousCloseException is thrown 313 try { 314 result.get(); 315 throw new RuntimeException("Should not read"); 316 } catch (ExecutionException x) { 317 if (!(x.getCause() instanceof AsynchronousCloseException)) 318 throw new RuntimeException(x); 319 } 320 321 System.out.println("-- asynchronous close when writing --"); 322 323 ch = AsynchronousSocketChannel.open(); 324 ch.connect(server.address()).get(); 325 326 final AtomicReference<Throwable> writeException = 327 new AtomicReference<Throwable>(); 328 329 // write bytes to fill socket buffer 330 ch.write(genBuffer(), ch, new CompletionHandler<Integer,AsynchronousSocketChannel>() { 331 public void completed(Integer result, AsynchronousSocketChannel ch) { 332 ch.write(genBuffer(), ch, this); 333 } 334 public void failed(Throwable x, AsynchronousSocketChannel ch) { 335 writeException.set(x); 336 } 337 }); 338 339 // give time for socket buffer to fill up. 340 Thread.sleep(5*1000); 341 342 // attempt a concurrent write - should fail with WritePendingException 343 try { 344 ch.write(genBuffer()); 345 throw new RuntimeException("WritePendingException expected"); 346 } catch (WritePendingException x) { 347 } 348 349 // close channel - should cause initial write to complete 350 ch.close(); 351 server.accept().close(); 352 353 // wait for exception 354 while (writeException.get() == null) { 355 Thread.sleep(100); 356 } 357 if (!(writeException.get() instanceof AsynchronousCloseException)) 358 throw new RuntimeException("AsynchronousCloseException expected", 359 writeException.get()); 360 } 361 } 362 363 static void testCancel() throws Exception { 364 System.out.println("-- cancel --"); 365 366 try (Server server = new Server()) { 367 for (int i=0; i<2; i++) { 368 boolean mayInterruptIfRunning = (i == 0) ? false : true; 369 370 // establish loopback connection 371 AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 372 ch.connect(server.address()).get(); 373 SocketChannel peer = server.accept(); 374 375 // start read operation 376 ByteBuffer buf = ByteBuffer.allocate(1); 377 Future<Integer> res = ch.read(buf); 378 379 // cancel operation 380 boolean cancelled = res.cancel(mayInterruptIfRunning); 381 382 // check post-conditions 383 if (!res.isDone()) 384 throw new RuntimeException("isDone should return true"); 385 if (res.isCancelled() != cancelled) 386 throw new RuntimeException("isCancelled not consistent"); 387 try { 388 res.get(); 389 throw new RuntimeException("CancellationException expected"); 390 } catch (CancellationException x) { 391 } 392 try { 393 res.get(1, TimeUnit.SECONDS); 394 throw new RuntimeException("CancellationException expected"); 395 } catch (CancellationException x) { 396 } 397 398 // check that the cancel doesn't impact writing to the channel 399 if (!mayInterruptIfRunning) { 400 buf = ByteBuffer.wrap("a".getBytes()); 401 ch.write(buf).get(); 402 } 403 404 ch.close(); 405 peer.close(); 406 } 407 } 408 } 409 410 static void testRead1() throws Exception { 411 System.out.println("-- read (1) --"); 412 413 try (Server server = new Server()) { 414 final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 415 ch.connect(server.address()).get(); 416 417 // read with 0 bytes remaining should complete immediately 418 ByteBuffer buf = ByteBuffer.allocate(1); 419 buf.put((byte)0); 420 int n = ch.read(buf).get(); 421 if (n != 0) 422 throw new RuntimeException("0 expected"); 423 424 // write bytes and close connection 425 ByteBuffer src = genBuffer(); 426 try (SocketChannel sc = server.accept()) { 427 sc.setOption(SO_SNDBUF, src.remaining()); 428 while (src.hasRemaining()) 429 sc.write(src); 430 } 431 432 // reads should complete immediately 433 final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100); 434 final CountDownLatch latch = new CountDownLatch(1); 435 ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() { 436 public void completed(Integer result, Void att) { 437 int n = result; 438 if (n > 0) { 439 ch.read(dst, (Void)null, this); 440 } else { 441 latch.countDown(); 442 } 443 } 444 public void failed(Throwable exc, Void att) { 445 } 446 }); 447 448 latch.await(); 449 450 // check buffers 451 src.flip(); 452 dst.flip(); 453 if (!src.equals(dst)) { 454 throw new RuntimeException("Contents differ"); 455 } 456 457 // close channel 458 ch.close(); 459 460 // check read fails with ClosedChannelException 461 try { 462 ch.read(dst).get(); 463 throw new RuntimeException("ExecutionException expected"); 464 } catch (ExecutionException x) { 465 if (!(x.getCause() instanceof ClosedChannelException)) 466 throw new RuntimeException("Cause of ClosedChannelException expected", 467 x.getCause()); 468 } 469 } 470 } 471 472 static void testRead2() throws Exception { 473 System.out.println("-- read (2) --"); 474 475 try (Server server = new Server()) { 476 final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 477 ch.connect(server.address()).get(); 478 SocketChannel sc = server.accept(); 479 480 ByteBuffer src = genBuffer(); 481 482 // read until the buffer is full 483 final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity()); 484 final CountDownLatch latch = new CountDownLatch(1); 485 ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() { 486 public void completed(Integer result, Void att) { 487 if (dst.hasRemaining()) { 488 ch.read(dst, (Void)null, this); 489 } else { 490 latch.countDown(); 491 } 492 } 493 public void failed(Throwable exc, Void att) { 494 } 495 }); 496 497 // trickle the writing 498 do { 499 int rem = src.remaining(); 500 int size = (rem <= 100) ? rem : 50 + rand.nextInt(rem - 100); 501 ByteBuffer buf = ByteBuffer.allocate(size); 502 for (int i=0; i<size; i++) 503 buf.put(src.get()); 504 buf.flip(); 505 Thread.sleep(50 + rand.nextInt(1500)); 506 while (buf.hasRemaining()) 507 sc.write(buf); 508 } while (src.hasRemaining()); 509 510 // wait until ascynrhonous reading has completed 511 latch.await(); 512 513 // check buffers 514 src.flip(); 515 dst.flip(); 516 if (!src.equals(dst)) { 517 throw new RuntimeException("Contents differ"); 518 } 519 520 sc.close(); 521 ch.close(); 522 } 523 } 524 525 // exercise scattering read 526 static void testRead3() throws Exception { 527 System.out.println("-- read (3) --"); 528 529 try (Server server = new Server()) { 530 final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 531 ch.connect(server.address()).get(); 532 SocketChannel sc = server.accept(); 533 534 ByteBuffer[] dsts = new ByteBuffer[3]; 535 for (int i=0; i<dsts.length; i++) { 536 dsts[i] = ByteBuffer.allocateDirect(100); 537 } 538 539 // scattering read that completes ascynhronously 540 final CountDownLatch l1 = new CountDownLatch(1); 541 ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null, 542 new CompletionHandler<Long,Void>() { 543 public void completed(Long result, Void att) { 544 long n = result; 545 if (n <= 0) 546 throw new RuntimeException("No bytes read"); 547 l1.countDown(); 548 } 549 public void failed(Throwable exc, Void att) { 550 } 551 }); 552 553 // write some bytes 554 sc.write(genBuffer()); 555 556 // read should now complete 557 l1.await(); 558 559 // write more bytes 560 sc.write(genBuffer()); 561 562 // read should complete immediately 563 for (int i=0; i<dsts.length; i++) { 564 dsts[i].rewind(); 565 } 566 567 final CountDownLatch l2 = new CountDownLatch(1); 568 ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null, 569 new CompletionHandler<Long,Void>() { 570 public void completed(Long result, Void att) { 571 long n = result; 572 if (n <= 0) 573 throw new RuntimeException("No bytes read"); 574 l2.countDown(); 575 } 576 public void failed(Throwable exc, Void att) { 577 } 578 }); 579 l2.await(); 580 581 ch.close(); 582 sc.close(); 583 } 584 } 585 586 static void testWrite1() throws Exception { 587 System.out.println("-- write (1) --"); 588 589 try (Server server = new Server()) { 590 final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 591 ch.connect(server.address()).get(); 592 SocketChannel sc = server.accept(); 593 594 // write with 0 bytes remaining should complete immediately 595 ByteBuffer buf = ByteBuffer.allocate(1); 596 buf.put((byte)0); 597 int n = ch.write(buf).get(); 598 if (n != 0) 599 throw new RuntimeException("0 expected"); 600 601 // write all bytes and close connection when done 602 final ByteBuffer src = genBuffer(); 603 ch.write(src, (Void)null, new CompletionHandler<Integer,Void>() { 604 public void completed(Integer result, Void att) { 605 if (src.hasRemaining()) { 606 ch.write(src, (Void)null, this); 607 } else { 608 try { 609 ch.close(); 610 } catch (IOException ignore) { } 611 } 612 } 613 public void failed(Throwable exc, Void att) { 614 } 615 }); 616 617 // read to EOF or buffer full 618 ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100); 619 do { 620 n = sc.read(dst); 621 } while (n > 0); 622 sc.close(); 623 624 // check buffers 625 src.flip(); 626 dst.flip(); 627 if (!src.equals(dst)) { 628 throw new RuntimeException("Contents differ"); 629 } 630 631 // check write fails with ClosedChannelException 632 try { 633 ch.read(dst).get(); 634 throw new RuntimeException("ExecutionException expected"); 635 } catch (ExecutionException x) { 636 if (!(x.getCause() instanceof ClosedChannelException)) 637 throw new RuntimeException("Cause of ClosedChannelException expected", 638 x.getCause()); 639 } 640 } 641 } 642 643 // exercise gathering write 644 static void testWrite2() throws Exception { 645 System.out.println("-- write (2) --"); 646 647 try (Server server = new Server()) { 648 final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 649 ch.connect(server.address()).get(); 650 SocketChannel sc = server.accept(); 651 652 // number of bytes written 653 final AtomicLong bytesWritten = new AtomicLong(0); 654 655 // write buffers (should complete immediately) 656 ByteBuffer[] srcs = genBuffers(1); 657 final CountDownLatch l1 = new CountDownLatch(1); 658 ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null, 659 new CompletionHandler<Long,Void>() { 660 public void completed(Long result, Void att) { 661 long n = result; 662 if (n <= 0) 663 throw new RuntimeException("No bytes read"); 664 bytesWritten.addAndGet(n); 665 l1.countDown(); 666 } 667 public void failed(Throwable exc, Void att) { 668 } 669 }); 670 l1.await(); 671 672 // set to true to signal that no more buffers should be written 673 final AtomicBoolean continueWriting = new AtomicBoolean(true); 674 675 // write until socket buffer is full so as to create the conditions 676 // for when a write does not complete immediately 677 srcs = genBuffers(1); 678 ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null, 679 new CompletionHandler<Long,Void>() { 680 public void completed(Long result, Void att) { 681 long n = result; 682 if (n <= 0) 683 throw new RuntimeException("No bytes written"); 684 bytesWritten.addAndGet(n); 685 if (continueWriting.get()) { 686 ByteBuffer[] srcs = genBuffers(8); 687 ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, 688 (Void)null, this); 689 } 690 } 691 public void failed(Throwable exc, Void att) { 692 } 693 }); 694 695 // give time for socket buffer to fill up. 696 Thread.sleep(5*1000); 697 698 // signal handler to stop further writing 699 continueWriting.set(false); 700 701 // read until done 702 ByteBuffer buf = ByteBuffer.allocateDirect(4096); 703 long total = 0L; 704 do { 705 int n = sc.read(buf); 706 if (n <= 0) 707 throw new RuntimeException("No bytes read"); 708 buf.rewind(); 709 total += n; 710 } while (total < bytesWritten.get()); 711 712 ch.close(); 713 sc.close(); 714 } 715 } 716 717 static void testShutdown() throws Exception { 718 System.out.println("-- shutdown--"); 719 720 try (Server server = new Server(); 721 AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) 722 { 723 ch.connect(server.address()).get(); 724 try (SocketChannel peer = server.accept()) { 725 ByteBuffer buf = ByteBuffer.allocateDirect(1000); 726 int n; 727 728 // check read 729 ch.shutdownInput(); 730 n = ch.read(buf).get(); 731 if (n != -1) 732 throw new RuntimeException("-1 expected"); 733 // check full with full buffer 734 buf.put(new byte[100]); 735 n = ch.read(buf).get(); 736 if (n != -1) 737 throw new RuntimeException("-1 expected"); 738 739 // check write 740 ch.shutdownOutput(); 741 try { 742 ch.write(buf).get(); 743 throw new RuntimeException("ClosedChannelException expected"); 744 } catch (ExecutionException x) { 745 if (!(x.getCause() instanceof ClosedChannelException)) 746 throw new RuntimeException("ClosedChannelException expected", 747 x.getCause()); 748 } 749 } 750 } 751 } 752 753 static void testTimeout() throws Exception { 754 System.out.println("-- timeouts --"); 755 testTimeout(Integer.MIN_VALUE, TimeUnit.SECONDS); 756 testTimeout(-1L, TimeUnit.SECONDS); 757 testTimeout(0L, TimeUnit.SECONDS); 758 testTimeout(2L, TimeUnit.SECONDS); 759 } 760 761 static void testTimeout(final long timeout, final TimeUnit unit) throws Exception { 762 try (Server server = new Server()) { 763 AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 764 ch.connect(server.address()).get(); 765 766 ByteBuffer dst = ByteBuffer.allocate(512); 767 768 final AtomicReference<Throwable> readException = new AtomicReference<Throwable>(); 769 770 // this read should timeout if value is > 0 771 ch.read(dst, timeout, unit, null, new CompletionHandler<Integer,Void>() { 772 public void completed(Integer result, Void att) { 773 readException.set(new RuntimeException("Should not complete")); 774 } 775 public void failed(Throwable exc, Void att) { 776 readException.set(exc); 777 } 778 }); 779 if (timeout > 0L) { 780 // wait for exception 781 while (readException.get() == null) { 782 Thread.sleep(100); 783 } 784 if (!(readException.get() instanceof InterruptedByTimeoutException)) 785 throw new RuntimeException("InterruptedByTimeoutException expected", 786 readException.get()); 787 788 // after a timeout then further reading should throw unspecified runtime exception 789 boolean exceptionThrown = false; 790 try { 791 ch.read(dst); 792 } catch (RuntimeException x) { 793 exceptionThrown = true; 794 } 795 if (!exceptionThrown) 796 throw new RuntimeException("RuntimeException expected after timeout."); 797 } else { 798 Thread.sleep(1000); 799 Throwable exc = readException.get(); 800 if (exc != null) 801 throw new RuntimeException(exc); 802 } 803 804 final AtomicReference<Throwable> writeException = new AtomicReference<Throwable>(); 805 806 // write bytes to fill socket buffer 807 ch.write(genBuffer(), timeout, unit, ch, 808 new CompletionHandler<Integer,AsynchronousSocketChannel>() 809 { 810 public void completed(Integer result, AsynchronousSocketChannel ch) { 811 ch.write(genBuffer(), timeout, unit, ch, this); 812 } 813 public void failed(Throwable exc, AsynchronousSocketChannel ch) { 814 writeException.set(exc); 815 } 816 }); 817 if (timeout > 0) { 818 // wait for exception 819 while (writeException.get() == null) { 820 Thread.sleep(100); 821 } 822 if (!(writeException.get() instanceof InterruptedByTimeoutException)) 823 throw new RuntimeException("InterruptedByTimeoutException expected", 824 writeException.get()); 825 826 // after a timeout then further writing should throw unspecified runtime exception 827 boolean exceptionThrown = false; 828 try { 829 ch.write(genBuffer()); 830 } catch (RuntimeException x) { 831 exceptionThrown = true; 832 } 833 if (!exceptionThrown) 834 throw new RuntimeException("RuntimeException expected after timeout."); 835 } else { 836 Thread.sleep(1000); 837 Throwable exc = writeException.get(); 838 if (exc != null) 839 throw new RuntimeException(exc); 840 } 841 842 // clean-up 843 server.accept().close(); 844 ch.close(); 845 } 846 } 847 848 // returns ByteBuffer with random bytes 849 static ByteBuffer genBuffer() { 850 int size = 1024 + rand.nextInt(16000); 851 byte[] buf = new byte[size]; 852 rand.nextBytes(buf); 853 boolean useDirect = rand.nextBoolean(); 854 if (useDirect) { 855 ByteBuffer bb = ByteBuffer.allocateDirect(buf.length); 856 bb.put(buf); 857 bb.flip(); 858 return bb; 859 } else { 860 return ByteBuffer.wrap(buf); 861 } 862 } 863 864 // return ByteBuffer[] with random bytes 865 static ByteBuffer[] genBuffers(int max) { 866 int len = 1; 867 if (max > 1) 868 len += rand.nextInt(max); 869 ByteBuffer[] bufs = new ByteBuffer[len]; 870 for (int i=0; i<len; i++) 871 bufs[i] = genBuffer(); 872 return bufs; 873 } 874 875 // return random SocketAddress 876 static SocketAddress genSocketAddress() { 877 StringBuilder sb = new StringBuilder("10."); 878 sb.append(rand.nextInt(256)); 879 sb.append('.'); 880 sb.append(rand.nextInt(256)); 881 sb.append('.'); 882 sb.append(rand.nextInt(256)); 883 InetAddress rh; 884 try { 885 rh = InetAddress.getByName(sb.toString()); 886 } catch (UnknownHostException x) { 887 throw new InternalError("Should not happen"); 888 } 889 return new InetSocketAddress(rh, rand.nextInt(65535)+1); 890 } 891 }