1 /* 2 * Copyright (c) 2008, 2016, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* @test 25 * @bug 4607272 6842687 6878369 6944810 7023403 26 * @summary Unit test for AsynchronousSocketChannel 27 * @run main Basic -skipSlowConnectTest 28 * @key randomness 29 */ 30 31 import java.nio.ByteBuffer; 32 import java.nio.channels.*; 33 import static java.net.StandardSocketOptions.*; 34 import java.net.*; 35 import java.util.Random; 36 import java.util.concurrent.*; 37 import java.util.concurrent.atomic.*; 38 import java.io.Closeable; 39 import java.io.IOException; 40 import java.util.Set; 41 42 public class Basic { 43 static final Random rand = new Random(); 44 45 static boolean skipSlowConnectTest = false; 46 47 public static void main(String[] args) throws Exception { 48 for (String arg: args) { 49 switch (arg) { 50 case "-skipSlowConnectTest" : 51 skipSlowConnectTest = true; 52 break; 53 default: 54 throw new RuntimeException("Unrecognized argument: " + arg); 55 } 56 } 57 58 testBind(); 59 testSocketOptions(); 60 testConnect(); 61 testCloseWhenPending(); 62 testCancel(); 63 testRead1(); 64 testRead2(); 65 testRead3(); 66 testWrite1(); 67 testWrite2(); 68 // skip timeout tests until 7052549 is fixed 69 if (!System.getProperty("os.name").startsWith("Windows")) 70 testTimeout(); 71 testShutdown(); 72 } 73 74 static class Server implements Closeable { 75 private final ServerSocketChannel ssc; 76 private final InetSocketAddress address; 77 78 Server() throws IOException { 79 ssc = ServerSocketChannel.open().bind(new InetSocketAddress(0)); 80 81 InetAddress lh = InetAddress.getLocalHost(); 82 int port = ((InetSocketAddress)(ssc.getLocalAddress())).getPort(); 83 address = new InetSocketAddress(lh, port); 84 } 85 86 InetSocketAddress address() { 87 return address; 88 } 89 90 SocketChannel accept() throws IOException { 91 return ssc.accept(); 92 } 93 94 public void close() throws IOException { 95 ssc.close(); 96 } 97 98 } 99 100 static void testBind() throws Exception { 101 System.out.println("-- bind --"); 102 103 try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) { 104 if (ch.getLocalAddress() != null) 105 throw new RuntimeException("Local address should be 'null'"); 106 ch.bind(new InetSocketAddress(0)); 107 108 // check local address after binding 109 InetSocketAddress local = (InetSocketAddress)ch.getLocalAddress(); 110 if (local.getPort() == 0) 111 throw new RuntimeException("Unexpected port"); 112 if (!local.getAddress().isAnyLocalAddress()) 113 throw new RuntimeException("Not bound to a wildcard address"); 114 115 // try to re-bind 116 try { 117 ch.bind(new InetSocketAddress(0)); 118 throw new RuntimeException("AlreadyBoundException expected"); 119 } catch (AlreadyBoundException x) { 120 } 121 } 122 123 // check ClosedChannelException 124 AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 125 ch.close(); 126 try { 127 ch.bind(new InetSocketAddress(0)); 128 throw new RuntimeException("ClosedChannelException expected"); 129 } catch (ClosedChannelException x) { 130 } 131 } 132 133 static void testSocketOptions() throws Exception { 134 System.out.println("-- socket options --"); 135 136 try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) { 137 ch.setOption(SO_RCVBUF, 128*1024) 138 .setOption(SO_SNDBUF, 128*1024) 139 .setOption(SO_REUSEADDR, true); 140 141 // check SO_SNDBUF/SO_RCVBUF limits 142 int before, after; 143 before = ch.getOption(SO_SNDBUF); 144 after = ch.setOption(SO_SNDBUF, Integer.MAX_VALUE).getOption(SO_SNDBUF); 145 if (after < before) 146 throw new RuntimeException("setOption caused SO_SNDBUF to decrease"); 147 before = ch.getOption(SO_RCVBUF); 148 after = ch.setOption(SO_RCVBUF, Integer.MAX_VALUE).getOption(SO_RCVBUF); 149 if (after < before) 150 throw new RuntimeException("setOption caused SO_RCVBUF to decrease"); 151 152 ch.bind(new InetSocketAddress(0)); 153 154 // default values 155 if (ch.getOption(SO_KEEPALIVE)) 156 throw new RuntimeException("Default of SO_KEEPALIVE should be 'false'"); 157 if (ch.getOption(TCP_NODELAY)) 158 throw new RuntimeException("Default of TCP_NODELAY should be 'false'"); 159 160 // set and check 161 if (!ch.setOption(SO_KEEPALIVE, true).getOption(SO_KEEPALIVE)) 162 throw new RuntimeException("SO_KEEPALIVE did not change"); 163 if (!ch.setOption(TCP_NODELAY, true).getOption(TCP_NODELAY)) 164 throw new RuntimeException("SO_KEEPALIVE did not change"); 165 166 // read others (can't check as actual value is implementation dependent) 167 ch.getOption(SO_RCVBUF); 168 ch.getOption(SO_SNDBUF); 169 170 Set<SocketOption<?>> options = ch.supportedOptions(); 171 boolean quickAck = options.contains(jdk.net.ExtendedSocketOptions.SO_QUICKACK); 172 if (quickAck) { 173 if (ch.getOption(jdk.net.ExtendedSocketOptions.SO_QUICKACK)) { 174 throw new RuntimeException("Default of SO_QUICKACK should be 'false'"); 175 } 176 177 if (!ch.setOption(jdk.net.ExtendedSocketOptions.SO_QUICKACK, true). 178 getOption(jdk.net.ExtendedSocketOptions.SO_QUICKACK)) { 179 throw new RuntimeException("SO_QUICKACK did not change"); 180 } 181 } 182 } 183 } 184 185 static void testConnect() throws Exception { 186 System.out.println("-- connect --"); 187 188 SocketAddress address; 189 190 try (Server server = new Server()) { 191 address = server.address(); 192 193 // connect to server and check local/remote addresses 194 try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) { 195 ch.connect(address).get(); 196 // check local address 197 if (ch.getLocalAddress() == null) 198 throw new RuntimeException("Not bound to local address"); 199 200 // check remote address 201 InetSocketAddress remote = (InetSocketAddress)ch.getRemoteAddress(); 202 if (remote.getPort() != server.address().getPort()) 203 throw new RuntimeException("Connected to unexpected port"); 204 if (!remote.getAddress().equals(server.address().getAddress())) 205 throw new RuntimeException("Connected to unexpected address"); 206 207 // try to connect again 208 try { 209 ch.connect(server.address()).get(); 210 throw new RuntimeException("AlreadyConnectedException expected"); 211 } catch (AlreadyConnectedException x) { 212 } 213 214 // clean-up 215 server.accept().close(); 216 } 217 218 // check that connect fails with ClosedChannelException 219 AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 220 ch.close(); 221 try { 222 ch.connect(server.address()).get(); 223 throw new RuntimeException("ExecutionException expected"); 224 } catch (ExecutionException x) { 225 if (!(x.getCause() instanceof ClosedChannelException)) 226 throw new RuntimeException("Cause of ClosedChannelException expected"); 227 } 228 final AtomicReference<Throwable> connectException = new AtomicReference<>(); 229 ch.connect(server.address(), (Void)null, new CompletionHandler<Void,Void>() { 230 public void completed(Void result, Void att) { 231 } 232 public void failed(Throwable exc, Void att) { 233 connectException.set(exc); 234 } 235 }); 236 while (connectException.get() == null) { 237 Thread.sleep(100); 238 } 239 if (!(connectException.get() instanceof ClosedChannelException)) 240 throw new RuntimeException("ClosedChannelException expected"); 241 } 242 243 // test that failure to connect closes the channel 244 try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) { 245 try { 246 ch.connect(address).get(); 247 } catch (ExecutionException x) { 248 // failed to establish connection 249 if (ch.isOpen()) 250 throw new RuntimeException("Channel should be closed"); 251 } 252 } 253 254 // repeat test by connecting to a (probably) non-existent host. This 255 // improves the chance that the connect will not fail immediately. 256 if (!skipSlowConnectTest) { 257 try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) { 258 try { 259 ch.connect(genSocketAddress()).get(); 260 } catch (ExecutionException x) { 261 // failed to establish connection 262 if (ch.isOpen()) 263 throw new RuntimeException("Channel should be closed"); 264 } 265 } 266 } 267 } 268 269 static void testCloseWhenPending() throws Exception { 270 System.out.println("-- asynchronous close when connecting --"); 271 272 AsynchronousSocketChannel ch; 273 274 // asynchronous close while connecting 275 ch = AsynchronousSocketChannel.open(); 276 Future<Void> connectResult = ch.connect(genSocketAddress()); 277 278 // give time to initiate the connect (SYN) 279 Thread.sleep(50); 280 281 // close 282 ch.close(); 283 284 // check that exception is thrown in timely manner 285 try { 286 connectResult.get(5, TimeUnit.SECONDS); 287 } catch (TimeoutException x) { 288 throw new RuntimeException("AsynchronousCloseException not thrown"); 289 } catch (ExecutionException x) { 290 // expected 291 } 292 293 System.out.println("-- asynchronous close when reading --"); 294 295 try (Server server = new Server()) { 296 ch = AsynchronousSocketChannel.open(); 297 ch.connect(server.address()).get(); 298 299 ByteBuffer dst = ByteBuffer.allocateDirect(100); 300 Future<Integer> result = ch.read(dst); 301 302 // attempt a second read - should fail with ReadPendingException 303 ByteBuffer buf = ByteBuffer.allocateDirect(100); 304 try { 305 ch.read(buf); 306 throw new RuntimeException("ReadPendingException expected"); 307 } catch (ReadPendingException x) { 308 } 309 310 // close channel (should cause initial read to complete) 311 ch.close(); 312 server.accept().close(); 313 314 // check that AsynchronousCloseException is thrown 315 try { 316 result.get(); 317 throw new RuntimeException("Should not read"); 318 } catch (ExecutionException x) { 319 if (!(x.getCause() instanceof AsynchronousCloseException)) 320 throw new RuntimeException(x); 321 } 322 323 System.out.println("-- asynchronous close when writing --"); 324 325 ch = AsynchronousSocketChannel.open(); 326 ch.connect(server.address()).get(); 327 328 final AtomicReference<Throwable> writeException = 329 new AtomicReference<Throwable>(); 330 331 // write bytes to fill socket buffer 332 ch.write(genBuffer(), ch, new CompletionHandler<Integer,AsynchronousSocketChannel>() { 333 public void completed(Integer result, AsynchronousSocketChannel ch) { 334 ch.write(genBuffer(), ch, this); 335 } 336 public void failed(Throwable x, AsynchronousSocketChannel ch) { 337 writeException.set(x); 338 } 339 }); 340 341 // give time for socket buffer to fill up. 342 Thread.sleep(5*1000); 343 344 // attempt a concurrent write - should fail with WritePendingException 345 try { 346 ch.write(genBuffer()); 347 throw new RuntimeException("WritePendingException expected"); 348 } catch (WritePendingException x) { 349 } 350 351 // close channel - should cause initial write to complete 352 ch.close(); 353 server.accept().close(); 354 355 // wait for exception 356 while (writeException.get() == null) { 357 Thread.sleep(100); 358 } 359 if (!(writeException.get() instanceof AsynchronousCloseException)) 360 throw new RuntimeException("AsynchronousCloseException expected"); 361 } 362 } 363 364 static void testCancel() throws Exception { 365 System.out.println("-- cancel --"); 366 367 try (Server server = new Server()) { 368 for (int i=0; i<2; i++) { 369 boolean mayInterruptIfRunning = (i == 0) ? false : true; 370 371 // establish loopback connection 372 AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 373 ch.connect(server.address()).get(); 374 SocketChannel peer = server.accept(); 375 376 // start read operation 377 ByteBuffer buf = ByteBuffer.allocate(1); 378 Future<Integer> res = ch.read(buf); 379 380 // cancel operation 381 boolean cancelled = res.cancel(mayInterruptIfRunning); 382 383 // check post-conditions 384 if (!res.isDone()) 385 throw new RuntimeException("isDone should return true"); 386 if (res.isCancelled() != cancelled) 387 throw new RuntimeException("isCancelled not consistent"); 388 try { 389 res.get(); 390 throw new RuntimeException("CancellationException expected"); 391 } catch (CancellationException x) { 392 } 393 try { 394 res.get(1, TimeUnit.SECONDS); 395 throw new RuntimeException("CancellationException expected"); 396 } catch (CancellationException x) { 397 } 398 399 // check that the cancel doesn't impact writing to the channel 400 if (!mayInterruptIfRunning) { 401 buf = ByteBuffer.wrap("a".getBytes()); 402 ch.write(buf).get(); 403 } 404 405 ch.close(); 406 peer.close(); 407 } 408 } 409 } 410 411 static void testRead1() throws Exception { 412 System.out.println("-- read (1) --"); 413 414 try (Server server = new Server()) { 415 final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 416 ch.connect(server.address()).get(); 417 418 // read with 0 bytes remaining should complete immediately 419 ByteBuffer buf = ByteBuffer.allocate(1); 420 buf.put((byte)0); 421 int n = ch.read(buf).get(); 422 if (n != 0) 423 throw new RuntimeException("0 expected"); 424 425 // write bytes and close connection 426 ByteBuffer src = genBuffer(); 427 try (SocketChannel sc = server.accept()) { 428 sc.setOption(SO_SNDBUF, src.remaining()); 429 while (src.hasRemaining()) 430 sc.write(src); 431 } 432 433 // reads should complete immediately 434 final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100); 435 final CountDownLatch latch = new CountDownLatch(1); 436 ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() { 437 public void completed(Integer result, Void att) { 438 int n = result; 439 if (n > 0) { 440 ch.read(dst, (Void)null, this); 441 } else { 442 latch.countDown(); 443 } 444 } 445 public void failed(Throwable exc, Void att) { 446 } 447 }); 448 449 latch.await(); 450 451 // check buffers 452 src.flip(); 453 dst.flip(); 454 if (!src.equals(dst)) { 455 throw new RuntimeException("Contents differ"); 456 } 457 458 // close channel 459 ch.close(); 460 461 // check read fails with ClosedChannelException 462 try { 463 ch.read(dst).get(); 464 throw new RuntimeException("ExecutionException expected"); 465 } catch (ExecutionException x) { 466 if (!(x.getCause() instanceof ClosedChannelException)) 467 throw new RuntimeException("Cause of ClosedChannelException expected"); 468 } 469 } 470 } 471 472 static void testRead2() throws Exception { 473 System.out.println("-- read (2) --"); 474 475 try (Server server = new Server()) { 476 final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 477 ch.connect(server.address()).get(); 478 SocketChannel sc = server.accept(); 479 480 ByteBuffer src = genBuffer(); 481 482 // read until the buffer is full 483 final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity()); 484 final CountDownLatch latch = new CountDownLatch(1); 485 ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() { 486 public void completed(Integer result, Void att) { 487 if (dst.hasRemaining()) { 488 ch.read(dst, (Void)null, this); 489 } else { 490 latch.countDown(); 491 } 492 } 493 public void failed(Throwable exc, Void att) { 494 } 495 }); 496 497 // trickle the writing 498 do { 499 int rem = src.remaining(); 500 int size = (rem <= 100) ? rem : 50 + rand.nextInt(rem - 100); 501 ByteBuffer buf = ByteBuffer.allocate(size); 502 for (int i=0; i<size; i++) 503 buf.put(src.get()); 504 buf.flip(); 505 Thread.sleep(50 + rand.nextInt(1500)); 506 while (buf.hasRemaining()) 507 sc.write(buf); 508 } while (src.hasRemaining()); 509 510 // wait until ascynrhonous reading has completed 511 latch.await(); 512 513 // check buffers 514 src.flip(); 515 dst.flip(); 516 if (!src.equals(dst)) { 517 throw new RuntimeException("Contents differ"); 518 } 519 520 sc.close(); 521 ch.close(); 522 } 523 } 524 525 // exercise scattering read 526 static void testRead3() throws Exception { 527 System.out.println("-- read (3) --"); 528 529 try (Server server = new Server()) { 530 final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 531 ch.connect(server.address()).get(); 532 SocketChannel sc = server.accept(); 533 534 ByteBuffer[] dsts = new ByteBuffer[3]; 535 for (int i=0; i<dsts.length; i++) { 536 dsts[i] = ByteBuffer.allocateDirect(100); 537 } 538 539 // scattering read that completes ascynhronously 540 final CountDownLatch l1 = new CountDownLatch(1); 541 ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null, 542 new CompletionHandler<Long,Void>() { 543 public void completed(Long result, Void att) { 544 long n = result; 545 if (n <= 0) 546 throw new RuntimeException("No bytes read"); 547 l1.countDown(); 548 } 549 public void failed(Throwable exc, Void att) { 550 } 551 }); 552 553 // write some bytes 554 sc.write(genBuffer()); 555 556 // read should now complete 557 l1.await(); 558 559 // write more bytes 560 sc.write(genBuffer()); 561 562 // read should complete immediately 563 for (int i=0; i<dsts.length; i++) { 564 dsts[i].rewind(); 565 } 566 567 final CountDownLatch l2 = new CountDownLatch(1); 568 ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null, 569 new CompletionHandler<Long,Void>() { 570 public void completed(Long result, Void att) { 571 long n = result; 572 if (n <= 0) 573 throw new RuntimeException("No bytes read"); 574 l2.countDown(); 575 } 576 public void failed(Throwable exc, Void att) { 577 } 578 }); 579 l2.await(); 580 581 ch.close(); 582 sc.close(); 583 } 584 } 585 586 static void testWrite1() throws Exception { 587 System.out.println("-- write (1) --"); 588 589 try (Server server = new Server()) { 590 final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 591 ch.connect(server.address()).get(); 592 SocketChannel sc = server.accept(); 593 594 // write with 0 bytes remaining should complete immediately 595 ByteBuffer buf = ByteBuffer.allocate(1); 596 buf.put((byte)0); 597 int n = ch.write(buf).get(); 598 if (n != 0) 599 throw new RuntimeException("0 expected"); 600 601 // write all bytes and close connection when done 602 final ByteBuffer src = genBuffer(); 603 ch.write(src, (Void)null, new CompletionHandler<Integer,Void>() { 604 public void completed(Integer result, Void att) { 605 if (src.hasRemaining()) { 606 ch.write(src, (Void)null, this); 607 } else { 608 try { 609 ch.close(); 610 } catch (IOException ignore) { } 611 } 612 } 613 public void failed(Throwable exc, Void att) { 614 } 615 }); 616 617 // read to EOF or buffer full 618 ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100); 619 do { 620 n = sc.read(dst); 621 } while (n > 0); 622 sc.close(); 623 624 // check buffers 625 src.flip(); 626 dst.flip(); 627 if (!src.equals(dst)) { 628 throw new RuntimeException("Contents differ"); 629 } 630 631 // check write fails with ClosedChannelException 632 try { 633 ch.read(dst).get(); 634 throw new RuntimeException("ExecutionException expected"); 635 } catch (ExecutionException x) { 636 if (!(x.getCause() instanceof ClosedChannelException)) 637 throw new RuntimeException("Cause of ClosedChannelException expected"); 638 } 639 } 640 } 641 642 // exercise gathering write 643 static void testWrite2() throws Exception { 644 System.out.println("-- write (2) --"); 645 646 try (Server server = new Server()) { 647 final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 648 ch.connect(server.address()).get(); 649 SocketChannel sc = server.accept(); 650 651 // number of bytes written 652 final AtomicLong bytesWritten = new AtomicLong(0); 653 654 // write buffers (should complete immediately) 655 ByteBuffer[] srcs = genBuffers(1); 656 final CountDownLatch l1 = new CountDownLatch(1); 657 ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null, 658 new CompletionHandler<Long,Void>() { 659 public void completed(Long result, Void att) { 660 long n = result; 661 if (n <= 0) 662 throw new RuntimeException("No bytes read"); 663 bytesWritten.addAndGet(n); 664 l1.countDown(); 665 } 666 public void failed(Throwable exc, Void att) { 667 } 668 }); 669 l1.await(); 670 671 // set to true to signal that no more buffers should be written 672 final AtomicBoolean continueWriting = new AtomicBoolean(true); 673 674 // write until socket buffer is full so as to create the conditions 675 // for when a write does not complete immediately 676 srcs = genBuffers(1); 677 ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null, 678 new CompletionHandler<Long,Void>() { 679 public void completed(Long result, Void att) { 680 long n = result; 681 if (n <= 0) 682 throw new RuntimeException("No bytes written"); 683 bytesWritten.addAndGet(n); 684 if (continueWriting.get()) { 685 ByteBuffer[] srcs = genBuffers(8); 686 ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, 687 (Void)null, this); 688 } 689 } 690 public void failed(Throwable exc, Void att) { 691 } 692 }); 693 694 // give time for socket buffer to fill up. 695 Thread.sleep(5*1000); 696 697 // signal handler to stop further writing 698 continueWriting.set(false); 699 700 // read until done 701 ByteBuffer buf = ByteBuffer.allocateDirect(4096); 702 long total = 0L; 703 do { 704 int n = sc.read(buf); 705 if (n <= 0) 706 throw new RuntimeException("No bytes read"); 707 buf.rewind(); 708 total += n; 709 } while (total < bytesWritten.get()); 710 711 ch.close(); 712 sc.close(); 713 } 714 } 715 716 static void testShutdown() throws Exception { 717 System.out.println("-- shutdown--"); 718 719 try (Server server = new Server(); 720 AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) 721 { 722 ch.connect(server.address()).get(); 723 try (SocketChannel peer = server.accept()) { 724 ByteBuffer buf = ByteBuffer.allocateDirect(1000); 725 int n; 726 727 // check read 728 ch.shutdownInput(); 729 n = ch.read(buf).get(); 730 if (n != -1) 731 throw new RuntimeException("-1 expected"); 732 // check full with full buffer 733 buf.put(new byte[100]); 734 n = ch.read(buf).get(); 735 if (n != -1) 736 throw new RuntimeException("-1 expected"); 737 738 // check write 739 ch.shutdownOutput(); 740 try { 741 ch.write(buf).get(); 742 throw new RuntimeException("ClosedChannelException expected"); 743 } catch (ExecutionException x) { 744 if (!(x.getCause() instanceof ClosedChannelException)) 745 throw new RuntimeException("ClosedChannelException expected"); 746 } 747 } 748 } 749 } 750 751 static void testTimeout() throws Exception { 752 System.out.println("-- timeouts --"); 753 testTimeout(Integer.MIN_VALUE, TimeUnit.SECONDS); 754 testTimeout(-1L, TimeUnit.SECONDS); 755 testTimeout(0L, TimeUnit.SECONDS); 756 testTimeout(2L, TimeUnit.SECONDS); 757 } 758 759 static void testTimeout(final long timeout, final TimeUnit unit) throws Exception { 760 try (Server server = new Server()) { 761 AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 762 ch.connect(server.address()).get(); 763 764 ByteBuffer dst = ByteBuffer.allocate(512); 765 766 final AtomicReference<Throwable> readException = new AtomicReference<Throwable>(); 767 768 // this read should timeout if value is > 0 769 ch.read(dst, timeout, unit, null, new CompletionHandler<Integer,Void>() { 770 public void completed(Integer result, Void att) { 771 readException.set(new RuntimeException("Should not complete")); 772 } 773 public void failed(Throwable exc, Void att) { 774 readException.set(exc); 775 } 776 }); 777 if (timeout > 0L) { 778 // wait for exception 779 while (readException.get() == null) { 780 Thread.sleep(100); 781 } 782 if (!(readException.get() instanceof InterruptedByTimeoutException)) 783 throw new RuntimeException("InterruptedByTimeoutException expected"); 784 785 // after a timeout then further reading should throw unspecified runtime exception 786 boolean exceptionThrown = false; 787 try { 788 ch.read(dst); 789 } catch (RuntimeException x) { 790 exceptionThrown = true; 791 } 792 if (!exceptionThrown) 793 throw new RuntimeException("RuntimeException expected after timeout."); 794 } else { 795 Thread.sleep(1000); 796 Throwable exc = readException.get(); 797 if (exc != null) 798 throw new RuntimeException(exc); 799 } 800 801 final AtomicReference<Throwable> writeException = new AtomicReference<Throwable>(); 802 803 // write bytes to fill socket buffer 804 ch.write(genBuffer(), timeout, unit, ch, 805 new CompletionHandler<Integer,AsynchronousSocketChannel>() 806 { 807 public void completed(Integer result, AsynchronousSocketChannel ch) { 808 ch.write(genBuffer(), timeout, unit, ch, this); 809 } 810 public void failed(Throwable exc, AsynchronousSocketChannel ch) { 811 writeException.set(exc); 812 } 813 }); 814 if (timeout > 0) { 815 // wait for exception 816 while (writeException.get() == null) { 817 Thread.sleep(100); 818 } 819 if (!(writeException.get() instanceof InterruptedByTimeoutException)) 820 throw new RuntimeException("InterruptedByTimeoutException expected"); 821 822 // after a timeout then further writing should throw unspecified runtime exception 823 boolean exceptionThrown = false; 824 try { 825 ch.write(genBuffer()); 826 } catch (RuntimeException x) { 827 exceptionThrown = true; 828 } 829 if (!exceptionThrown) 830 throw new RuntimeException("RuntimeException expected after timeout."); 831 } else { 832 Thread.sleep(1000); 833 Throwable exc = writeException.get(); 834 if (exc != null) 835 throw new RuntimeException(exc); 836 } 837 838 // clean-up 839 server.accept().close(); 840 ch.close(); 841 } 842 } 843 844 // returns ByteBuffer with random bytes 845 static ByteBuffer genBuffer() { 846 int size = 1024 + rand.nextInt(16000); 847 byte[] buf = new byte[size]; 848 rand.nextBytes(buf); 849 boolean useDirect = rand.nextBoolean(); 850 if (useDirect) { 851 ByteBuffer bb = ByteBuffer.allocateDirect(buf.length); 852 bb.put(buf); 853 bb.flip(); 854 return bb; 855 } else { 856 return ByteBuffer.wrap(buf); 857 } 858 } 859 860 // return ByteBuffer[] with random bytes 861 static ByteBuffer[] genBuffers(int max) { 862 int len = 1; 863 if (max > 1) 864 len += rand.nextInt(max); 865 ByteBuffer[] bufs = new ByteBuffer[len]; 866 for (int i=0; i<len; i++) 867 bufs[i] = genBuffer(); 868 return bufs; 869 } 870 871 // return random SocketAddress 872 static SocketAddress genSocketAddress() { 873 StringBuilder sb = new StringBuilder("10."); 874 sb.append(rand.nextInt(256)); 875 sb.append('.'); 876 sb.append(rand.nextInt(256)); 877 sb.append('.'); 878 sb.append(rand.nextInt(256)); 879 InetAddress rh; 880 try { 881 rh = InetAddress.getByName(sb.toString()); 882 } catch (UnknownHostException x) { 883 throw new InternalError("Should not happen"); 884 } 885 return new InetSocketAddress(rh, rand.nextInt(65535)+1); 886 } 887 }