1 /*
   2  * Copyright (c) 2008, 2011, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /* @test
  25  * @bug 4607272 6842687 6878369 6944810 7023403
  26  * @summary Unit test for AsynchronousSocketChannel
  27  * @run main Basic -skipSlowConnectTest
  28  */
  29 
  30 import java.nio.ByteBuffer;
  31 import java.nio.channels.*;
  32 import static java.net.StandardSocketOptions.*;
  33 import java.net.*;
  34 import java.util.Random;
  35 import java.util.concurrent.*;
  36 import java.util.concurrent.atomic.*;
  37 import java.io.Closeable;
  38 import java.io.IOException;
  39 
  40 public class Basic {
  41     static final Random rand = new Random();
  42 
  43     static boolean skipSlowConnectTest = false;
  44 
  45     public static void main(String[] args) throws Exception {
  46         for (String arg: args) {
  47             switch (arg) {
  48             case "-skipSlowConnectTest" :
  49                 skipSlowConnectTest = true;
  50                 break;
  51             default:
  52                 throw new RuntimeException("Unrecognized argument: " + arg);
  53             }
  54         }
  55 
  56         testBind();
  57         testSocketOptions();
  58         testConnect();
  59         testCloseWhenPending();
  60         testCancel();
  61         testRead1();
  62         testRead2();
  63         testRead3();
  64         testWrite1();
  65         testWrite2();
  66         // skip timeout tests until 7052549 is fixed
  67         if (!System.getProperty("os.name").startsWith("Windows"))
  68             testTimeout();
  69         testShutdown();
  70     }
  71 
  72     static class Server implements Closeable {
  73         private final ServerSocketChannel ssc;
  74         private final InetSocketAddress address;
  75 
  76         Server() throws IOException {
  77             ssc = ServerSocketChannel.open().bind(new InetSocketAddress(0));
  78 
  79             InetAddress lh = InetAddress.getLocalHost();
  80             int port = ((InetSocketAddress)(ssc.getLocalAddress())).getPort();
  81             address = new InetSocketAddress(lh, port);
  82         }
  83 
  84         InetSocketAddress address() {
  85             return address;
  86         }
  87 
  88         SocketChannel accept() throws IOException {
  89             return ssc.accept();
  90         }
  91 
  92         public void close() throws IOException {
  93             ssc.close();
  94         }
  95 
  96     }
  97 
  98     static void testBind() throws Exception {
  99         System.out.println("-- bind --");
 100 
 101         try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
 102             if (ch.getLocalAddress() != null)
 103                 throw new RuntimeException("Local address should be 'null'");
 104             ch.bind(new InetSocketAddress(0));
 105 
 106             // check local address after binding
 107             InetSocketAddress local = (InetSocketAddress)ch.getLocalAddress();
 108             if (local.getPort() == 0)
 109                 throw new RuntimeException("Unexpected port");
 110             if (!local.getAddress().isAnyLocalAddress())
 111                 throw new RuntimeException("Not bound to a wildcard address");
 112 
 113             // try to re-bind
 114             try {
 115                 ch.bind(new InetSocketAddress(0));
 116                 throw new RuntimeException("AlreadyBoundException expected");
 117             } catch (AlreadyBoundException x) {
 118             }
 119         }
 120 
 121         // check ClosedChannelException
 122         AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 123         ch.close();
 124         try {
 125             ch.bind(new InetSocketAddress(0));
 126             throw new RuntimeException("ClosedChannelException  expected");
 127         } catch (ClosedChannelException  x) {
 128         }
 129     }
 130 
 131     static void testSocketOptions() throws Exception {
 132         System.out.println("-- socket options --");
 133 
 134         try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
 135             ch.setOption(SO_RCVBUF, 128*1024)
 136               .setOption(SO_SNDBUF, 128*1024)
 137               .setOption(SO_REUSEADDR, true);
 138 
 139             // check SO_SNDBUF/SO_RCVBUF limits
 140             int before, after;
 141             before = ch.getOption(SO_SNDBUF);
 142             after = ch.setOption(SO_SNDBUF, Integer.MAX_VALUE).getOption(SO_SNDBUF);
 143             if (after < before)
 144                 throw new RuntimeException("setOption caused SO_SNDBUF to decrease");
 145             before = ch.getOption(SO_RCVBUF);
 146             after = ch.setOption(SO_RCVBUF, Integer.MAX_VALUE).getOption(SO_RCVBUF);
 147             if (after < before)
 148                 throw new RuntimeException("setOption caused SO_RCVBUF to decrease");
 149 
 150             ch.bind(new InetSocketAddress(0));
 151 
 152             // default values
 153             if (ch.getOption(SO_KEEPALIVE))
 154                 throw new RuntimeException("Default of SO_KEEPALIVE should be 'false'");
 155             if (ch.getOption(TCP_NODELAY))
 156                 throw new RuntimeException("Default of TCP_NODELAY should be 'false'");
 157 
 158             // set and check
 159             if (!ch.setOption(SO_KEEPALIVE, true).getOption(SO_KEEPALIVE))
 160                 throw new RuntimeException("SO_KEEPALIVE did not change");
 161             if (!ch.setOption(TCP_NODELAY, true).getOption(TCP_NODELAY))
 162                 throw new RuntimeException("SO_KEEPALIVE did not change");
 163 
 164             // read others (can't check as actual value is implementation dependent)
 165             ch.getOption(SO_RCVBUF);
 166             ch.getOption(SO_SNDBUF);
 167         }
 168     }
 169 
 170     static void testConnect() throws Exception {
 171         System.out.println("-- connect --");
 172 
 173         SocketAddress address;
 174 
 175         try (Server server = new Server()) {
 176             address = server.address();
 177 
 178             // connect to server and check local/remote addresses
 179             try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
 180                 ch.connect(address).get();
 181                 // check local address
 182                 if (ch.getLocalAddress() == null)
 183                     throw new RuntimeException("Not bound to local address");
 184 
 185                 // check remote address
 186                 InetSocketAddress remote = (InetSocketAddress)ch.getRemoteAddress();
 187                 if (remote.getPort() != server.address().getPort())
 188                     throw new RuntimeException("Connected to unexpected port");
 189                 if (!remote.getAddress().equals(server.address().getAddress()))
 190                     throw new RuntimeException("Connected to unexpected address");
 191 
 192                 // try to connect again
 193                 try {
 194                     ch.connect(server.address()).get();
 195                     throw new RuntimeException("AlreadyConnectedException expected");
 196                 } catch (AlreadyConnectedException x) {
 197                 }
 198 
 199                 // clean-up
 200                 server.accept().close();
 201             }
 202 
 203             // check that connect fails with ClosedChannelException
 204             AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 205             ch.close();
 206             try {
 207                 ch.connect(server.address()).get();
 208                 throw new RuntimeException("ExecutionException expected");
 209             } catch (ExecutionException x) {
 210                 if (!(x.getCause() instanceof ClosedChannelException))
 211                     throw new RuntimeException("Cause of ClosedChannelException expected");
 212             }
 213             final AtomicReference<Throwable> connectException = new AtomicReference<>();
 214             ch.connect(server.address(), (Void)null, new CompletionHandler<Void,Void>() {
 215                 public void completed(Void result, Void att) {
 216                 }
 217                 public void failed(Throwable exc, Void att) {
 218                     connectException.set(exc);
 219                 }
 220             });
 221             while (connectException.get() == null) {
 222                 Thread.sleep(100);
 223             }
 224             if (!(connectException.get() instanceof ClosedChannelException))
 225                 throw new RuntimeException("ClosedChannelException expected");
 226         }
 227 
 228         // test that failure to connect closes the channel
 229         try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
 230             try {
 231                 ch.connect(address).get();
 232             } catch (ExecutionException x) {
 233                 // failed to establish connection
 234                 if (ch.isOpen())
 235                     throw new RuntimeException("Channel should be closed");
 236             }
 237         }
 238 
 239         // repeat test by connecting to a (probably) non-existent host. This
 240         // improves the chance that the connect will not fail immediately.
 241         if (!skipSlowConnectTest) {
 242             try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
 243                 try {
 244                     ch.connect(genSocketAddress()).get();
 245                 } catch (ExecutionException x) {
 246                     // failed to establish connection
 247                     if (ch.isOpen())
 248                         throw new RuntimeException("Channel should be closed");
 249                 }
 250             }
 251         }
 252     }
 253 
 254     static void testCloseWhenPending() throws Exception {
 255         System.out.println("-- asynchronous close when connecting --");
 256 
 257         AsynchronousSocketChannel ch;
 258 
 259         // asynchronous close while connecting
 260         ch = AsynchronousSocketChannel.open();
 261         Future<Void> connectResult = ch.connect(genSocketAddress());
 262 
 263         // give time to initiate the connect (SYN)
 264         Thread.sleep(50);
 265 
 266         // close
 267         ch.close();
 268 
 269         // check that exception is thrown in timely manner
 270         try {
 271             connectResult.get(5, TimeUnit.SECONDS);
 272         } catch (TimeoutException x) {
 273             throw new RuntimeException("AsynchronousCloseException not thrown");
 274         } catch (ExecutionException x) {
 275             // expected
 276         }
 277 
 278         System.out.println("-- asynchronous close when reading --");
 279 
 280         try (Server server = new Server()) {
 281             ch = AsynchronousSocketChannel.open();
 282             ch.connect(server.address()).get();
 283 
 284             ByteBuffer dst = ByteBuffer.allocateDirect(100);
 285             Future<Integer> result = ch.read(dst);
 286 
 287             // attempt a second read - should fail with ReadPendingException
 288             ByteBuffer buf = ByteBuffer.allocateDirect(100);
 289             try {
 290                 ch.read(buf);
 291                 throw new RuntimeException("ReadPendingException expected");
 292             } catch (ReadPendingException x) {
 293             }
 294 
 295             // close channel (should cause initial read to complete)
 296             ch.close();
 297             server.accept().close();
 298 
 299             // check that AsynchronousCloseException is thrown
 300             try {
 301                 result.get();
 302                 throw new RuntimeException("Should not read");
 303             } catch (ExecutionException x) {
 304                 if (!(x.getCause() instanceof AsynchronousCloseException))
 305                     throw new RuntimeException(x);
 306             }
 307 
 308             System.out.println("-- asynchronous close when writing --");
 309 
 310             ch = AsynchronousSocketChannel.open();
 311             ch.connect(server.address()).get();
 312 
 313             final AtomicReference<Throwable> writeException =
 314                 new AtomicReference<Throwable>();
 315 
 316             // write bytes to fill socket buffer
 317             ch.write(genBuffer(), ch, new CompletionHandler<Integer,AsynchronousSocketChannel>() {
 318                 public void completed(Integer result, AsynchronousSocketChannel ch) {
 319                     ch.write(genBuffer(), ch, this);
 320                 }
 321                 public void failed(Throwable x, AsynchronousSocketChannel ch) {
 322                     writeException.set(x);
 323                 }
 324             });
 325 
 326             // give time for socket buffer to fill up.
 327             Thread.sleep(5*1000);
 328 
 329             //  attempt a concurrent write - should fail with WritePendingException
 330             try {
 331                 ch.write(genBuffer());
 332                 throw new RuntimeException("WritePendingException expected");
 333             } catch (WritePendingException x) {
 334             }
 335 
 336             // close channel - should cause initial write to complete
 337             ch.close();
 338             server.accept().close();
 339 
 340             // wait for exception
 341             while (writeException.get() == null) {
 342                 Thread.sleep(100);
 343             }
 344             if (!(writeException.get() instanceof AsynchronousCloseException))
 345                 throw new RuntimeException("AsynchronousCloseException expected");
 346         }
 347     }
 348 
 349     static void testCancel() throws Exception {
 350         System.out.println("-- cancel --");
 351 
 352         try (Server server = new Server()) {
 353             for (int i=0; i<2; i++) {
 354                 boolean mayInterruptIfRunning = (i == 0) ? false : true;
 355 
 356                 // establish loopback connection
 357                 AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 358                 ch.connect(server.address()).get();
 359                 SocketChannel peer = server.accept();
 360 
 361                 // start read operation
 362                 ByteBuffer buf = ByteBuffer.allocate(1);
 363                 Future<Integer> res = ch.read(buf);
 364 
 365                 // cancel operation
 366                 boolean cancelled = res.cancel(mayInterruptIfRunning);
 367 
 368                 // check post-conditions
 369                 if (!res.isDone())
 370                     throw new RuntimeException("isDone should return true");
 371                 if (res.isCancelled() != cancelled)
 372                     throw new RuntimeException("isCancelled not consistent");
 373                 try {
 374                     res.get();
 375                     throw new RuntimeException("CancellationException expected");
 376                 } catch (CancellationException x) {
 377                 }
 378                 try {
 379                     res.get(1, TimeUnit.SECONDS);
 380                     throw new RuntimeException("CancellationException expected");
 381                 } catch (CancellationException x) {
 382                 }
 383 
 384                 // check that the cancel doesn't impact writing to the channel
 385                 if (!mayInterruptIfRunning) {
 386                     buf = ByteBuffer.wrap("a".getBytes());
 387                     ch.write(buf).get();
 388                 }
 389 
 390                 ch.close();
 391                 peer.close();
 392             }
 393         }
 394     }
 395 
 396     static void testRead1() throws Exception {
 397         System.out.println("-- read (1) --");
 398 
 399         try (Server server = new Server()) {
 400             final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 401             ch.connect(server.address()).get();
 402 
 403             // read with 0 bytes remaining should complete immediately
 404             ByteBuffer buf = ByteBuffer.allocate(1);
 405             buf.put((byte)0);
 406             int n = ch.read(buf).get();
 407             if (n != 0)
 408                 throw new RuntimeException("0 expected");
 409 
 410             // write bytes and close connection
 411             ByteBuffer src = genBuffer();
 412             try (SocketChannel sc = server.accept()) {
 413                 sc.setOption(SO_SNDBUF, src.remaining());
 414                 while (src.hasRemaining())
 415                     sc.write(src);
 416             }
 417 
 418             // reads should complete immediately
 419             final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100);
 420             final CountDownLatch latch = new CountDownLatch(1);
 421             ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() {
 422                 public void completed(Integer result, Void att) {
 423                     int n = result;
 424                     if (n > 0) {
 425                         ch.read(dst, (Void)null, this);
 426                     } else {
 427                         latch.countDown();
 428                     }
 429                 }
 430                 public void failed(Throwable exc, Void att) {
 431                 }
 432             });
 433 
 434             latch.await();
 435 
 436             // check buffers
 437             src.flip();
 438             dst.flip();
 439             if (!src.equals(dst)) {
 440                 throw new RuntimeException("Contents differ");
 441             }
 442 
 443             // close channel
 444             ch.close();
 445 
 446             // check read fails with ClosedChannelException
 447             try {
 448                 ch.read(dst).get();
 449                 throw new RuntimeException("ExecutionException expected");
 450             } catch (ExecutionException x) {
 451                 if (!(x.getCause() instanceof ClosedChannelException))
 452                     throw new RuntimeException("Cause of ClosedChannelException expected");
 453             }
 454         }
 455     }
 456 
 457     static void testRead2() throws Exception {
 458         System.out.println("-- read (2) --");
 459 
 460         try (Server server = new Server()) {
 461             final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 462             ch.connect(server.address()).get();
 463             SocketChannel sc = server.accept();
 464 
 465             ByteBuffer src = genBuffer();
 466 
 467             // read until the buffer is full
 468             final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity());
 469             final CountDownLatch latch = new CountDownLatch(1);
 470             ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() {
 471                 public void completed(Integer result, Void att) {
 472                     if (dst.hasRemaining()) {
 473                         ch.read(dst, (Void)null, this);
 474                     } else {
 475                         latch.countDown();
 476                     }
 477                 }
 478                 public void failed(Throwable exc, Void att) {
 479                 }
 480             });
 481 
 482             // trickle the writing
 483             do {
 484                 int rem = src.remaining();
 485                 int size = (rem <= 100) ? rem : 50 + rand.nextInt(rem - 100);
 486                 ByteBuffer buf = ByteBuffer.allocate(size);
 487                 for (int i=0; i<size; i++)
 488                     buf.put(src.get());
 489                 buf.flip();
 490                 Thread.sleep(50 + rand.nextInt(1500));
 491                 while (buf.hasRemaining())
 492                     sc.write(buf);
 493             } while (src.hasRemaining());
 494 
 495             // wait until ascynrhonous reading has completed
 496             latch.await();
 497 
 498             // check buffers
 499             src.flip();
 500             dst.flip();
 501             if (!src.equals(dst)) {
 502                throw new RuntimeException("Contents differ");
 503             }
 504 
 505             sc.close();
 506             ch.close();
 507         }
 508     }
 509 
 510     // exercise scattering read
 511     static void testRead3() throws Exception {
 512         System.out.println("-- read (3) --");
 513 
 514         try (Server server = new Server()) {
 515             final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 516             ch.connect(server.address()).get();
 517             SocketChannel sc = server.accept();
 518 
 519             ByteBuffer[] dsts = new ByteBuffer[3];
 520             for (int i=0; i<dsts.length; i++) {
 521                 dsts[i] = ByteBuffer.allocateDirect(100);
 522             }
 523 
 524             // scattering read that completes ascynhronously
 525             final CountDownLatch l1 = new CountDownLatch(1);
 526             ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null,
 527                 new CompletionHandler<Long,Void>() {
 528                     public void completed(Long result, Void att) {
 529                         long n = result;
 530                         if (n <= 0)
 531                             throw new RuntimeException("No bytes read");
 532                         l1.countDown();
 533                     }
 534                     public void failed(Throwable exc, Void att) {
 535                     }
 536             });
 537 
 538             // write some bytes
 539             sc.write(genBuffer());
 540 
 541             // read should now complete
 542             l1.await();
 543 
 544             // write more bytes
 545             sc.write(genBuffer());
 546 
 547             // read should complete immediately
 548             for (int i=0; i<dsts.length; i++) {
 549                 dsts[i].rewind();
 550             }
 551 
 552             final CountDownLatch l2 = new CountDownLatch(1);
 553             ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null,
 554                 new CompletionHandler<Long,Void>() {
 555                     public void completed(Long result, Void att) {
 556                         long n = result;
 557                         if (n <= 0)
 558                             throw new RuntimeException("No bytes read");
 559                         l2.countDown();
 560                     }
 561                     public void failed(Throwable exc, Void att) {
 562                     }
 563             });
 564             l2.await();
 565 
 566             ch.close();
 567             sc.close();
 568         }
 569     }
 570 
 571     static void testWrite1() throws Exception {
 572         System.out.println("-- write (1) --");
 573 
 574         try (Server server = new Server()) {
 575             final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 576             ch.connect(server.address()).get();
 577             SocketChannel sc = server.accept();
 578 
 579             // write with 0 bytes remaining should complete immediately
 580             ByteBuffer buf = ByteBuffer.allocate(1);
 581             buf.put((byte)0);
 582             int n = ch.write(buf).get();
 583             if (n != 0)
 584                 throw new RuntimeException("0 expected");
 585 
 586             // write all bytes and close connection when done
 587             final ByteBuffer src = genBuffer();
 588             ch.write(src, (Void)null, new CompletionHandler<Integer,Void>() {
 589                 public void completed(Integer result, Void att) {
 590                     if (src.hasRemaining()) {
 591                         ch.write(src, (Void)null, this);
 592                     } else {
 593                         try {
 594                             ch.close();
 595                         } catch (IOException ignore) { }
 596                     }
 597                 }
 598                 public void failed(Throwable exc, Void att) {
 599                 }
 600             });
 601 
 602             // read to EOF or buffer full
 603             ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100);
 604             do {
 605                 n = sc.read(dst);
 606             } while (n > 0);
 607             sc.close();
 608 
 609             // check buffers
 610             src.flip();
 611             dst.flip();
 612             if (!src.equals(dst)) {
 613                 throw new RuntimeException("Contents differ");
 614             }
 615 
 616             // check write fails with ClosedChannelException
 617             try {
 618                 ch.read(dst).get();
 619                 throw new RuntimeException("ExecutionException expected");
 620             } catch (ExecutionException x) {
 621                 if (!(x.getCause() instanceof ClosedChannelException))
 622                     throw new RuntimeException("Cause of ClosedChannelException expected");
 623             }
 624         }
 625     }
 626 
 627     // exercise gathering write
 628     static void testWrite2() throws Exception {
 629         System.out.println("-- write (2) --");
 630 
 631         try (Server server = new Server()) {
 632             final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 633             ch.connect(server.address()).get();
 634             SocketChannel sc = server.accept();
 635 
 636             // number of bytes written
 637             final AtomicLong bytesWritten = new AtomicLong(0);
 638 
 639             // write buffers (should complete immediately)
 640             ByteBuffer[] srcs = genBuffers(1);
 641             final CountDownLatch l1 = new CountDownLatch(1);
 642             ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null,
 643                 new CompletionHandler<Long,Void>() {
 644                     public void completed(Long result, Void att) {
 645                         long n = result;
 646                         if (n <= 0)
 647                             throw new RuntimeException("No bytes read");
 648                         bytesWritten.addAndGet(n);
 649                         l1.countDown();
 650                     }
 651                     public void failed(Throwable exc, Void att) {
 652                     }
 653             });
 654             l1.await();
 655 
 656             // set to true to signal that no more buffers should be written
 657             final AtomicBoolean continueWriting = new AtomicBoolean(true);
 658 
 659             // write until socket buffer is full so as to create the conditions
 660             // for when a write does not complete immediately
 661             srcs = genBuffers(1);
 662             ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null,
 663                 new CompletionHandler<Long,Void>() {
 664                     public void completed(Long result, Void att) {
 665                         long n = result;
 666                         if (n <= 0)
 667                             throw new RuntimeException("No bytes written");
 668                         bytesWritten.addAndGet(n);
 669                         if (continueWriting.get()) {
 670                             ByteBuffer[] srcs = genBuffers(8);
 671                             ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS,
 672                                 (Void)null, this);
 673                         }
 674                     }
 675                     public void failed(Throwable exc, Void att) {
 676                     }
 677             });
 678 
 679             // give time for socket buffer to fill up.
 680             Thread.sleep(5*1000);
 681 
 682             // signal handler to stop further writing
 683             continueWriting.set(false);
 684 
 685             // read until done
 686             ByteBuffer buf = ByteBuffer.allocateDirect(4096);
 687             long total = 0L;
 688             do {
 689                 int n = sc.read(buf);
 690                 if (n <= 0)
 691                     throw new RuntimeException("No bytes read");
 692                 buf.rewind();
 693                 total += n;
 694             } while (total < bytesWritten.get());
 695 
 696             ch.close();
 697             sc.close();
 698         }
 699     }
 700 
 701     static void testShutdown() throws Exception {
 702         System.out.println("-- shutdown--");
 703 
 704         try (Server server = new Server();
 705              AsynchronousSocketChannel ch = AsynchronousSocketChannel.open())
 706         {
 707             ch.connect(server.address()).get();
 708             try (SocketChannel peer = server.accept()) {
 709                 ByteBuffer buf = ByteBuffer.allocateDirect(1000);
 710                 int n;
 711 
 712                 // check read
 713                 ch.shutdownInput();
 714                 n = ch.read(buf).get();
 715                 if (n != -1)
 716                     throw new RuntimeException("-1 expected");
 717                 // check full with full buffer
 718                 buf.put(new byte[100]);
 719                 n = ch.read(buf).get();
 720                 if (n != -1)
 721                     throw new RuntimeException("-1 expected");
 722 
 723                 // check write
 724                 ch.shutdownOutput();
 725                 try {
 726                     ch.write(buf).get();
 727                     throw new RuntimeException("ClosedChannelException expected");
 728                 } catch (ExecutionException x) {
 729                     if (!(x.getCause() instanceof ClosedChannelException))
 730                         throw new RuntimeException("ClosedChannelException expected");
 731                 }
 732             }
 733         }
 734     }
 735 
 736     static void testTimeout() throws Exception {
 737         System.out.println("-- timeouts --");
 738         testTimeout(Integer.MIN_VALUE, TimeUnit.SECONDS);
 739         testTimeout(-1L, TimeUnit.SECONDS);
 740         testTimeout(0L, TimeUnit.SECONDS);
 741         testTimeout(2L, TimeUnit.SECONDS);
 742     }
 743 
 744     static void testTimeout(final long timeout, final TimeUnit unit) throws Exception {
 745         try (Server server = new Server()) {
 746             AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 747             ch.connect(server.address()).get();
 748 
 749             ByteBuffer dst = ByteBuffer.allocate(512);
 750 
 751             final AtomicReference<Throwable> readException = new AtomicReference<Throwable>();
 752 
 753             // this read should timeout if value is > 0
 754             ch.read(dst, timeout, unit, null, new CompletionHandler<Integer,Void>() {
 755                 public void completed(Integer result, Void att) {
 756                     readException.set(new RuntimeException("Should not complete"));
 757                 }
 758                 public void failed(Throwable exc, Void att) {
 759                     readException.set(exc);
 760                 }
 761             });
 762             if (timeout > 0L) {
 763                 // wait for exception
 764                 while (readException.get() == null) {
 765                     Thread.sleep(100);
 766                 }
 767                 if (!(readException.get() instanceof InterruptedByTimeoutException))
 768                     throw new RuntimeException("InterruptedByTimeoutException expected");
 769 
 770                 // after a timeout then further reading should throw unspecified runtime exception
 771                 boolean exceptionThrown = false;
 772                 try {
 773                     ch.read(dst);
 774                 } catch (RuntimeException x) {
 775                     exceptionThrown = true;
 776                 }
 777                 if (!exceptionThrown)
 778                     throw new RuntimeException("RuntimeException expected after timeout.");
 779             } else {
 780                 Thread.sleep(1000);
 781                 Throwable exc = readException.get();
 782                 if (exc != null)
 783                     throw new RuntimeException(exc);
 784             }
 785 
 786             final AtomicReference<Throwable> writeException = new AtomicReference<Throwable>();
 787 
 788             // write bytes to fill socket buffer
 789             ch.write(genBuffer(), timeout, unit, ch,
 790                 new CompletionHandler<Integer,AsynchronousSocketChannel>()
 791             {
 792                 public void completed(Integer result, AsynchronousSocketChannel ch) {
 793                     ch.write(genBuffer(), timeout, unit, ch, this);
 794                 }
 795                 public void failed(Throwable exc, AsynchronousSocketChannel ch) {
 796                     writeException.set(exc);
 797                 }
 798             });
 799             if (timeout > 0) {
 800                 // wait for exception
 801                 while (writeException.get() == null) {
 802                     Thread.sleep(100);
 803                 }
 804                 if (!(writeException.get() instanceof InterruptedByTimeoutException))
 805                     throw new RuntimeException("InterruptedByTimeoutException expected");
 806 
 807                 // after a timeout then further writing should throw unspecified runtime exception
 808                 boolean exceptionThrown = false;
 809                 try {
 810                     ch.write(genBuffer());
 811                 } catch (RuntimeException x) {
 812                     exceptionThrown = true;
 813                 }
 814                 if (!exceptionThrown)
 815                     throw new RuntimeException("RuntimeException expected after timeout.");
 816             } else {
 817                 Thread.sleep(1000);
 818                 Throwable exc = writeException.get();
 819                 if (exc != null)
 820                     throw new RuntimeException(exc);
 821             }
 822 
 823             // clean-up
 824             server.accept().close();
 825             ch.close();
 826         }
 827     }
 828 
 829     // returns ByteBuffer with random bytes
 830     static ByteBuffer genBuffer() {
 831         int size = 1024 + rand.nextInt(16000);
 832         byte[] buf = new byte[size];
 833         rand.nextBytes(buf);
 834         boolean useDirect = rand.nextBoolean();
 835         if (useDirect) {
 836             ByteBuffer bb = ByteBuffer.allocateDirect(buf.length);
 837             bb.put(buf);
 838             bb.flip();
 839             return bb;
 840         } else {
 841             return ByteBuffer.wrap(buf);
 842         }
 843     }
 844 
 845     // return ByteBuffer[] with random bytes
 846     static ByteBuffer[] genBuffers(int max) {
 847         int len = 1;
 848         if (max > 1)
 849             len += rand.nextInt(max);
 850         ByteBuffer[] bufs = new ByteBuffer[len];
 851         for (int i=0; i<len; i++)
 852             bufs[i] = genBuffer();
 853         return bufs;
 854     }
 855 
 856     // return random SocketAddress
 857     static SocketAddress genSocketAddress() {
 858         StringBuilder sb = new StringBuilder("10.");
 859         sb.append(rand.nextInt(256));
 860         sb.append('.');
 861         sb.append(rand.nextInt(256));
 862         sb.append('.');
 863         sb.append(rand.nextInt(256));
 864         InetAddress rh;
 865         try {
 866             rh = InetAddress.getByName(sb.toString());
 867         } catch (UnknownHostException x) {
 868             throw new InternalError("Should not happen");
 869         }
 870         return new InetSocketAddress(rh, rand.nextInt(65535)+1);
 871     }
 872 }