/*
 * Copyright (c) 2008, 2017, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/* @test
 * @bug 4607272 6842687 6878369 6944810 7023403
 * @summary Unit test for AsynchronousSocketChannel
 * @run main Basic -skipSlowConnectTest
 * @key randomness intermittent
 */

import java.io.Closeable;
import java.io.IOException;
import java.net.*;
import static java.net.StandardSocketOptions.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

/**
 * Exercises the basic operations of {@link AsynchronousSocketChannel}: bind,
 * socket options, connect, asynchronous close, cancellation, read, write,
 * timeouts and shutdown. Each test throws {@code RuntimeException} when an
 * expectation is not met.
 *
 * <p>The only recognized command-line argument is {@code -skipSlowConnectTest},
 * which skips the connect attempt to a randomly generated 10.x.x.x address in
 * {@link #testConnect()}.
 */
public class Basic {
    static final Random rand = new Random();

    static boolean skipSlowConnectTest = false;

    /**
     * Parses the arguments and runs every test in sequence.
     *
     * @param args test options; anything other than
     *             {@code -skipSlowConnectTest} is rejected
     * @throws Exception if any test fails
     */
    public static void main(String[] args) throws Exception {
        for (String arg: args) {
            switch (arg) {
            case "-skipSlowConnectTest" :
                skipSlowConnectTest = true;
                break;
            default:
                throw new RuntimeException("Unrecognized argument: " + arg);
            }
        }

        testBind();
        testSocketOptions();
        testConnect();
        testCloseWhenPending();
        testCancel();
        testRead1();
        testRead2();
        testRead3();
        testWrite1();
        testWrite2();
        // skip timeout tests until 7052549 is fixed
        if (!System.getProperty("os.name").startsWith("Windows"))
            testTimeout();
        testShutdown();
    }

    /**
     * A listening {@link ServerSocketChannel} bound to an ephemeral port.
     * {@link #address()} pairs that port with the local host address so that
     * tests can connect to it.
     */
    static class Server implements Closeable {
        private final ServerSocketChannel ssc;
        private final InetSocketAddress address;

        Server() throws IOException {
            ssc = ServerSocketChannel.open().bind(new InetSocketAddress(0));

            InetAddress lh = InetAddress.getLocalHost();
            int port = ((InetSocketAddress)(ssc.getLocalAddress())).getPort();
            address = new InetSocketAddress(lh, port);
        }

        /** Returns the address (local host + bound port) to connect to. */
        InetSocketAddress address() {
            return address;
        }

        /** Accepts the next pending connection. */
        SocketChannel accept() throws IOException {
            return ssc.accept();
        }

        @Override
        public void close() throws IOException {
            ssc.close();
        }

    }

    /**
     * Checks the local address before and after bind, that re-binding throws
     * {@code AlreadyBoundException}, and that binding a closed channel throws
     * {@code ClosedChannelException}.
     */
    static void testBind() throws Exception {
        System.out.println("-- bind --");

        try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
            if (ch.getLocalAddress() != null)
                throw new RuntimeException("Local address should be 'null'");
            ch.bind(new InetSocketAddress(0));

            // check local address after binding
            InetSocketAddress local = (InetSocketAddress)ch.getLocalAddress();
            if (local.getPort() == 0)
                throw new RuntimeException("Unexpected port");
            if (!local.getAddress().isAnyLocalAddress())
                throw new RuntimeException("Not bound to a wildcard address");

            // try to re-bind
            try {
                ch.bind(new InetSocketAddress(0));
                throw new RuntimeException("AlreadyBoundException expected");
            } catch (AlreadyBoundException x) {
            }
        }

