1 /* 2 * Copyright (c) 2008, 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* @test 25 * @bug 4607272 6842687 6878369 6944810 7023403 26 * @summary Unit test for AsynchronousSocketChannel(use -Dseed=X to set PRNG seed) 27 * @library /test/lib 28 * @build jdk.test.lib.RandomFactory 29 * @run main Basic -skipSlowConnectTest 30 * @key randomness intermittent 31 */ 32 33 import java.io.Closeable; 34 import java.io.IOException; 35 import java.net.*; 36 import static java.net.StandardSocketOptions.*; 37 import java.nio.ByteBuffer; 38 import java.nio.channels.*; 39 import java.util.Random; 40 import java.util.Set; 41 import java.util.concurrent.*; 42 import java.util.concurrent.atomic.*; 43 import jdk.test.lib.RandomFactory; 44 45 public class Basic { 46 private static final Random RAND = RandomFactory.getRandom(); 47 48 static boolean skipSlowConnectTest = false; 49 50 public static void main(String[] args) throws Exception { 51 for (String arg: args) { 52 switch (arg) { 53 case "-skipSlowConnectTest" : 54 skipSlowConnectTest = true; 55 break; 56 default: 57 throw new RuntimeException("Unrecognized argument: " + arg); 58 } 59 } 60 61 testBind(); 62 testSocketOptions(); 63 testConnect(); 64 testCloseWhenPending(); 65 testCancel(); 66 testRead1(); 67 testRead2(); 68 testRead3(); 69 testWrite1(); 70 testWrite2(); 71 // skip timeout tests until 7052549 is fixed 72 if (!System.getProperty("os.name").startsWith("Windows")) 73 testTimeout(); 74 testShutdown(); 75 } 76 77 static class Server implements Closeable { 78 private final ServerSocketChannel ssc; 79 private final InetSocketAddress address; 80 81 Server() throws IOException { 82 ssc = ServerSocketChannel.open().bind(new InetSocketAddress(0)); 83 84 InetAddress lh = InetAddress.getLocalHost(); 85 int port = ((InetSocketAddress)(ssc.getLocalAddress())).getPort(); 86 address = new InetSocketAddress(lh, port); 87 } 88 89 InetSocketAddress address() { 90 return address; 91 } 92 93 SocketChannel accept() throws IOException { 94 return ssc.accept(); 95 } 96 97 public void close() throws IOException { 98 ssc.close(); 99 } 100 101 } 102 103 static void testBind() throws Exception { 104 System.out.println("-- bind --"); 105 106 try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) { 107 if (ch.getLocalAddress() != null) 108 throw new RuntimeException("Local address should be 'null'"); 109 ch.bind(new InetSocketAddress(0)); 110 111 // check local address after binding 112 InetSocketAddress local = (InetSocketAddress)ch.getLocalAddress(); 113 if (local.getPort() == 0) 114 throw new RuntimeException("Unexpected port"); 115 if (!local.getAddress().isAnyLocalAddress()) 116 throw new RuntimeException("Not bound to a wildcard address"); 117 118 // try to re-bind 119 try { 120 ch.bind(new InetSocketAddress(0)); 121 throw new RuntimeException("AlreadyBoundException expected"); 122 } catch (AlreadyBoundException x) { 123 } 124 } 125 126 // check ClosedChannelException 127 AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 128 ch.close(); 129 try { 130 ch.bind(new InetSocketAddress(0)); 131 throw new RuntimeException("ClosedChannelException expected"); 132 } catch (ClosedChannelException x) { 133 } 134 } 135 136 static void testSocketOptions() throws Exception { 137 System.out.println("-- socket options --"); 138 139 try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) { 140 ch.setOption(SO_RCVBUF, 128*1024) 141 .setOption(SO_SNDBUF, 128*1024) 142 .setOption(SO_REUSEADDR, true); 143 144 // check SO_SNDBUF/SO_RCVBUF limits 145 int before, after; 146 before = ch.getOption(SO_SNDBUF); 147 after = ch.setOption(SO_SNDBUF, Integer.MAX_VALUE).getOption(SO_SNDBUF); 148 if (after < before) 149 throw new RuntimeException("setOption caused SO_SNDBUF to decrease"); 150 before = ch.getOption(SO_RCVBUF); 151 after = ch.setOption(SO_RCVBUF, Integer.MAX_VALUE).getOption(SO_RCVBUF); 152 if (after < before) 153 throw new RuntimeException("setOption caused SO_RCVBUF to decrease"); 154 155 ch.bind(new InetSocketAddress(0)); 156 157 // default values 158 if (ch.getOption(SO_KEEPALIVE)) 159 throw new RuntimeException("Default of SO_KEEPALIVE should be 'false'"); 160 if (ch.getOption(TCP_NODELAY)) 161 throw new RuntimeException("Default of TCP_NODELAY should be 'false'"); 162 163 // set and check 164 if (!ch.setOption(SO_KEEPALIVE, true).getOption(SO_KEEPALIVE)) 165 throw new RuntimeException("SO_KEEPALIVE did not change"); 166 if (!ch.setOption(TCP_NODELAY, true).getOption(TCP_NODELAY)) 167 throw new RuntimeException("SO_KEEPALIVE did not change"); 168 169 // read others (can't check as actual value is implementation dependent) 170 ch.getOption(SO_RCVBUF); 171 ch.getOption(SO_SNDBUF); 172 173 Set<SocketOption<?>> options = ch.supportedOptions(); 174 boolean reuseport = options.contains(SO_REUSEPORT); 175 if (reuseport) { 176 if (ch.getOption(SO_REUSEPORT)) 177 throw new RuntimeException("Default of SO_REUSEPORT should be 'false'"); 178 if (!ch.setOption(SO_REUSEPORT, true).getOption(SO_REUSEPORT)) 179 throw new RuntimeException("SO_REUSEPORT did not change"); 180 } 181 } 182 } 183 184 static void testConnect() throws Exception { 185 System.out.println("-- connect --"); 186 187 SocketAddress address; 188 189 try (Server server = new Server()) { 190 address = server.address(); 191 192 // connect to server and check local/remote addresses 193 try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) { 194 ch.connect(address).get(); 195 // check local address 196 if (ch.getLocalAddress() == null) 197 throw new RuntimeException("Not bound to local address"); 198 199 // check remote address 200 InetSocketAddress remote = (InetSocketAddress)ch.getRemoteAddress(); 201 if (remote.getPort() != server.address().getPort()) 202 throw new RuntimeException("Connected to unexpected port"); 203 if (!remote.getAddress().equals(server.address().getAddress())) 204 throw new RuntimeException("Connected to unexpected address"); 205 206 // try to connect again 207 try { 208 ch.connect(server.address()).get(); 209 throw new RuntimeException("AlreadyConnectedException expected"); 210 } catch (AlreadyConnectedException x) { 211 } 212 213 // clean-up 214 server.accept().close(); 215 } 216 217 // check that connect fails with ClosedChannelException 218 AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 219 ch.close(); 220 try { 221 ch.connect(server.address()).get(); 222 throw new RuntimeException("ExecutionException expected"); 223 } catch (ExecutionException x) { 224 if (!(x.getCause() instanceof ClosedChannelException)) 225 throw new RuntimeException("Cause of ClosedChannelException expected", 226 x.getCause()); 227 } 228 final AtomicReference<Throwable> connectException = new AtomicReference<>(); 229 ch.connect(server.address(), (Void)null, new CompletionHandler<Void,Void>() { 230 public void completed(Void result, Void att) { 231 } 232 public void failed(Throwable exc, Void att) { 233 connectException.set(exc); 234 } 235 }); 236 while (connectException.get() == null) { 237 Thread.sleep(100); 238 } 239 if (!(connectException.get() instanceof ClosedChannelException)) 240 throw new RuntimeException("ClosedChannelException expected", 241 connectException.get()); 242 } 243 244 // test that failure to connect closes the channel 245 try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) { 246 try { 247 ch.connect(address).get(); 248 } catch (ExecutionException x) { 249 // failed to establish connection 250 if (ch.isOpen()) 251 throw new RuntimeException("Channel should be closed"); 252 } 253 } 254 255 // repeat test by connecting to a (probably) non-existent host. This 256 // improves the chance that the connect will not fail immediately. 257 if (!skipSlowConnectTest) { 258 try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) { 259 try { 260 ch.connect(genSocketAddress()).get(); 261 } catch (ExecutionException x) { 262 // failed to establish connection 263 if (ch.isOpen()) 264 throw new RuntimeException("Channel should be closed"); 265 } 266 } 267 } 268 } 269 270 static void testCloseWhenPending() throws Exception { 271 System.out.println("-- asynchronous close when connecting --"); 272 273 AsynchronousSocketChannel ch; 274 275 // asynchronous close while connecting 276 ch = AsynchronousSocketChannel.open(); 277 Future<Void> connectResult = ch.connect(genSocketAddress()); 278 279 // give time to initiate the connect (SYN) 280 Thread.sleep(50); 281 282 // close 283 ch.close(); 284 285 // check that exception is thrown in timely manner 286 try { 287 connectResult.get(5, TimeUnit.SECONDS); 288 } catch (TimeoutException x) { 289 throw new RuntimeException("AsynchronousCloseException not thrown"); 290 } catch (ExecutionException x) { 291 // expected 292 } 293 294 System.out.println("-- asynchronous close when reading --"); 295 296 try (Server server = new Server()) { 297 ch = AsynchronousSocketChannel.open(); 298 ch.connect(server.address()).get(); 299 300 ByteBuffer dst = ByteBuffer.allocateDirect(100); 301 Future<Integer> result = ch.read(dst); 302 303 // attempt a second read - should fail with ReadPendingException 304 ByteBuffer buf = ByteBuffer.allocateDirect(100); 305 try { 306 ch.read(buf); 307 throw new RuntimeException("ReadPendingException expected"); 308 } catch (ReadPendingException x) { 309 } 310 311 // close channel (should cause initial read to complete) 312 ch.close(); 313 server.accept().close(); 314 315 // check that AsynchronousCloseException is thrown 316 try { 317 result.get(); 318 throw new RuntimeException("Should not read"); 319 } catch (ExecutionException x) { 320 if (!(x.getCause() instanceof AsynchronousCloseException)) 321 throw new RuntimeException(x); 322 } 323 324 System.out.println("-- asynchronous close when writing --"); 325 326 ch = AsynchronousSocketChannel.open(); 327 ch.connect(server.address()).get(); 328 329 final AtomicReference<Throwable> writeException = 330 new AtomicReference<Throwable>(); 331 332 // write bytes to fill socket buffer 333 final AtomicInteger numCompleted = new AtomicInteger(); 334 ch.write(genBuffer(), ch, new CompletionHandler<Integer,AsynchronousSocketChannel>() { 335 public void completed(Integer result, AsynchronousSocketChannel ch) { 336 numCompleted.incrementAndGet(); 337 ch.write(genBuffer(), ch, this); 338 } 339 public void failed(Throwable x, AsynchronousSocketChannel ch) { 340 writeException.set(x); 341 } 342 }); 343 344 // give time for socket buffer to fill up - 345 // take pauses until the handler is no longer being invoked 346 // because all writes are being pended which guarantees that 347 // the internal channel state indicates it is writing 348 int prevNumCompleted = numCompleted.get(); 349 do { 350 Thread.sleep(1000); 351 if (numCompleted.get() == prevNumCompleted) { 352 break; 353 } 354 prevNumCompleted = numCompleted.get(); 355 } while (true); 356 357 // attempt a concurrent write - 358 // should fail with WritePendingException 359 try { 360 ch.write(genBuffer()); 361 throw new RuntimeException("WritePendingException expected"); 362 } catch (WritePendingException x) { 363 } 364 365 // close channel - should cause initial write to complete 366 ch.close(); 367 server.accept().close(); 368 369 // wait for exception 370 while (writeException.get() == null) { 371 Thread.sleep(100); 372 } 373 if (!(writeException.get() instanceof AsynchronousCloseException)) 374 throw new RuntimeException("AsynchronousCloseException expected", 375 writeException.get()); 376 } 377 } 378 379 static void testCancel() throws Exception { 380 System.out.println("-- cancel --"); 381 382 try (Server server = new Server()) { 383 for (int i=0; i<2; i++) { 384 boolean mayInterruptIfRunning = (i == 0) ? false : true; 385 386 // establish loopback connection 387 AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 388 ch.connect(server.address()).get(); 389 SocketChannel peer = server.accept(); 390 391 // start read operation 392 ByteBuffer buf = ByteBuffer.allocate(1); 393 Future<Integer> res = ch.read(buf); 394 395 // cancel operation 396 boolean cancelled = res.cancel(mayInterruptIfRunning); 397 398 // check post-conditions 399 if (!res.isDone()) 400 throw new RuntimeException("isDone should return true"); 401 if (res.isCancelled() != cancelled) 402 throw new RuntimeException("isCancelled not consistent"); 403 try { 404 res.get(); 405 throw new RuntimeException("CancellationException expected"); 406 } catch (CancellationException x) { 407 } 408 try { 409 res.get(1, TimeUnit.SECONDS); 410 throw new RuntimeException("CancellationException expected"); 411 } catch (CancellationException x) { 412 } 413 414 // check that the cancel doesn't impact writing to the channel 415 if (!mayInterruptIfRunning) { 416 buf = ByteBuffer.wrap("a".getBytes()); 417 ch.write(buf).get(); 418 } 419 420 ch.close(); 421 peer.close(); 422 } 423 } 424 } 425 426 static void testRead1() throws Exception { 427 System.out.println("-- read (1) --"); 428 429 try (Server server = new Server()) { 430 final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 431 ch.connect(server.address()).get(); 432 433 // read with 0 bytes remaining should complete immediately 434 ByteBuffer buf = ByteBuffer.allocate(1); 435 buf.put((byte)0); 436 int n = ch.read(buf).get(); 437 if (n != 0) 438 throw new RuntimeException("0 expected"); 439 440 // write bytes and close connection 441 ByteBuffer src = genBuffer(); 442 try (SocketChannel sc = server.accept()) { 443 sc.setOption(SO_SNDBUF, src.remaining()); 444 while (src.hasRemaining()) 445 sc.write(src); 446 } 447 448 // reads should complete immediately 449 final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100); 450 final CountDownLatch latch = new CountDownLatch(1); 451 ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() { 452 public void completed(Integer result, Void att) { 453 int n = result; 454 if (n > 0) { 455 ch.read(dst, (Void)null, this); 456 } else { 457 latch.countDown(); 458 } 459 } 460 public void failed(Throwable exc, Void att) { 461 } 462 }); 463 464 latch.await(); 465 466 // check buffers 467 src.flip(); 468 dst.flip(); 469 if (!src.equals(dst)) { 470 throw new RuntimeException("Contents differ"); 471 } 472 473 // close channel 474 ch.close(); 475 476 // check read fails with ClosedChannelException 477 try { 478 ch.read(dst).get(); 479 throw new RuntimeException("ExecutionException expected"); 480 } catch (ExecutionException x) { 481 if (!(x.getCause() instanceof ClosedChannelException)) 482 throw new RuntimeException("Cause of ClosedChannelException expected", 483 x.getCause()); 484 } 485 } 486 } 487 488 static void testRead2() throws Exception { 489 System.out.println("-- read (2) --"); 490 491 try (Server server = new Server()) { 492 final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 493 ch.connect(server.address()).get(); 494 SocketChannel sc = server.accept(); 495 496 ByteBuffer src = genBuffer(); 497 498 // read until the buffer is full 499 final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity()); 500 final CountDownLatch latch = new CountDownLatch(1); 501 ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() { 502 public void completed(Integer result, Void att) { 503 if (dst.hasRemaining()) { 504 ch.read(dst, (Void)null, this); 505 } else { 506 latch.countDown(); 507 } 508 } 509 public void failed(Throwable exc, Void att) { 510 } 511 }); 512 513 // trickle the writing 514 do { 515 int rem = src.remaining(); 516 int size = (rem <= 100) ? rem : 50 + RAND.nextInt(rem - 100); 517 ByteBuffer buf = ByteBuffer.allocate(size); 518 for (int i=0; i<size; i++) 519 buf.put(src.get()); 520 buf.flip(); 521 Thread.sleep(50 + RAND.nextInt(1500)); 522 while (buf.hasRemaining()) 523 sc.write(buf); 524 } while (src.hasRemaining()); 525 526 // wait until ascynrhonous reading has completed 527 latch.await(); 528 529 // check buffers 530 src.flip(); 531 dst.flip(); 532 if (!src.equals(dst)) { 533 throw new RuntimeException("Contents differ"); 534 } 535 536 sc.close(); 537 ch.close(); 538 } 539 } 540 541 // exercise scattering read 542 static void testRead3() throws Exception { 543 System.out.println("-- read (3) --"); 544 545 try (Server server = new Server()) { 546 final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 547 ch.connect(server.address()).get(); 548 SocketChannel sc = server.accept(); 549 550 ByteBuffer[] dsts = new ByteBuffer[3]; 551 for (int i=0; i<dsts.length; i++) { 552 dsts[i] = ByteBuffer.allocateDirect(100); 553 } 554 555 // scattering read that completes ascynhronously 556 final CountDownLatch l1 = new CountDownLatch(1); 557 ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null, 558 new CompletionHandler<Long,Void>() { 559 public void completed(Long result, Void att) { 560 long n = result; 561 if (n <= 0) 562 throw new RuntimeException("No bytes read"); 563 l1.countDown(); 564 } 565 public void failed(Throwable exc, Void att) { 566 } 567 }); 568 569 // write some bytes 570 sc.write(genBuffer()); 571 572 // read should now complete 573 l1.await(); 574 575 // write more bytes 576 sc.write(genBuffer()); 577 578 // read should complete immediately 579 for (int i=0; i<dsts.length; i++) { 580 dsts[i].rewind(); 581 } 582 583 final CountDownLatch l2 = new CountDownLatch(1); 584 ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null, 585 new CompletionHandler<Long,Void>() { 586 public void completed(Long result, Void att) { 587 long n = result; 588 if (n <= 0) 589 throw new RuntimeException("No bytes read"); 590 l2.countDown(); 591 } 592 public void failed(Throwable exc, Void att) { 593 } 594 }); 595 l2.await(); 596 597 ch.close(); 598 sc.close(); 599 } 600 } 601 602 static void testWrite1() throws Exception { 603 System.out.println("-- write (1) --"); 604 605 try (Server server = new Server()) { 606 final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 607 ch.connect(server.address()).get(); 608 SocketChannel sc = server.accept(); 609 610 // write with 0 bytes remaining should complete immediately 611 ByteBuffer buf = ByteBuffer.allocate(1); 612 buf.put((byte)0); 613 int n = ch.write(buf).get(); 614 if (n != 0) 615 throw new RuntimeException("0 expected"); 616 617 // write all bytes and close connection when done 618 final ByteBuffer src = genBuffer(); 619 ch.write(src, (Void)null, new CompletionHandler<Integer,Void>() { 620 public void completed(Integer result, Void att) { 621 if (src.hasRemaining()) { 622 ch.write(src, (Void)null, this); 623 } else { 624 try { 625 ch.close(); 626 } catch (IOException ignore) { } 627 } 628 } 629 public void failed(Throwable exc, Void att) { 630 } 631 }); 632 633 // read to EOF or buffer full 634 ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100); 635 do { 636 n = sc.read(dst); 637 } while (n > 0); 638 sc.close(); 639 640 // check buffers 641 src.flip(); 642 dst.flip(); 643 if (!src.equals(dst)) { 644 throw new RuntimeException("Contents differ"); 645 } 646 647 // check write fails with ClosedChannelException 648 try { 649 ch.read(dst).get(); 650 throw new RuntimeException("ExecutionException expected"); 651 } catch (ExecutionException x) { 652 if (!(x.getCause() instanceof ClosedChannelException)) 653 throw new RuntimeException("Cause of ClosedChannelException expected", 654 x.getCause()); 655 } 656 } 657 } 658 659 // exercise gathering write 660 static void testWrite2() throws Exception { 661 System.out.println("-- write (2) --"); 662 663 try (Server server = new Server()) { 664 final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 665 ch.connect(server.address()).get(); 666 SocketChannel sc = server.accept(); 667 668 // number of bytes written 669 final AtomicLong bytesWritten = new AtomicLong(0); 670 671 // write buffers (should complete immediately) 672 ByteBuffer[] srcs = genBuffers(1); 673 final CountDownLatch l1 = new CountDownLatch(1); 674 ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null, 675 new CompletionHandler<Long,Void>() { 676 public void completed(Long result, Void att) { 677 long n = result; 678 if (n <= 0) 679 throw new RuntimeException("No bytes read"); 680 bytesWritten.addAndGet(n); 681 l1.countDown(); 682 } 683 public void failed(Throwable exc, Void att) { 684 } 685 }); 686 l1.await(); 687 688 // set to true to signal that no more buffers should be written 689 final AtomicBoolean continueWriting = new AtomicBoolean(true); 690 691 // write until socket buffer is full so as to create the conditions 692 // for when a write does not complete immediately 693 srcs = genBuffers(1); 694 ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null, 695 new CompletionHandler<Long,Void>() { 696 public void completed(Long result, Void att) { 697 long n = result; 698 if (n <= 0) 699 throw new RuntimeException("No bytes written"); 700 bytesWritten.addAndGet(n); 701 if (continueWriting.get()) { 702 ByteBuffer[] srcs = genBuffers(8); 703 ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, 704 (Void)null, this); 705 } 706 } 707 public void failed(Throwable exc, Void att) { 708 } 709 }); 710 711 // give time for socket buffer to fill up. 712 Thread.sleep(5*1000); 713 714 // signal handler to stop further writing 715 continueWriting.set(false); 716 717 // read until done 718 ByteBuffer buf = ByteBuffer.allocateDirect(4096); 719 long total = 0L; 720 do { 721 int n = sc.read(buf); 722 if (n <= 0) 723 throw new RuntimeException("No bytes read"); 724 buf.rewind(); 725 total += n; 726 } while (total < bytesWritten.get()); 727 728 ch.close(); 729 sc.close(); 730 } 731 } 732 733 static void testShutdown() throws Exception { 734 System.out.println("-- shutdown --"); 735 736 try (Server server = new Server(); 737 AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) 738 { 739 ch.connect(server.address()).get(); 740 try (SocketChannel peer = server.accept()) { 741 ByteBuffer buf = ByteBuffer.allocateDirect(1000); 742 int n; 743 744 // check read 745 ch.shutdownInput(); 746 n = ch.read(buf).get(); 747 if (n != -1) 748 throw new RuntimeException("-1 expected"); 749 // check full with full buffer 750 buf.put(new byte[100]); 751 n = ch.read(buf).get(); 752 if (n != -1) 753 throw new RuntimeException("-1 expected"); 754 755 // check write 756 ch.shutdownOutput(); 757 try { 758 ch.write(buf).get(); 759 throw new RuntimeException("ClosedChannelException expected"); 760 } catch (ExecutionException x) { 761 if (!(x.getCause() instanceof ClosedChannelException)) 762 throw new RuntimeException("ClosedChannelException expected", 763 x.getCause()); 764 } 765 } 766 } 767 } 768 769 static void testTimeout() throws Exception { 770 System.out.println("-- timeouts --"); 771 testTimeout(Integer.MIN_VALUE, TimeUnit.SECONDS); 772 testTimeout(-1L, TimeUnit.SECONDS); 773 testTimeout(0L, TimeUnit.SECONDS); 774 testTimeout(2L, TimeUnit.SECONDS); 775 } 776 777 static void testTimeout(final long timeout, final TimeUnit unit) throws Exception { 778 System.out.printf("---- timeout: %d ms%n", unit.toMillis(timeout)); 779 try (Server server = new Server()) { 780 AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 781 ch.connect(server.address()).get(); 782 783 ByteBuffer dst = ByteBuffer.allocate(512); 784 785 final AtomicReference<Throwable> readException = new AtomicReference<Throwable>(); 786 787 // this read should timeout if value is > 0 788 ch.read(dst, timeout, unit, null, new CompletionHandler<Integer,Void>() { 789 public void completed(Integer result, Void att) { 790 readException.set(new RuntimeException("Should not complete")); 791 } 792 public void failed(Throwable exc, Void att) { 793 readException.set(exc); 794 } 795 }); 796 if (timeout > 0L) { 797 // wait for exception 798 while (readException.get() == null) { 799 Thread.sleep(100); 800 } 801 if (!(readException.get() instanceof InterruptedByTimeoutException)) 802 throw new RuntimeException("InterruptedByTimeoutException expected", 803 readException.get()); 804 805 // after a timeout then further reading should throw unspecified runtime exception 806 boolean exceptionThrown = false; 807 try { 808 ch.read(dst); 809 } catch (RuntimeException x) { 810 exceptionThrown = true; 811 } 812 if (!exceptionThrown) 813 throw new RuntimeException("RuntimeException expected after timeout."); 814 } else { 815 Thread.sleep(1000); 816 Throwable exc = readException.get(); 817 if (exc != null) 818 throw new RuntimeException(exc); 819 } 820 821 final AtomicReference<Throwable> writeException = new AtomicReference<Throwable>(); 822 823 // write bytes to fill socket buffer 824 ch.write(genBuffer(), timeout, unit, ch, 825 new CompletionHandler<Integer,AsynchronousSocketChannel>() 826 { 827 public void completed(Integer result, AsynchronousSocketChannel ch) { 828 ch.write(genBuffer(), timeout, unit, ch, this); 829 } 830 public void failed(Throwable exc, AsynchronousSocketChannel ch) { 831 writeException.set(exc); 832 } 833 }); 834 if (timeout > 0) { 835 // wait for exception 836 while (writeException.get() == null) { 837 Thread.sleep(100); 838 } 839 if (!(writeException.get() instanceof InterruptedByTimeoutException)) 840 throw new RuntimeException("InterruptedByTimeoutException expected", 841 writeException.get()); 842 843 // after a timeout then further writing should throw unspecified runtime exception 844 boolean exceptionThrown = false; 845 try { 846 ch.write(genBuffer()); 847 } catch (RuntimeException x) { 848 exceptionThrown = true; 849 } 850 if (!exceptionThrown) 851 throw new RuntimeException("RuntimeException expected after timeout."); 852 } else { 853 Thread.sleep(1000); 854 Throwable exc = writeException.get(); 855 if (exc != null) 856 throw new RuntimeException(exc); 857 } 858 859 // clean-up 860 server.accept().close(); 861 ch.close(); 862 } 863 } 864 865 // returns ByteBuffer with random bytes 866 static ByteBuffer genBuffer() { 867 int size = 1024 + RAND.nextInt(16000); 868 byte[] buf = new byte[size]; 869 RAND.nextBytes(buf); 870 boolean useDirect = RAND.nextBoolean(); 871 if (useDirect) { 872 ByteBuffer bb = ByteBuffer.allocateDirect(buf.length); 873 bb.put(buf); 874 bb.flip(); 875 return bb; 876 } else { 877 return ByteBuffer.wrap(buf); 878 } 879 } 880 881 // return ByteBuffer[] with random bytes 882 static ByteBuffer[] genBuffers(int max) { 883 int len = 1; 884 if (max > 1) 885 len += RAND.nextInt(max); 886 ByteBuffer[] bufs = new ByteBuffer[len]; 887 for (int i=0; i<len; i++) 888 bufs[i] = genBuffer(); 889 return bufs; 890 } 891 892 // return random SocketAddress 893 static SocketAddress genSocketAddress() { 894 StringBuilder sb = new StringBuilder("10."); 895 sb.append(RAND.nextInt(256)); 896 sb.append('.'); 897 sb.append(RAND.nextInt(256)); 898 sb.append('.'); 899 sb.append(RAND.nextInt(256)); 900 InetAddress rh; 901 try { 902 rh = InetAddress.getByName(sb.toString()); 903 } catch (UnknownHostException x) { 904 throw new InternalError("Should not happen"); 905 } 906 return new InetSocketAddress(rh, RAND.nextInt(65535)+1); 907 } 908 }