/*
 * Copyright (c) 2008, 2017, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/* @test
 * @bug 4607272 6842687 6878369 6944810 7023403
 * @summary Unit test for AsynchronousSocketChannel
 * @run main Basic -skipSlowConnectTest
 * @key randomness intermittent
 */

import java.io.Closeable;
import java.io.IOException;
import java.net.*;
import static java.net.StandardSocketOptions.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

/**
 * Exercises the basic operations of {@link AsynchronousSocketChannel}: bind,
 * socket options, connect, asynchronous close, cancel, read, write, timeouts
 * and shutdown. Each {@code testXxx} method throws on the first violated
 * expectation.
 */
public class Basic {
    /** Shared source of randomness for buffer sizes, contents and addresses. */
    static final Random rand = new Random();

    /** Set by the {@code -skipSlowConnectTest} argument. */
    static boolean skipSlowConnectTest = false;

    /**
     * Parses the command line and runs every test in sequence.
     *
     * @param args only {@code -skipSlowConnectTest} is recognized
     * @throws Exception if an argument is unrecognized or any test fails
     */
    public static void main(String[] args) throws Exception {
        for (String arg: args) {
            switch (arg) {
            case "-skipSlowConnectTest" :
                skipSlowConnectTest = true;
                break;
            default:
                throw new RuntimeException("Unrecognized argument: " + arg);
            }
        }

        testBind();
        testSocketOptions();
        testConnect();
        testCloseWhenPending();
        testCancel();
        testRead1();
        testRead2();
        testRead3();
        testWrite1();
        testWrite2();
        // skip timeout tests until 7052549 is fixed
        if (!System.getProperty("os.name").startsWith("Windows")) {
            testTimeout();
        }
        testShutdown();
    }

    /**
     * Listening socket bound to an ephemeral port; {@link #address()} pairs
     * that port with the local host address so clients can connect to it.
     */
    static class Server implements Closeable {
        private final ServerSocketChannel ssc;
        private final InetSocketAddress address;

        Server() throws IOException {
            ssc = ServerSocketChannel.open().bind(new InetSocketAddress(0));

            InetAddress lh = InetAddress.getLocalHost();
            int port = ((InetSocketAddress)(ssc.getLocalAddress())).getPort();
            address = new InetSocketAddress(lh, port);
        }

        /** Returns the address clients should connect to. */
        InetSocketAddress address() {
            return address;
        }

        /** Accepts the next pending connection. */
        SocketChannel accept() throws IOException {
            return ssc.accept();
        }

        @Override
        public void close() throws IOException {
            ssc.close();
        }

    }

    /**
     * Throws a {@code RuntimeException} wrapping the recorded failure, if any.
     * Used to surface, on the main thread, an error captured by a completion
     * handler.
     */
    private static void checkNoFailure(AtomicReference<Throwable> failure, String message) {
        Throwable exc = failure.get();
        if (exc != null)
            throw new RuntimeException(message, exc);
    }

    /**
     * Checks bind: the local address before and after binding, that a second
     * bind throws AlreadyBoundException, and that bind on a closed channel
     * throws ClosedChannelException.
     */
    static void testBind() throws Exception {
        System.out.println("-- bind --");

        try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
            if (ch.getLocalAddress() != null)
                throw new RuntimeException("Local address should be 'null'");
            ch.bind(new InetSocketAddress(0));

            // check local address after binding
            InetSocketAddress local = (InetSocketAddress)ch.getLocalAddress();
            if (local.getPort() == 0)
                throw new RuntimeException("Unexpected port");
            if (!local.getAddress().isAnyLocalAddress())
                throw new RuntimeException("Not bound to a wildcard address");

            // try to re-bind
            try {
                ch.bind(new InetSocketAddress(0));
                throw new RuntimeException("AlreadyBoundException expected");
            } catch (AlreadyBoundException expected) {
                // expected
            }
        }