        // check ClosedChannelException
        AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
        ch.close();
        try {
            ch.bind(new InetSocketAddress(0));
            throw new RuntimeException("ClosedChannelException expected");
        } catch (ClosedChannelException x) {
        }
    }

    /**
     * Sets and reads back the socket options of an unconnected channel and
     * checks the expected defaults of the boolean options.
     */
    static void testSocketOptions() throws Exception {
        System.out.println("-- socket options --");

        try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
            ch.setOption(SO_RCVBUF, 128*1024)
              .setOption(SO_SNDBUF, 128*1024)
              .setOption(SO_REUSEADDR, true);

            // check SO_SNDBUF/SO_RCVBUF limits
            int before, after;
            before = ch.getOption(SO_SNDBUF);
            after = ch.setOption(SO_SNDBUF, Integer.MAX_VALUE).getOption(SO_SNDBUF);
            if (after < before)
                throw new RuntimeException("setOption caused SO_SNDBUF to decrease");
            before = ch.getOption(SO_RCVBUF);
            after = ch.setOption(SO_RCVBUF, Integer.MAX_VALUE).getOption(SO_RCVBUF);
            if (after < before)
                throw new RuntimeException("setOption caused SO_RCVBUF to decrease");

            ch.bind(new InetSocketAddress(0));

            // default values
            if (ch.getOption(SO_KEEPALIVE))
                throw new RuntimeException("Default of SO_KEEPALIVE should be 'false'");
            if (ch.getOption(TCP_NODELAY))
                throw new RuntimeException("Default of TCP_NODELAY should be 'false'");

            // set and check
            if (!ch.setOption(SO_KEEPALIVE, true).getOption(SO_KEEPALIVE))
                throw new RuntimeException("SO_KEEPALIVE did not change");
            if (!ch.setOption(TCP_NODELAY, true).getOption(TCP_NODELAY))
                throw new RuntimeException("TCP_NODELAY did not change");

            // read others (can't check as actual value is implementation dependent)
            ch.getOption(SO_RCVBUF);
            ch.getOption(SO_SNDBUF);

            // SO_REUSEPORT is only checked where the channel reports support for it
            Set<SocketOption<?>> options = ch.supportedOptions();
            boolean reuseport = options.contains(SO_REUSEPORT);
            if (reuseport) {
                if (ch.getOption(SO_REUSEPORT))
                    throw new RuntimeException("Default of SO_REUSEPORT should be 'false'");
                if (!ch.setOption(SO_REUSEPORT, true).getOption(SO_REUSEPORT))
                    throw new RuntimeException("SO_REUSEPORT did not change");
            }
        }
    }

    /**
     * Checks a successful connect (local/remote addresses, re-connect
     * rejection), connect on a closed channel via both the {@code Future} and
     * the {@code CompletionHandler} forms, and that a failed connect closes
     * the channel.
     */
    static void testConnect() throws Exception {
        System.out.println("-- connect --");

        SocketAddress address;

        try (Server server = new Server()) {
            address = server.address();

            // connect to server and check local/remote addresses
            try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
                ch.connect(address).get();
                // check local address
                if (ch.getLocalAddress() == null)
                    throw new RuntimeException("Not bound to local address");

                // check remote address
                InetSocketAddress remote = (InetSocketAddress)ch.getRemoteAddress();
                if (remote.getPort() != server.address().getPort())
                    throw new RuntimeException("Connected to unexpected port");
                if (!remote.getAddress().equals(server.address().getAddress()))
                    throw new RuntimeException("Connected to unexpected address");

                // try to connect again
                try {
                    ch.connect(server.address()).get();
                    throw new RuntimeException("AlreadyConnectedException expected");
                } catch (AlreadyConnectedException x) {
                }

                // clean-up
                server.accept().close();
            }

            // check that connect fails with ClosedChannelException
            AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
            ch.close();
            try {
                ch.connect(server.address()).get();
                throw new RuntimeException("ExecutionException expected");
            } catch (ExecutionException x) {
                if (!(x.getCause() instanceof ClosedChannelException))
                    throw new RuntimeException("Cause of ClosedChannelException expected",
                            x.getCause());
            }
            final AtomicReference<Throwable> connectException = new AtomicReference<>();
            ch.connect(server.address(), (Void)null, new CompletionHandler<Void,Void>() {
                @Override
                public void completed(Void result, Void att) {
                }
                @Override
                public void failed(Throwable exc, Void att) {
                    connectException.set(exc);
                }
            });
            // poll until the handler has recorded the failure
            while (connectException.get() == null) {
                Thread.sleep(100);
            }
            if (!(connectException.get() instanceof ClosedChannelException))
                throw new RuntimeException("ClosedChannelException expected",
                        connectException.get());
        }

        // test that failure to connect closes the channel
        // (the server above has been closed, so 'address' is no longer listening)
        try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
            try {
                ch.connect(address).get();
            } catch (ExecutionException x) {
                // failed to establish connection
                if (ch.isOpen())
                    throw new RuntimeException("Channel should be closed");
            }
        }

        // repeat test by connecting to a (probably) non-existent host. This
        // improves the chance that the connect will not fail immediately.
        if (!skipSlowConnectTest) {
            try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
                try {
                    ch.connect(genSocketAddress()).get();
                } catch (ExecutionException x) {
                    // failed to establish connection
                    if (ch.isOpen())
                        throw new RuntimeException("Channel should be closed");
                }
            }
        }
    }

    /**
     * Closes a channel while a connect, a read and a write are outstanding and
     * checks that the pending operation fails; also checks that a second
     * concurrent read/write is rejected with
     * {@code ReadPendingException}/{@code WritePendingException}.
     */
    static void testCloseWhenPending() throws Exception {
        System.out.println("-- asynchronous close when connecting --");

        AsynchronousSocketChannel ch;

        // asynchronous close while connecting
        ch = AsynchronousSocketChannel.open();
        Future<Void> connectResult = ch.connect(genSocketAddress());

        // give time to initiate the connect (SYN)
        Thread.sleep(50);

        // close
        ch.close();

        // check that exception is thrown in timely manner
        try {
            connectResult.get(5, TimeUnit.SECONDS);
        } catch (TimeoutException x) {
            throw new RuntimeException("AsynchronousCloseException not thrown");
        } catch (ExecutionException x) {
            // expected
        }

        System.out.println("-- asynchronous close when reading --");

        try (Server server = new Server()) {
            ch = AsynchronousSocketChannel.open();
            ch.connect(server.address()).get();

            ByteBuffer dst = ByteBuffer.allocateDirect(100);
            Future<Integer> result = ch.read(dst);

            // attempt a second read - should fail with ReadPendingException
            ByteBuffer buf = ByteBuffer.allocateDirect(100);
            try {
                ch.read(buf);
                throw new RuntimeException("ReadPendingException expected");
            } catch (ReadPendingException x) {
            }

            // close channel (should cause initial read to complete)
            ch.close();
            server.accept().close();

            // check that AsynchronousCloseException is thrown
            try {
                result.get();
                throw new RuntimeException("Should not read");
            } catch (ExecutionException x) {
                if (!(x.getCause() instanceof AsynchronousCloseException))
                    throw new RuntimeException(x);
            }

            System.out.println("-- asynchronous close when writing --");

            ch = AsynchronousSocketChannel.open();
            ch.connect(server.address()).get();

            final AtomicReference<Throwable> writeException = new AtomicReference<>();

            // write bytes to fill socket buffer
            final AtomicInteger numCompleted = new AtomicInteger();
            ch.write(genBuffer(), ch, new CompletionHandler<Integer,AsynchronousSocketChannel>() {
                @Override
                public void completed(Integer result, AsynchronousSocketChannel ch) {
                    numCompleted.incrementAndGet();
                    ch.write(genBuffer(), ch, this);
                }
                @Override
                public void failed(Throwable x, AsynchronousSocketChannel ch) {
                    writeException.set(x);
                }
            });

            // give time for socket buffer to fill up -
            // take pauses until the handler is no longer being invoked
            // because all writes are being pended which guarantees that
            // the internal channel state indicates it is writing
            int prevNumCompleted = numCompleted.get();
            do {
                Thread.sleep(1000);
                if (numCompleted.get() == prevNumCompleted) {
                    break;
                }
                prevNumCompleted = numCompleted.get();
            } while (true);

            // attempt a concurrent write - should fail with WritePendingException
            try {
                ch.write(genBuffer());
                throw new RuntimeException("WritePendingException expected");
            } catch (WritePendingException x) {
            }

            // close channel - should cause initial write to complete
            ch.close();
            server.accept().close();

            // wait for exception
            while (writeException.get() == null) {
                Thread.sleep(100);
            }
            if (!(writeException.get() instanceof AsynchronousCloseException))
                throw new RuntimeException("AsynchronousCloseException expected",
                        writeException.get());
        }
    }

    /**
     * Cancels a pending read, once with {@code mayInterruptIfRunning} false
     * and once with it true, and checks the {@code Future} post-conditions.
     */
    static void testCancel() throws Exception {
        System.out.println("-- cancel --");

        try (Server server = new Server()) {
            for (int i=0; i<2; i++) {
                boolean mayInterruptIfRunning = (i != 0);

                // establish loopback connection
                AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
                ch.connect(server.address()).get();
                SocketChannel peer = server.accept();

                // start read operation
                ByteBuffer buf = ByteBuffer.allocate(1);
                Future<Integer> res = ch.read(buf);

                // cancel operation
                boolean cancelled = res.cancel(mayInterruptIfRunning);

                // check post-conditions
                if (!res.isDone())
                    throw new RuntimeException("isDone should return true");
                if (res.isCancelled() != cancelled)
                    throw new RuntimeException("isCancelled not consistent");
                try {
                    res.get();
                    throw new RuntimeException("CancellationException expected");
                } catch (CancellationException x) {
                }
                try {
                    res.get(1, TimeUnit.SECONDS);
                    throw new RuntimeException("CancellationException expected");
                } catch (CancellationException x) {
                }

                // check that the cancel doesn't impact writing to the channel
                if (!mayInterruptIfRunning) {
                    buf = ByteBuffer.wrap("a".getBytes());
                    ch.write(buf).get();
                }

                ch.close();
                peer.close();
            }
        }
    }

    /**
     * Checks a read into a buffer with no space remaining, reading everything
     * the peer wrote before it closed, and read on a closed channel.
     */
    static void testRead1() throws Exception {
        System.out.println("-- read (1) --");

        try (Server server = new Server()) {
            final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
            ch.connect(server.address()).get();

            // read with 0 bytes remaining should complete immediately
            ByteBuffer buf = ByteBuffer.allocate(1);
            buf.put((byte)0);
            int n = ch.read(buf).get();
            if (n != 0)
                throw new RuntimeException("0 expected");

            // write bytes and close connection
            ByteBuffer src = genBuffer();
            try (SocketChannel sc = server.accept()) {
                sc.setOption(SO_SNDBUF, src.remaining());
                while (src.hasRemaining())
                    sc.write(src);
            }

            // reads should complete immediately
            final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100);
            final CountDownLatch latch = new CountDownLatch(1);
            ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() {
                @Override
                public void completed(Integer result, Void att) {
                    int n = result;
                    if (n > 0) {
                        // more may follow; keep reading until a non-positive result
                        ch.read(dst, (Void)null, this);
                    } else {
                        latch.countDown();
                    }
                }
                @Override
                public void failed(Throwable exc, Void att) {
                }
            });

            latch.await();

            // check buffers
            src.flip();
            dst.flip();
            if (!src.equals(dst)) {
                throw new RuntimeException("Contents differ");
            }

            // close channel
            ch.close();

            // check read fails with ClosedChannelException
            try {
                ch.read(dst).get();
                throw new RuntimeException("ExecutionException expected");
            } catch (ExecutionException x) {
                if (!(x.getCause() instanceof ClosedChannelException))
                    throw new RuntimeException("Cause of ClosedChannelException expected",
                            x.getCause());
            }
        }
    }

    /**
     * Reads until the destination buffer is full while the peer writes the
     * data in randomly sized pieces with random pauses in between.
     */
    static void testRead2() throws Exception {
        System.out.println("-- read (2) --");

        try (Server server = new Server()) {
            final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
            ch.connect(server.address()).get();
            SocketChannel sc = server.accept();

            ByteBuffer src = genBuffer();

            // read until the buffer is full
            final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity());
            final CountDownLatch latch = new CountDownLatch(1);
            ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() {
                @Override
                public void completed(Integer result, Void att) {
                    if (dst.hasRemaining()) {
                        ch.read(dst, (Void)null, this);
                    } else {
                        latch.countDown();
                    }
                }
                @Override
                public void failed(Throwable exc, Void att) {
                }
            });

            // trickle the writing
            do {
                int rem = src.remaining();
                int size = (rem <= 100) ? rem : 50 + rand.nextInt(rem - 100);
                ByteBuffer buf = ByteBuffer.allocate(size);
                for (int i=0; i<size; i++)
                    buf.put(src.get());
                buf.flip();
                Thread.sleep(50 + rand.nextInt(1500));
                while (buf.hasRemaining())
                    sc.write(buf);
            } while (src.hasRemaining());

            // wait until asynchronous reading has completed
            latch.await();

            // check buffers
            src.flip();
            dst.flip();
            if (!src.equals(dst)) {
                throw new RuntimeException("Contents differ");
            }

            sc.close();
            ch.close();
        }
    }

    /**
     * Exercises the scattering read, once started before the peer writes and
     * once started after the peer has written.
     */
    static void testRead3() throws Exception {
        System.out.println("-- read (3) --");

        try (Server server = new Server()) {
            final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
            ch.connect(server.address()).get();
            SocketChannel sc = server.accept();

            ByteBuffer[] dsts = new ByteBuffer[3];
            for (int i=0; i<dsts.length; i++) {
                dsts[i] = ByteBuffer.allocateDirect(100);
            }

            // scattering read that completes asynchronously
            final CountDownLatch l1 = new CountDownLatch(1);
            ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null,
                new CompletionHandler<Long,Void>() {
                    @Override
                    public void completed(Long result, Void att) {
                        long n = result;
                        if (n <= 0)
                            throw new RuntimeException("No bytes read");
                        l1.countDown();
                    }
                    @Override
                    public void failed(Throwable exc, Void att) {
                    }
            });

            // write some bytes
            sc.write(genBuffer());

            // read should now complete
            l1.await();

            // write more bytes
            sc.write(genBuffer());

            // read should complete immediately
            for (int i=0; i<dsts.length; i++) {
                dsts[i].rewind();
            }

            final CountDownLatch l2 = new CountDownLatch(1);
            ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null,
                new CompletionHandler<Long,Void>() {
                    @Override
                    public void completed(Long result, Void att) {
                        long n = result;
                        if (n <= 0)
                            throw new RuntimeException("No bytes read");
                        l2.countDown();
                    }
                    @Override
                    public void failed(Throwable exc, Void att) {
                    }
            });
            l2.await();

            ch.close();
            sc.close();
        }
    }

    /**
     * Checks a write of a buffer with nothing remaining, writing a whole
     * buffer and closing the channel when done, and write on a closed channel.
     */
    static void testWrite1() throws Exception {
        System.out.println("-- write (1) --");

        try (Server server = new Server()) {
            final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
            ch.connect(server.address()).get();
            SocketChannel sc = server.accept();

            // write with 0 bytes remaining should complete immediately
            ByteBuffer buf = ByteBuffer.allocate(1);
            buf.put((byte)0);
            int n = ch.write(buf).get();
            if (n != 0)
                throw new RuntimeException("0 expected");

            // write all bytes and close connection when done
            final ByteBuffer src = genBuffer();
            ch.write(src, (Void)null, new CompletionHandler<Integer,Void>() {
                @Override
                public void completed(Integer result, Void att) {
                    if (src.hasRemaining()) {
                        ch.write(src, (Void)null, this);
                    } else {
                        try {
                            ch.close();
                        } catch (IOException ignore) { }
                    }
                }
                @Override
                public void failed(Throwable exc, Void att) {
                }
            });

            // read to EOF or buffer full
            ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100);
            do {
                n = sc.read(dst);
            } while (n > 0);
            sc.close();

            // check buffers
            src.flip();
            dst.flip();
            if (!src.equals(dst)) {
                throw new RuntimeException("Contents differ");
            }

            // check write fails with ClosedChannelException
            // (the handler above closed the channel; src has bytes remaining
            // again after the flip, so this is a genuine write attempt)
            try {
                ch.write(src).get();
                throw new RuntimeException("ExecutionException expected");
            } catch (ExecutionException x) {
                if (!(x.getCause() instanceof ClosedChannelException))
                    throw new RuntimeException("Cause of ClosedChannelException expected",
                            x.getCause());
            }
        }
    }

    /**
     * Exercises the gathering write: one write on an idle connection, then
     * repeated writes for five seconds while the peer is not reading, after
     * which the peer drains everything that was reported as written.
     */
    static void testWrite2() throws Exception {
        System.out.println("-- write (2) --");

        try (Server server = new Server()) {
            final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
            ch.connect(server.address()).get();
            SocketChannel sc = server.accept();

            // number of bytes written
            final AtomicLong bytesWritten = new AtomicLong(0);

            // write buffers (should complete immediately)
            ByteBuffer[] srcs = genBuffers(1);
            final CountDownLatch l1 = new CountDownLatch(1);
            ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null,
                new CompletionHandler<Long,Void>() {
                    @Override
                    public void completed(Long result, Void att) {
                        long n = result;
                        if (n <= 0)
                            throw new RuntimeException("No bytes written");
                        bytesWritten.addAndGet(n);
                        l1.countDown();
                    }
                    @Override
                    public void failed(Throwable exc, Void att) {
                    }
            });
            l1.await();

            // set to false to signal that no more buffers should be written
            final AtomicBoolean continueWriting = new AtomicBoolean(true);

            // write until socket buffer is full so as to create the conditions
            // for when a write does not complete immediately
            srcs = genBuffers(1);
            ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null,
                new CompletionHandler<Long,Void>() {
                    @Override
                    public void completed(Long result, Void att) {
                        long n = result;
                        if (n <= 0)
                            throw new RuntimeException("No bytes written");
                        bytesWritten.addAndGet(n);
                        if (continueWriting.get()) {
                            ByteBuffer[] srcs = genBuffers(8);
                            ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS,
                                (Void)null, this);
                        }
                    }
                    @Override
                    public void failed(Throwable exc, Void att) {
                    }
            });

            // give time for socket buffer to fill up.
            Thread.sleep(5*1000);

            // signal handler to stop further writing
            continueWriting.set(false);

            // read until done
            ByteBuffer buf = ByteBuffer.allocateDirect(4096);
            long total = 0L;
            do {
                int n = sc.read(buf);
                if (n <= 0)
                    throw new RuntimeException("No bytes read");
                buf.rewind();
                total += n;
            } while (total < bytesWritten.get());

            ch.close();
            sc.close();
        }
    }

    /**
     * Checks that reads return -1 after {@code shutdownInput} and that writes
     * fail with {@code ClosedChannelException} after {@code shutdownOutput}.
     */
    static void testShutdown() throws Exception {
        System.out.println("-- shutdown --");

        try (Server server = new Server();
             AsynchronousSocketChannel ch = AsynchronousSocketChannel.open())
        {
            ch.connect(server.address()).get();
            try (SocketChannel peer = server.accept()) {
                ByteBuffer buf = ByteBuffer.allocateDirect(1000);
                int n;

                // check read
                ch.shutdownInput();
                n = ch.read(buf).get();
                if (n != -1)
                    throw new RuntimeException("-1 expected");
                // check read again once the buffer has been partially filled
                buf.put(new byte[100]);
                n = ch.read(buf).get();
                if (n != -1)
                    throw new RuntimeException("-1 expected");

                // check write
                ch.shutdownOutput();
                try {
                    ch.write(buf).get();
                    throw new RuntimeException("ClosedChannelException expected");
                } catch (ExecutionException x) {
                    if (!(x.getCause() instanceof ClosedChannelException))
                        throw new RuntimeException("ClosedChannelException expected",
                                x.getCause());
                }
            }
        }
    }

    /**
     * Runs {@link #testTimeout(long, TimeUnit)} with negative, zero and
     * positive timeout values.
     */
    static void testTimeout() throws Exception {
        System.out.println("-- timeouts --");
        testTimeout(Integer.MIN_VALUE, TimeUnit.SECONDS);
        testTimeout(-1L, TimeUnit.SECONDS);
        testTimeout(0L, TimeUnit.SECONDS);
        testTimeout(2L, TimeUnit.SECONDS);
    }

    /**
     * Starts a read and then a chain of writes with the given timeout against
     * a peer that never writes or reads. For a positive timeout each operation
     * must fail with {@code InterruptedByTimeoutException} and a further
     * read/write must throw a {@code RuntimeException}; otherwise no failure
     * may be reported within one second.
     *
     * @param timeout the timeout passed to the read and write operations
     * @param unit    the unit of {@code timeout}
     */
    static void testTimeout(final long timeout, final TimeUnit unit) throws Exception {
        try (Server server = new Server()) {
            AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
            ch.connect(server.address()).get();

            ByteBuffer dst = ByteBuffer.allocate(512);

            final AtomicReference<Throwable> readException = new AtomicReference<>();

            // this read should timeout if value is > 0
            ch.read(dst, timeout, unit, null, new CompletionHandler<Integer,Void>() {
                @Override
                public void completed(Integer result, Void att) {
                    readException.set(new RuntimeException("Should not complete"));
                }
                @Override
                public void failed(Throwable exc, Void att) {
                    readException.set(exc);
                }
            });
            if (timeout > 0L) {
                // wait for exception
                while (readException.get() == null) {
                    Thread.sleep(100);
                }
                if (!(readException.get() instanceof InterruptedByTimeoutException))
                    throw new RuntimeException("InterruptedByTimeoutException expected",
                            readException.get());

                // after a timeout then further reading should throw unspecified runtime exception
                boolean exceptionThrown = false;
                try {
                    ch.read(dst);
                } catch (RuntimeException x) {
                    exceptionThrown = true;
                }
                if (!exceptionThrown)
                    throw new RuntimeException("RuntimeException expected after timeout.");
            } else {
                Thread.sleep(1000);
                Throwable exc = readException.get();
                if (exc != null)
                    throw new RuntimeException(exc);
            }

            final AtomicReference<Throwable> writeException = new AtomicReference<>();

            // write bytes to fill socket buffer
            ch.write(genBuffer(), timeout, unit, ch,
                new CompletionHandler<Integer,AsynchronousSocketChannel>()
            {
                @Override
                public void completed(Integer result, AsynchronousSocketChannel ch) {
                    ch.write(genBuffer(), timeout, unit, ch, this);
                }
                @Override
                public void failed(Throwable exc, AsynchronousSocketChannel ch) {
                    writeException.set(exc);
                }
            });
            if (timeout > 0) {
                // wait for exception
                while (writeException.get() == null) {
                    Thread.sleep(100);
                }
                if (!(writeException.get() instanceof InterruptedByTimeoutException))
                    throw new RuntimeException("InterruptedByTimeoutException expected",
                            writeException.get());

                // after a timeout then further writing should throw unspecified runtime exception
                boolean exceptionThrown = false;
                try {
                    ch.write(genBuffer());
                } catch (RuntimeException x) {
                    exceptionThrown = true;
                }
                if (!exceptionThrown)
                    throw new RuntimeException("RuntimeException expected after timeout.");
            } else {
                Thread.sleep(1000);
                Throwable exc = writeException.get();
                if (exc != null)
                    throw new RuntimeException(exc);
            }

            // clean-up
            server.accept().close();
            ch.close();
        }
    }

    /**
     * Returns a buffer of 1024 to 17023 random bytes, positioned at 0 and
     * ready to be read; heap or direct is chosen at random.
     */
    static ByteBuffer genBuffer() {
        int size = 1024 + rand.nextInt(16000);
        byte[] buf = new byte[size];
        rand.nextBytes(buf);
        boolean useDirect = rand.nextBoolean();
        if (useDirect) {
            ByteBuffer bb = ByteBuffer.allocateDirect(buf.length);
            bb.put(buf);
            bb.flip();
            return bb;
        } else {
            return ByteBuffer.wrap(buf);
        }
    }

    /**
     * Returns an array of buffers produced by {@link #genBuffer()}. The array
     * has exactly one element when {@code max <= 1}, otherwise a random length
     * between 1 and {@code max} inclusive.
     */
    static ByteBuffer[] genBuffers(int max) {
        int len = 1;
        if (max > 1)
            len += rand.nextInt(max);
        ByteBuffer[] bufs = new ByteBuffer[len];
        for (int i=0; i<len; i++)
            bufs[i] = genBuffer();
        return bufs;
    }

    /**
     * Returns a socket address with a random 10.x.x.x address and a random
     * port between 1 and 65535.
     */
    static SocketAddress genSocketAddress() {
        StringBuilder sb = new StringBuilder("10.");
        sb.append(rand.nextInt(256));
        sb.append('.');
        sb.append(rand.nextInt(256));
        sb.append('.');
        sb.append(rand.nextInt(256));
        InetAddress rh;
        try {
            rh = InetAddress.getByName(sb.toString());
        } catch (UnknownHostException x) {
            // a literal IPv4 address is not expected to fail; keep the cause if it does
            throw new InternalError("Should not happen", x);
        }
        return new InetSocketAddress(rh, rand.nextInt(65535)+1);
    }
}