        // check ClosedChannelException
        AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
        ch.close();
        try {
            ch.bind(new InetSocketAddress(0));
            throw new RuntimeException("ClosedChannelException expected");
        } catch (ClosedChannelException expected) {
            // expected
        }
    }

    /**
     * Checks setting and getting socket options, their default values, and
     * SO_REUSEPORT when the channel reports it as supported.
     */
    static void testSocketOptions() throws Exception {
        System.out.println("-- socket options --");

        try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
            ch.setOption(SO_RCVBUF, 128*1024)
              .setOption(SO_SNDBUF, 128*1024)
              .setOption(SO_REUSEADDR, true);

            // check SO_SNDBUF/SO_RCVBUF limits
            int before, after;
            before = ch.getOption(SO_SNDBUF);
            after = ch.setOption(SO_SNDBUF, Integer.MAX_VALUE).getOption(SO_SNDBUF);
            if (after < before)
                throw new RuntimeException("setOption caused SO_SNDBUF to decrease");
            before = ch.getOption(SO_RCVBUF);
            after = ch.setOption(SO_RCVBUF, Integer.MAX_VALUE).getOption(SO_RCVBUF);
            if (after < before)
                throw new RuntimeException("setOption caused SO_RCVBUF to decrease");

            ch.bind(new InetSocketAddress(0));

            // default values
            if (ch.getOption(SO_KEEPALIVE))
                throw new RuntimeException("Default of SO_KEEPALIVE should be 'false'");
            if (ch.getOption(TCP_NODELAY))
                throw new RuntimeException("Default of TCP_NODELAY should be 'false'");

            // set and check
            if (!ch.setOption(SO_KEEPALIVE, true).getOption(SO_KEEPALIVE))
                throw new RuntimeException("SO_KEEPALIVE did not change");
            if (!ch.setOption(TCP_NODELAY, true).getOption(TCP_NODELAY))
                throw new RuntimeException("TCP_NODELAY did not change");

            // read others (can't check as actual value is implementation dependent)
            ch.getOption(SO_RCVBUF);
            ch.getOption(SO_SNDBUF);

            Set<SocketOption<?>> options = ch.supportedOptions();
            boolean reuseport = options.contains(SO_REUSEPORT);
            if (reuseport) {
                if (ch.getOption(SO_REUSEPORT))
                    throw new RuntimeException("Default of SO_REUSEPORT should be 'false'");
                if (!ch.setOption(SO_REUSEPORT, true).getOption(SO_REUSEPORT))
                    throw new RuntimeException("SO_REUSEPORT did not change");
            }
        }
    }

    /**
     * Checks connect: local/remote addresses after connecting, that a second
     * connect throws AlreadyConnectedException, that connect on a closed
     * channel fails with ClosedChannelException (Future and handler forms),
     * and that a failed connect leaves the channel closed.
     */
    static void testConnect() throws Exception {
        System.out.println("-- connect --");

        SocketAddress address;

        try (Server server = new Server()) {
            address = server.address();

            // connect to server and check local/remote addresses
            try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
                ch.connect(address).get();
                // check local address
                if (ch.getLocalAddress() == null)
                    throw new RuntimeException("Not bound to local address");

                // check remote address
                InetSocketAddress remote = (InetSocketAddress)ch.getRemoteAddress();
                if (remote.getPort() != server.address().getPort())
                    throw new RuntimeException("Connected to unexpected port");
                if (!remote.getAddress().equals(server.address().getAddress()))
                    throw new RuntimeException("Connected to unexpected address");

                // try to connect again
                try {
                    ch.connect(server.address()).get();
                    throw new RuntimeException("AlreadyConnectedException expected");
                } catch (AlreadyConnectedException expected) {
                    // expected
                }

                // clean-up
                server.accept().close();
            }

            // check that connect fails with ClosedChannelException
            AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
            ch.close();
            try {
                ch.connect(server.address()).get();
                throw new RuntimeException("ExecutionException expected");
            } catch (ExecutionException x) {
                if (!(x.getCause() instanceof ClosedChannelException))
                    throw new RuntimeException("Cause of ClosedChannelException expected",
                            x.getCause());
            }
            final AtomicReference<Throwable> connectException = new AtomicReference<>();
            ch.connect(server.address(), (Void)null, new CompletionHandler<Void,Void>() {
                public void completed(Void result, Void att) {
                }
                public void failed(Throwable exc, Void att) {
                    connectException.set(exc);
                }
            });
            while (connectException.get() == null) {
                Thread.sleep(100);
            }
            if (!(connectException.get() instanceof ClosedChannelException))
                throw new RuntimeException("ClosedChannelException expected",
                        connectException.get());
        }

        // test that failure to connect closes the channel
        // (the server is closed at this point)
        try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
            try {
                ch.connect(address).get();
            } catch (ExecutionException x) {
                // failed to establish connection
                if (ch.isOpen())
                    throw new RuntimeException("Channel should be closed");
            }
        }

        // repeat test by connecting to a (probably) non-existent host. This
        // improves the chance that the connect will not fail immediately.
        if (!skipSlowConnectTest) {
            try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
                try {
                    ch.connect(genSocketAddress()).get();
                } catch (ExecutionException x) {
                    // failed to establish connection
                    if (ch.isOpen())
                        throw new RuntimeException("Channel should be closed");
                }
            }
        }
    }

    /**
     * Checks that closing a channel while a connect, read or write is
     * outstanding completes that operation with an exception, and that a
     * second read/write issued while one is pending is rejected.
     */
    static void testCloseWhenPending() throws Exception {
        System.out.println("-- asynchronous close when connecting --");

        AsynchronousSocketChannel ch;

        // asynchronous close while connecting
        ch = AsynchronousSocketChannel.open();
        Future<Void> connectResult = ch.connect(genSocketAddress());

        // give time to initiate the connect (SYN)
        Thread.sleep(50);

        // close
        ch.close();

        // check that exception is thrown in timely manner
        try {
            connectResult.get(5, TimeUnit.SECONDS);
        } catch (TimeoutException x) {
            throw new RuntimeException("AsynchronousCloseException not thrown");
        } catch (ExecutionException x) {
            // expected
        }

        System.out.println("-- asynchronous close when reading --");

        try (Server server = new Server()) {
            ch = AsynchronousSocketChannel.open();
            ch.connect(server.address()).get();

            ByteBuffer dst = ByteBuffer.allocateDirect(100);
            Future<Integer> result = ch.read(dst);

            // attempt a second read - should fail with ReadPendingException
            ByteBuffer buf = ByteBuffer.allocateDirect(100);
            try {
                ch.read(buf);
                throw new RuntimeException("ReadPendingException expected");
            } catch (ReadPendingException expected) {
                // expected
            }

            // close channel (should cause initial read to complete)
            ch.close();
            server.accept().close();

            // check that AsynchronousCloseException is thrown
            try {
                result.get();
                throw new RuntimeException("Should not read");
            } catch (ExecutionException x) {
                if (!(x.getCause() instanceof AsynchronousCloseException))
                    throw new RuntimeException(x);
            }

            System.out.println("-- asynchronous close when writing --");

            ch = AsynchronousSocketChannel.open();
            ch.connect(server.address()).get();

            final AtomicReference<Throwable> writeException = new AtomicReference<>();

            // write bytes to fill socket buffer
            final AtomicLong completedTime = new AtomicLong();
            ch.write(genBuffer(), ch, new CompletionHandler<Integer,AsynchronousSocketChannel>() {
                public void completed(Integer result, AsynchronousSocketChannel ch) {
                    completedTime.set(System.nanoTime());
                    ch.write(genBuffer(), ch, this);
                }
                public void failed(Throwable x, AsynchronousSocketChannel ch) {
                    writeException.set(x);
                }
            });

            // give time for socket buffer to fill up -
            // take pauses until the handler is no longer being invoked
            // because all writes are being pended which guarantees that
            // the internal channel state indicates it is writing
            long previousCompletedTime = completedTime.get();
            do {
                Thread.sleep(1000);
                if (completedTime.get() == previousCompletedTime) {
                    break;
                }
                previousCompletedTime = completedTime.get();
            } while (true);

            // attempt a concurrent write - should fail with WritePendingException
            try {
                ch.write(genBuffer());
                throw new RuntimeException("WritePendingException expected");
            } catch (WritePendingException expected) {
                // expected
            }

            // close channel - should cause initial write to complete
            ch.close();
            server.accept().close();

            // wait for exception
            while (writeException.get() == null) {
                Thread.sleep(100);
            }
            if (!(writeException.get() instanceof AsynchronousCloseException))
                throw new RuntimeException("AsynchronousCloseException expected",
                        writeException.get());
        }
    }

    /**
     * Checks the post-conditions of cancelling a pending read through its
     * Future, once with {@code mayInterruptIfRunning} false and once true.
     */
    static void testCancel() throws Exception {
        System.out.println("-- cancel --");

        try (Server server = new Server()) {
            for (int i=0; i<2; i++) {
                boolean mayInterruptIfRunning = (i != 0);

                // establish loopback connection
                AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
                ch.connect(server.address()).get();
                SocketChannel peer = server.accept();

                // start read operation
                ByteBuffer buf = ByteBuffer.allocate(1);
                Future<Integer> res = ch.read(buf);

                // cancel operation
                boolean cancelled = res.cancel(mayInterruptIfRunning);

                // check post-conditions
                if (!res.isDone())
                    throw new RuntimeException("isDone should return true");
                if (res.isCancelled() != cancelled)
                    throw new RuntimeException("isCancelled not consistent");
                try {
                    res.get();
                    throw new RuntimeException("CancellationException expected");
                } catch (CancellationException expected) {
                    // expected
                }
                try {
                    res.get(1, TimeUnit.SECONDS);
                    throw new RuntimeException("CancellationException expected");
                } catch (CancellationException expected) {
                    // expected
                }

                // check that the cancel doesn't impact writing to the channel
                if (!mayInterruptIfRunning) {
                    buf = ByteBuffer.wrap("a".getBytes());
                    ch.write(buf).get();
                }

                ch.close();
                peer.close();
            }
        }
    }

    /**
     * Checks a read into a buffer with no space remaining, reading to
     * end-of-stream with a completion handler, and reading on a closed channel.
     */
    static void testRead1() throws Exception {
        System.out.println("-- read (1) --");

        try (Server server = new Server()) {
            final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
            ch.connect(server.address()).get();

            // read with 0 bytes remaining should complete immediately
            ByteBuffer buf = ByteBuffer.allocate(1);
            buf.put((byte)0);
            int n = ch.read(buf).get();
            if (n != 0)
                throw new RuntimeException("0 expected");

            // write bytes and close connection
            ByteBuffer src = genBuffer();
            try (SocketChannel sc = server.accept()) {
                sc.setOption(SO_SNDBUF, src.remaining());
                while (src.hasRemaining())
                    sc.write(src);
            }

            // reads should complete immediately
            final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100);
            final CountDownLatch latch = new CountDownLatch(1);
            final AtomicReference<Throwable> readFailure = new AtomicReference<>();
            ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() {
                public void completed(Integer result, Void att) {
                    int n = result;
                    if (n > 0) {
                        ch.read(dst, (Void)null, this);
                    } else {
                        latch.countDown();
                    }
                }
                public void failed(Throwable exc, Void att) {
                    // record the failure and release the main thread rather
                    // than leaving it blocked on the latch
                    readFailure.set(exc);
                    latch.countDown();
                }
            });

            latch.await();
            checkNoFailure(readFailure, "Read failed");

            // check buffers
            src.flip();
            dst.flip();
            if (!src.equals(dst)) {
                throw new RuntimeException("Contents differ");
            }

            // close channel
            ch.close();

            // check read fails with ClosedChannelException
            try {
                ch.read(dst).get();
                throw new RuntimeException("ExecutionException expected");
            } catch (ExecutionException x) {
                if (!(x.getCause() instanceof ClosedChannelException))
                    throw new RuntimeException("Cause of ClosedChannelException expected",
                            x.getCause());
            }
        }
    }

    /**
     * Checks reading until the destination buffer is full while the peer
     * writes the data in randomly sized pieces with random delays.
     */
    static void testRead2() throws Exception {
        System.out.println("-- read (2) --");

        try (Server server = new Server()) {
            final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
            ch.connect(server.address()).get();
            SocketChannel sc = server.accept();

            ByteBuffer src = genBuffer();

            // read until the buffer is full
            final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity());
            final CountDownLatch latch = new CountDownLatch(1);
            final AtomicReference<Throwable> readFailure = new AtomicReference<>();
            ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() {
                public void completed(Integer result, Void att) {
                    if (dst.hasRemaining()) {
                        ch.read(dst, (Void)null, this);
                    } else {
                        latch.countDown();
                    }
                }
                public void failed(Throwable exc, Void att) {
                    // record the failure and release the main thread rather
                    // than leaving it blocked on the latch
                    readFailure.set(exc);
                    latch.countDown();
                }
            });

            // trickle the writing
            do {
                int rem = src.remaining();
                int size = (rem <= 100) ? rem : 50 + rand.nextInt(rem - 100);
                ByteBuffer buf = ByteBuffer.allocate(size);
                for (int i=0; i<size; i++)
                    buf.put(src.get());
                buf.flip();
                Thread.sleep(50 + rand.nextInt(1500));
                while (buf.hasRemaining())
                    sc.write(buf);
            } while (src.hasRemaining());

            // wait until asynchronous reading has completed
            latch.await();
            checkNoFailure(readFailure, "Read failed");

            // check buffers
            src.flip();
            dst.flip();
            if (!src.equals(dst)) {
                throw new RuntimeException("Contents differ");
            }

            sc.close();
            ch.close();
        }
    }

    /**
     * Exercises scattering reads: one issued before any data is available and
     * one issued after data has already been written by the peer.
     */
    static void testRead3() throws Exception {
        System.out.println("-- read (3) --");

        try (Server server = new Server()) {
            final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
            ch.connect(server.address()).get();
            SocketChannel sc = server.accept();

            ByteBuffer[] dsts = new ByteBuffer[3];
            for (int i=0; i<dsts.length; i++) {
                dsts[i] = ByteBuffer.allocateDirect(100);
            }

            // failure (if any) reported by either of the read handlers below
            final AtomicReference<Throwable> readFailure = new AtomicReference<>();

            // scattering read that completes asynchronously
            final CountDownLatch l1 = new CountDownLatch(1);
            ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null,
                new CompletionHandler<Long,Void>() {
                    public void completed(Long result, Void att) {
                        long n = result;
                        // record instead of throwing on the handler thread so
                        // that the latch is always released
                        if (n <= 0)
                            readFailure.set(new RuntimeException("No bytes read"));
                        l1.countDown();
                    }
                    public void failed(Throwable exc, Void att) {
                        readFailure.set(exc);
                        l1.countDown();
                    }
            });

            // write some bytes
            sc.write(genBuffer());

            // read should now complete
            l1.await();
            checkNoFailure(readFailure, "Scattering read failed");

            // write more bytes
            sc.write(genBuffer());

            // read should complete immediately
            for (int i=0; i<dsts.length; i++) {
                dsts[i].rewind();
            }

            final CountDownLatch l2 = new CountDownLatch(1);
            ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null,
                new CompletionHandler<Long,Void>() {
                    public void completed(Long result, Void att) {
                        long n = result;
                        if (n <= 0)
                            readFailure.set(new RuntimeException("No bytes read"));
                        l2.countDown();
                    }
                    public void failed(Throwable exc, Void att) {
                        readFailure.set(exc);
                        l2.countDown();
                    }
            });
            l2.await();
            checkNoFailure(readFailure, "Scattering read failed");

            ch.close();
            sc.close();
        }
    }

    /**
     * Checks a write of a buffer with no bytes remaining, writing a whole
     * buffer with a completion handler, and writing on a closed channel.
     */
    static void testWrite1() throws Exception {
        System.out.println("-- write (1) --");

        try (Server server = new Server()) {
            final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
            ch.connect(server.address()).get();
            SocketChannel sc = server.accept();

            // write with 0 bytes remaining should complete immediately
            ByteBuffer buf = ByteBuffer.allocate(1);
            buf.put((byte)0);
            int n = ch.write(buf).get();
            if (n != 0)
                throw new RuntimeException("0 expected");

            // write all bytes and close connection when done
            final ByteBuffer src = genBuffer();
            final AtomicReference<Throwable> writeFailure = new AtomicReference<>();
            ch.write(src, (Void)null, new CompletionHandler<Integer,Void>() {
                public void completed(Integer result, Void att) {
                    if (src.hasRemaining()) {
                        ch.write(src, (Void)null, this);
                    } else {
                        try {
                            ch.close();
                        } catch (IOException ignore) { }
                    }
                }
                public void failed(Throwable exc, Void att) {
                    // record the failure and close the channel so that the
                    // blocking read loop below sees end-of-stream
                    writeFailure.set(exc);
                    try {
                        ch.close();
                    } catch (IOException ignore) { }
                }
            });

            // read to EOF or buffer full
            ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100);
            do {
                n = sc.read(dst);
            } while (n > 0);
            sc.close();
            checkNoFailure(writeFailure, "Write failed");

            // check buffers
            src.flip();
            dst.flip();
            if (!src.equals(dst)) {
                throw new RuntimeException("Contents differ");
            }

            // check write fails with ClosedChannelException
            // (src has bytes remaining again after the flip above)
            try {
                ch.write(src).get();
                throw new RuntimeException("ExecutionException expected");
            } catch (ExecutionException x) {
                if (!(x.getCause() instanceof ClosedChannelException))
                    throw new RuntimeException("Cause of ClosedChannelException expected",
                            x.getCause());
            }
        }
    }

    /**
     * Exercises gathering writes: one that should complete immediately,
     * followed by repeated writes intended to fill the socket buffer, after
     * which the peer drains everything that was reported as written.
     */
    static void testWrite2() throws Exception {
        System.out.println("-- write (2) --");

        try (Server server = new Server()) {
            final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
            ch.connect(server.address()).get();
            SocketChannel sc = server.accept();

            // number of bytes written
            final AtomicLong bytesWritten = new AtomicLong(0);

            // write buffers (should complete immediately)
            ByteBuffer[] srcs = genBuffers(1);
            final CountDownLatch l1 = new CountDownLatch(1);
            final AtomicReference<Throwable> writeFailure = new AtomicReference<>();
            ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null,
                new CompletionHandler<Long,Void>() {
                    public void completed(Long result, Void att) {
                        long n = result;
                        // record instead of throwing on the handler thread so
                        // that the latch is always released
                        if (n <= 0)
                            writeFailure.set(new RuntimeException("No bytes written"));
                        else
                            bytesWritten.addAndGet(n);
                        l1.countDown();
                    }
                    public void failed(Throwable exc, Void att) {
                        writeFailure.set(exc);
                        l1.countDown();
                    }
            });
            l1.await();
            checkNoFailure(writeFailure, "Gathering write failed");

            // set to false to signal that no more buffers should be written
            final AtomicBoolean continueWriting = new AtomicBoolean(true);

            // write until socket buffer is full so as to create the conditions
            // for when a write does not complete immediately
            srcs = genBuffers(1);
            ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null,
                new CompletionHandler<Long,Void>() {
                    public void completed(Long result, Void att) {
                        long n = result;
                        if (n <= 0)
                            throw new RuntimeException("No bytes written");
                        bytesWritten.addAndGet(n);
                        if (continueWriting.get()) {
                            ByteBuffer[] more = genBuffers(8);
                            ch.write(more, 0, more.length, 0L, TimeUnit.SECONDS,
                                (Void)null, this);
                        }
                    }
                    public void failed(Throwable exc, Void att) {
                    }
            });

            // give time for socket buffer to fill up.
            Thread.sleep(5*1000);

            // signal handler to stop further writing
            continueWriting.set(false);

            // read until done
            ByteBuffer buf = ByteBuffer.allocateDirect(4096);
            long total = 0L;
            do {
                int n = sc.read(buf);
                if (n <= 0)
                    throw new RuntimeException("No bytes read");
                buf.rewind();
                total += n;
            } while (total < bytesWritten.get());

            ch.close();
            sc.close();
        }
    }

    /**
     * Checks that reads return -1 after shutdownInput and that writes fail
     * with ClosedChannelException after shutdownOutput.
     */
    static void testShutdown() throws Exception {
        System.out.println("-- shutdown--");

        try (Server server = new Server();
             AsynchronousSocketChannel ch = AsynchronousSocketChannel.open())
        {
            ch.connect(server.address()).get();
            try (SocketChannel peer = server.accept()) {
                ByteBuffer buf = ByteBuffer.allocateDirect(1000);
                int n;

                // check read
                ch.shutdownInput();
                n = ch.read(buf).get();
                if (n != -1)
                    throw new RuntimeException("-1 expected");
                // check read with a partially filled buffer
                buf.put(new byte[100]);
                n = ch.read(buf).get();
                if (n != -1)
                    throw new RuntimeException("-1 expected");

                // check write
                ch.shutdownOutput();
                try {
                    ch.write(buf).get();
                    throw new RuntimeException("ClosedChannelException expected");
                } catch (ExecutionException x) {
                    if (!(x.getCause() instanceof ClosedChannelException))
                        throw new RuntimeException("ClosedChannelException expected",
                                x.getCause());
                }
            }
        }
    }

    /** Runs the timeout test with negative, zero and positive timeouts. */
    static void testTimeout() throws Exception {
        System.out.println("-- timeouts --");
        testTimeout(Integer.MIN_VALUE, TimeUnit.SECONDS);
        testTimeout(-1L, TimeUnit.SECONDS);
        testTimeout(0L, TimeUnit.SECONDS);
        testTimeout(2L, TimeUnit.SECONDS);
    }

    /**
     * Issues a read and then a chain of writes with the given timeout. When
     * the timeout is positive, both are expected to fail with
     * InterruptedByTimeoutException and a further read/write is expected to
     * throw a RuntimeException; otherwise neither may report a failure within
     * one second.
     *
     * @param timeout the timeout passed to the read and write operations
     * @param unit the unit of {@code timeout}
     */
    static void testTimeout(final long timeout, final TimeUnit unit) throws Exception {
        try (Server server = new Server()) {
            AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
            ch.connect(server.address()).get();

            ByteBuffer dst = ByteBuffer.allocate(512);

            final AtomicReference<Throwable> readException = new AtomicReference<>();

            // this read should timeout if value is > 0
            ch.read(dst, timeout, unit, null, new CompletionHandler<Integer,Void>() {
                public void completed(Integer result, Void att) {
                    readException.set(new RuntimeException("Should not complete"));
                }
                public void failed(Throwable exc, Void att) {
                    readException.set(exc);
                }
            });
            if (timeout > 0L) {
                // wait for exception
                while (readException.get() == null) {
                    Thread.sleep(100);
                }
                if (!(readException.get() instanceof InterruptedByTimeoutException))
                    throw new RuntimeException("InterruptedByTimeoutException expected",
                            readException.get());

                // after a timeout then further reading should throw unspecified runtime exception
                boolean exceptionThrown = false;
                try {
                    ch.read(dst);
                } catch (RuntimeException x) {
                    exceptionThrown = true;
                }
                if (!exceptionThrown)
                    throw new RuntimeException("RuntimeException expected after timeout.");
            } else {
                Thread.sleep(1000);
                Throwable exc = readException.get();
                if (exc != null)
                    throw new RuntimeException(exc);
            }

            final AtomicReference<Throwable> writeException = new AtomicReference<>();

            // write bytes to fill socket buffer
            ch.write(genBuffer(), timeout, unit, ch,
                new CompletionHandler<Integer,AsynchronousSocketChannel>()
            {
                public void completed(Integer result, AsynchronousSocketChannel ch) {
                    ch.write(genBuffer(), timeout, unit, ch, this);
                }
                public void failed(Throwable exc, AsynchronousSocketChannel ch) {
                    writeException.set(exc);
                }
            });
            if (timeout > 0) {
                // wait for exception
                while (writeException.get() == null) {
                    Thread.sleep(100);
                }
                if (!(writeException.get() instanceof InterruptedByTimeoutException))
                    throw new RuntimeException("InterruptedByTimeoutException expected",
                            writeException.get());

                // after a timeout then further writing should throw unspecified runtime exception
                boolean exceptionThrown = false;
                try {
                    ch.write(genBuffer());
                } catch (RuntimeException x) {
                    exceptionThrown = true;
                }
                if (!exceptionThrown)
                    throw new RuntimeException("RuntimeException expected after timeout.");
            } else {
                Thread.sleep(1000);
                Throwable exc = writeException.get();
                if (exc != null)
                    throw new RuntimeException(exc);
            }

            // clean-up
            server.accept().close();
            ch.close();
        }
    }

    /**
     * Returns a ByteBuffer (direct or heap, chosen at random) holding between
     * 1024 and 17023 random bytes, positioned at zero and ready to be read.
     */
    static ByteBuffer genBuffer() {
        int size = 1024 + rand.nextInt(16000);
        byte[] buf = new byte[size];
        rand.nextBytes(buf);
        boolean useDirect = rand.nextBoolean();
        if (useDirect) {
            ByteBuffer bb = ByteBuffer.allocateDirect(buf.length);
            bb.put(buf);
            bb.flip();
            return bb;
        } else {
            return ByteBuffer.wrap(buf);
        }
    }

    /**
     * Returns an array of buffers from {@link #genBuffer()}: a single buffer
     * when {@code max <= 1}, otherwise between 1 and {@code max} buffers.
     */
    static ByteBuffer[] genBuffers(int max) {
        int len = 1;
        if (max > 1)
            len += rand.nextInt(max);
        ByteBuffer[] bufs = new ByteBuffer[len];
        for (int i=0; i<len; i++)
            bufs[i] = genBuffer();
        return bufs;
    }

    /**
     * Returns a socket address with a random host in 10.0.0.0/8 and a random
     * port in the range 1 to 65535.
     */
    static SocketAddress genSocketAddress() {
        StringBuilder sb = new StringBuilder("10.");
        sb.append(rand.nextInt(256));
        sb.append('.');
        sb.append(rand.nextInt(256));
        sb.append('.');
        sb.append(rand.nextInt(256));
        InetAddress rh;
        try {
            rh = InetAddress.getByName(sb.toString());
        } catch (UnknownHostException x) {
            // preserve the cause so an unexpected failure can be diagnosed
            throw new InternalError("Should not happen", x);
        }
        return new InetSocketAddress(rh, rand.nextInt(65535)+1);
    }
}