1 /*
   2  * Copyright (c) 2008, 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /* @test
  25  * @bug 4607272 6842687 6878369 6944810 7023403
  26  * @summary Unit test for AsynchronousSocketChannel(use -Dseed=X to set PRNG seed)
  27  * @library /test/lib
  28  * @build jdk.test.lib.RandomFactory
  29  * @run main Basic -skipSlowConnectTest
  30  * @key randomness intermittent
  31  */
  32 
  33 import java.io.Closeable;
  34 import java.io.IOException;
  35 import java.net.*;
  36 import static java.net.StandardSocketOptions.*;
  37 import java.nio.ByteBuffer;
  38 import java.nio.channels.*;
  39 import java.util.Random;
  40 import java.util.Set;
  41 import java.util.concurrent.*;
  42 import java.util.concurrent.atomic.*;
  43 import jdk.test.lib.RandomFactory;
  44 
  45 public class Basic {
  46     private static final Random RAND = RandomFactory.getRandom();
  47 
  48     static boolean skipSlowConnectTest = false;
  49 
  50     public static void main(String[] args) throws Exception {
  51         for (String arg: args) {
  52             switch (arg) {
  53             case "-skipSlowConnectTest" :
  54                 skipSlowConnectTest = true;
  55                 break;
  56             default:
  57                 throw new RuntimeException("Unrecognized argument: " + arg);
  58             }
  59         }
  60 
  61         testBind();
  62         testSocketOptions();
  63         testConnect();
  64         testCloseWhenPending();
  65         testCancel();
  66         testRead1();
  67         testRead2();
  68         testRead3();
  69         testWrite1();
  70         testWrite2();
  71         // skip timeout tests until 7052549 is fixed
  72         if (!System.getProperty("os.name").startsWith("Windows"))
  73             testTimeout();
  74         testShutdown();
  75     }
  76 
  77     static class Server implements Closeable {
  78         private final ServerSocketChannel ssc;
  79         private final InetSocketAddress address;
  80 
  81         Server() throws IOException {
  82             ssc = ServerSocketChannel.open().bind(new InetSocketAddress(0));
  83 
  84             InetAddress lh = InetAddress.getLocalHost();
  85             int port = ((InetSocketAddress)(ssc.getLocalAddress())).getPort();
  86             address = new InetSocketAddress(lh, port);
  87         }
  88 
  89         InetSocketAddress address() {
  90             return address;
  91         }
  92 
  93         SocketChannel accept() throws IOException {
  94             return ssc.accept();
  95         }
  96 
  97         public void close() throws IOException {
  98             ssc.close();
  99         }
 100 
 101     }
 102 
 103     static void testBind() throws Exception {
 104         System.out.println("-- bind --");
 105 
 106         try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
 107             if (ch.getLocalAddress() != null)
 108                 throw new RuntimeException("Local address should be 'null'");
 109             ch.bind(new InetSocketAddress(0));
 110 
 111             // check local address after binding
 112             InetSocketAddress local = (InetSocketAddress)ch.getLocalAddress();
 113             if (local.getPort() == 0)
 114                 throw new RuntimeException("Unexpected port");
 115             if (!local.getAddress().isAnyLocalAddress())
 116                 throw new RuntimeException("Not bound to a wildcard address");
 117 
 118             // try to re-bind
 119             try {
 120                 ch.bind(new InetSocketAddress(0));
 121                 throw new RuntimeException("AlreadyBoundException expected");
 122             } catch (AlreadyBoundException x) {
 123             }
 124         }
 125 
 126         // check ClosedChannelException
 127         AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 128         ch.close();
 129         try {
 130             ch.bind(new InetSocketAddress(0));
 131             throw new RuntimeException("ClosedChannelException  expected");
 132         } catch (ClosedChannelException  x) {
 133         }
 134     }
 135 
 136     static void testSocketOptions() throws Exception {
 137         System.out.println("-- socket options --");
 138 
 139         try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
 140             ch.setOption(SO_RCVBUF, 128*1024)
 141               .setOption(SO_SNDBUF, 128*1024)
 142               .setOption(SO_REUSEADDR, true);
 143 
 144             // check SO_SNDBUF/SO_RCVBUF limits
 145             int before, after;
 146             before = ch.getOption(SO_SNDBUF);
 147             after = ch.setOption(SO_SNDBUF, Integer.MAX_VALUE).getOption(SO_SNDBUF);
 148             if (after < before)
 149                 throw new RuntimeException("setOption caused SO_SNDBUF to decrease");
 150             before = ch.getOption(SO_RCVBUF);
 151             after = ch.setOption(SO_RCVBUF, Integer.MAX_VALUE).getOption(SO_RCVBUF);
 152             if (after < before)
 153                 throw new RuntimeException("setOption caused SO_RCVBUF to decrease");
 154 
 155             ch.bind(new InetSocketAddress(0));
 156 
 157             // default values
 158             if (ch.getOption(SO_KEEPALIVE))
 159                 throw new RuntimeException("Default of SO_KEEPALIVE should be 'false'");
 160             if (ch.getOption(TCP_NODELAY))
 161                 throw new RuntimeException("Default of TCP_NODELAY should be 'false'");
 162 
 163             // set and check
 164             if (!ch.setOption(SO_KEEPALIVE, true).getOption(SO_KEEPALIVE))
 165                 throw new RuntimeException("SO_KEEPALIVE did not change");
 166             if (!ch.setOption(TCP_NODELAY, true).getOption(TCP_NODELAY))
 167                 throw new RuntimeException("SO_KEEPALIVE did not change");
 168 
 169             // read others (can't check as actual value is implementation dependent)
 170             ch.getOption(SO_RCVBUF);
 171             ch.getOption(SO_SNDBUF);
 172 
 173             Set<SocketOption<?>> options = ch.supportedOptions();
 174             boolean reuseport = options.contains(SO_REUSEPORT);
 175             if (reuseport) {
 176                 if (ch.getOption(SO_REUSEPORT))
 177                     throw new RuntimeException("Default of SO_REUSEPORT should be 'false'");
 178                 if (!ch.setOption(SO_REUSEPORT, true).getOption(SO_REUSEPORT))
 179                     throw new RuntimeException("SO_REUSEPORT did not change");
 180             }
 181         }
 182     }
 183 
 184     static void testConnect() throws Exception {
 185         System.out.println("-- connect --");
 186 
 187         SocketAddress address;
 188 
 189         try (Server server = new Server()) {
 190             address = server.address();
 191 
 192             // connect to server and check local/remote addresses
 193             try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
 194                 ch.connect(address).get();
 195                 // check local address
 196                 if (ch.getLocalAddress() == null)
 197                     throw new RuntimeException("Not bound to local address");
 198 
 199                 // check remote address
 200                 InetSocketAddress remote = (InetSocketAddress)ch.getRemoteAddress();
 201                 if (remote.getPort() != server.address().getPort())
 202                     throw new RuntimeException("Connected to unexpected port");
 203                 if (!remote.getAddress().equals(server.address().getAddress()))
 204                     throw new RuntimeException("Connected to unexpected address");
 205 
 206                 // try to connect again
 207                 try {
 208                     ch.connect(server.address()).get();
 209                     throw new RuntimeException("AlreadyConnectedException expected");
 210                 } catch (AlreadyConnectedException x) {
 211                 }
 212 
 213                 // clean-up
 214                 server.accept().close();
 215             }
 216 
 217             // check that connect fails with ClosedChannelException
 218             AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 219             ch.close();
 220             try {
 221                 ch.connect(server.address()).get();
 222                 throw new RuntimeException("ExecutionException expected");
 223             } catch (ExecutionException x) {
 224                 if (!(x.getCause() instanceof ClosedChannelException))
 225                     throw new RuntimeException("Cause of ClosedChannelException expected",
 226                             x.getCause());
 227             }
 228             final AtomicReference<Throwable> connectException = new AtomicReference<>();
 229             ch.connect(server.address(), (Void)null, new CompletionHandler<Void,Void>() {
 230                 public void completed(Void result, Void att) {
 231                 }
 232                 public void failed(Throwable exc, Void att) {
 233                     connectException.set(exc);
 234                 }
 235             });
 236             while (connectException.get() == null) {
 237                 Thread.sleep(100);
 238             }
 239             if (!(connectException.get() instanceof ClosedChannelException))
 240                 throw new RuntimeException("ClosedChannelException expected",
 241                         connectException.get());
 242         }
 243 
 244         // test that failure to connect closes the channel
 245         try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
 246             try {
 247                 ch.connect(address).get();
 248             } catch (ExecutionException x) {
 249                 // failed to establish connection
 250                 if (ch.isOpen())
 251                     throw new RuntimeException("Channel should be closed");
 252             }
 253         }
 254 
 255         // repeat test by connecting to a (probably) non-existent host. This
 256         // improves the chance that the connect will not fail immediately.
 257         if (!skipSlowConnectTest) {
 258             try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
 259                 try {
 260                     ch.connect(genSocketAddress()).get();
 261                 } catch (ExecutionException x) {
 262                     // failed to establish connection
 263                     if (ch.isOpen())
 264                         throw new RuntimeException("Channel should be closed");
 265                 }
 266             }
 267         }
 268     }
 269 
 270     static void testCloseWhenPending() throws Exception {
 271         System.out.println("-- asynchronous close when connecting --");
 272 
 273         AsynchronousSocketChannel ch;
 274 
 275         // asynchronous close while connecting
 276         ch = AsynchronousSocketChannel.open();
 277         Future<Void> connectResult = ch.connect(genSocketAddress());
 278 
 279         // give time to initiate the connect (SYN)
 280         Thread.sleep(50);
 281 
 282         // close
 283         ch.close();
 284 
 285         // check that exception is thrown in timely manner
 286         try {
 287             connectResult.get(5, TimeUnit.SECONDS);
 288         } catch (TimeoutException x) {
 289             throw new RuntimeException("AsynchronousCloseException not thrown");
 290         } catch (ExecutionException x) {
 291             // expected
 292         }
 293 
 294         System.out.println("-- asynchronous close when reading --");
 295 
 296         try (Server server = new Server()) {
 297             ch = AsynchronousSocketChannel.open();
 298             ch.connect(server.address()).get();
 299 
 300             ByteBuffer dst = ByteBuffer.allocateDirect(100);
 301             Future<Integer> result = ch.read(dst);
 302 
 303             // attempt a second read - should fail with ReadPendingException
 304             ByteBuffer buf = ByteBuffer.allocateDirect(100);
 305             try {
 306                 ch.read(buf);
 307                 throw new RuntimeException("ReadPendingException expected");
 308             } catch (ReadPendingException x) {
 309             }
 310 
 311             // close channel (should cause initial read to complete)
 312             ch.close();
 313             server.accept().close();
 314 
 315             // check that AsynchronousCloseException is thrown
 316             try {
 317                 result.get();
 318                 throw new RuntimeException("Should not read");
 319             } catch (ExecutionException x) {
 320                 if (!(x.getCause() instanceof AsynchronousCloseException))
 321                     throw new RuntimeException(x);
 322             }
 323 
 324             System.out.println("-- asynchronous close when writing --");
 325 
 326             ch = AsynchronousSocketChannel.open();
 327             ch.connect(server.address()).get();
 328 
 329             final AtomicReference<Throwable> writeException =
 330                 new AtomicReference<Throwable>();
 331 
 332             // write bytes to fill socket buffer
 333             final AtomicInteger numCompleted = new AtomicInteger();
 334             ch.write(genBuffer(), ch, new CompletionHandler<Integer,AsynchronousSocketChannel>() {
 335                 public void completed(Integer result, AsynchronousSocketChannel ch) {
 336                     numCompleted.incrementAndGet();
 337                     ch.write(genBuffer(), ch, this);
 338                 }
 339                 public void failed(Throwable x, AsynchronousSocketChannel ch) {
 340                     writeException.set(x);
 341                 }
 342             });
 343 
 344             // give time for socket buffer to fill up -
 345             // take pauses until the handler is no longer being invoked
 346             // because all writes are being pended which guarantees that
 347             // the internal channel state indicates it is writing
 348             int prevNumCompleted = numCompleted.get();
 349             do {
 350                 Thread.sleep(1000);
 351                 if (numCompleted.get() == prevNumCompleted) {
 352                     break;
 353                 }
 354                 prevNumCompleted = numCompleted.get();
 355             } while (true);
 356 
 357             // attempt a concurrent write -
 358             // should fail with WritePendingException
 359             try {
 360                 ch.write(genBuffer());
 361                 throw new RuntimeException("WritePendingException expected");
 362             } catch (WritePendingException x) {
 363             }
 364 
 365             // close channel - should cause initial write to complete
 366             ch.close();
 367             server.accept().close();
 368 
 369             // wait for exception
 370             while (writeException.get() == null) {
 371                 Thread.sleep(100);
 372             }
 373             if (!(writeException.get() instanceof AsynchronousCloseException))
 374                 throw new RuntimeException("AsynchronousCloseException expected",
 375                         writeException.get());
 376         }
 377     }
 378 
 379     static void testCancel() throws Exception {
 380         System.out.println("-- cancel --");
 381 
 382         try (Server server = new Server()) {
 383             for (int i=0; i<2; i++) {
 384                 boolean mayInterruptIfRunning = (i == 0) ? false : true;
 385 
 386                 // establish loopback connection
 387                 AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 388                 ch.connect(server.address()).get();
 389                 SocketChannel peer = server.accept();
 390 
 391                 // start read operation
 392                 ByteBuffer buf = ByteBuffer.allocate(1);
 393                 Future<Integer> res = ch.read(buf);
 394 
 395                 // cancel operation
 396                 boolean cancelled = res.cancel(mayInterruptIfRunning);
 397 
 398                 // check post-conditions
 399                 if (!res.isDone())
 400                     throw new RuntimeException("isDone should return true");
 401                 if (res.isCancelled() != cancelled)
 402                     throw new RuntimeException("isCancelled not consistent");
 403                 try {
 404                     res.get();
 405                     throw new RuntimeException("CancellationException expected");
 406                 } catch (CancellationException x) {
 407                 }
 408                 try {
 409                     res.get(1, TimeUnit.SECONDS);
 410                     throw new RuntimeException("CancellationException expected");
 411                 } catch (CancellationException x) {
 412                 }
 413 
 414                 // check that the cancel doesn't impact writing to the channel
 415                 if (!mayInterruptIfRunning) {
 416                     buf = ByteBuffer.wrap("a".getBytes());
 417                     ch.write(buf).get();
 418                 }
 419 
 420                 ch.close();
 421                 peer.close();
 422             }
 423         }
 424     }
 425 
 426     static void testRead1() throws Exception {
 427         System.out.println("-- read (1) --");
 428 
 429         try (Server server = new Server()) {
 430             final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 431             ch.connect(server.address()).get();
 432 
 433             // read with 0 bytes remaining should complete immediately
 434             ByteBuffer buf = ByteBuffer.allocate(1);
 435             buf.put((byte)0);
 436             int n = ch.read(buf).get();
 437             if (n != 0)
 438                 throw new RuntimeException("0 expected");
 439 
 440             // write bytes and close connection
 441             ByteBuffer src = genBuffer();
 442             try (SocketChannel sc = server.accept()) {
 443                 sc.setOption(SO_SNDBUF, src.remaining());
 444                 while (src.hasRemaining())
 445                     sc.write(src);
 446             }
 447 
 448             // reads should complete immediately
 449             final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100);
 450             final CountDownLatch latch = new CountDownLatch(1);
 451             ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() {
 452                 public void completed(Integer result, Void att) {
 453                     int n = result;
 454                     if (n > 0) {
 455                         ch.read(dst, (Void)null, this);
 456                     } else {
 457                         latch.countDown();
 458                     }
 459                 }
 460                 public void failed(Throwable exc, Void att) {
 461                 }
 462             });
 463 
 464             latch.await();
 465 
 466             // check buffers
 467             src.flip();
 468             dst.flip();
 469             if (!src.equals(dst)) {
 470                 throw new RuntimeException("Contents differ");
 471             }
 472 
 473             // close channel
 474             ch.close();
 475 
 476             // check read fails with ClosedChannelException
 477             try {
 478                 ch.read(dst).get();
 479                 throw new RuntimeException("ExecutionException expected");
 480             } catch (ExecutionException x) {
 481                 if (!(x.getCause() instanceof ClosedChannelException))
 482                     throw new RuntimeException("Cause of ClosedChannelException expected",
 483                             x.getCause());
 484             }
 485         }
 486     }
 487 
 488     static void testRead2() throws Exception {
 489         System.out.println("-- read (2) --");
 490 
 491         try (Server server = new Server()) {
 492             final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 493             ch.connect(server.address()).get();
 494             SocketChannel sc = server.accept();
 495 
 496             ByteBuffer src = genBuffer();
 497 
 498             // read until the buffer is full
 499             final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity());
 500             final CountDownLatch latch = new CountDownLatch(1);
 501             ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() {
 502                 public void completed(Integer result, Void att) {
 503                     if (dst.hasRemaining()) {
 504                         ch.read(dst, (Void)null, this);
 505                     } else {
 506                         latch.countDown();
 507                     }
 508                 }
 509                 public void failed(Throwable exc, Void att) {
 510                 }
 511             });
 512 
 513             // trickle the writing
 514             do {
 515                 int rem = src.remaining();
 516                 int size = (rem <= 100) ? rem : 50 + RAND.nextInt(rem - 100);
 517                 ByteBuffer buf = ByteBuffer.allocate(size);
 518                 for (int i=0; i<size; i++)
 519                     buf.put(src.get());
 520                 buf.flip();
 521                 Thread.sleep(50 + RAND.nextInt(1500));
 522                 while (buf.hasRemaining())
 523                     sc.write(buf);
 524             } while (src.hasRemaining());
 525 
 526             // wait until ascynrhonous reading has completed
 527             latch.await();
 528 
 529             // check buffers
 530             src.flip();
 531             dst.flip();
 532             if (!src.equals(dst)) {
 533                throw new RuntimeException("Contents differ");
 534             }
 535 
 536             sc.close();
 537             ch.close();
 538         }
 539     }
 540 
 541     // exercise scattering read
 542     static void testRead3() throws Exception {
 543         System.out.println("-- read (3) --");
 544 
 545         try (Server server = new Server()) {
 546             final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 547             ch.connect(server.address()).get();
 548             SocketChannel sc = server.accept();
 549 
 550             ByteBuffer[] dsts = new ByteBuffer[3];
 551             for (int i=0; i<dsts.length; i++) {
 552                 dsts[i] = ByteBuffer.allocateDirect(100);
 553             }
 554 
 555             // scattering read that completes ascynhronously
 556             final CountDownLatch l1 = new CountDownLatch(1);
 557             ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null,
 558                 new CompletionHandler<Long,Void>() {
 559                     public void completed(Long result, Void att) {
 560                         long n = result;
 561                         if (n <= 0)
 562                             throw new RuntimeException("No bytes read");
 563                         l1.countDown();
 564                     }
 565                     public void failed(Throwable exc, Void att) {
 566                     }
 567             });
 568 
 569             // write some bytes
 570             sc.write(genBuffer());
 571 
 572             // read should now complete
 573             l1.await();
 574 
 575             // write more bytes
 576             sc.write(genBuffer());
 577 
 578             // read should complete immediately
 579             for (int i=0; i<dsts.length; i++) {
 580                 dsts[i].rewind();
 581             }
 582 
 583             final CountDownLatch l2 = new CountDownLatch(1);
 584             ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null,
 585                 new CompletionHandler<Long,Void>() {
 586                     public void completed(Long result, Void att) {
 587                         long n = result;
 588                         if (n <= 0)
 589                             throw new RuntimeException("No bytes read");
 590                         l2.countDown();
 591                     }
 592                     public void failed(Throwable exc, Void att) {
 593                     }
 594             });
 595             l2.await();
 596 
 597             ch.close();
 598             sc.close();
 599         }
 600     }
 601 
 602     static void testWrite1() throws Exception {
 603         System.out.println("-- write (1) --");
 604 
 605         try (Server server = new Server()) {
 606             final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 607             ch.connect(server.address()).get();
 608             SocketChannel sc = server.accept();
 609 
 610             // write with 0 bytes remaining should complete immediately
 611             ByteBuffer buf = ByteBuffer.allocate(1);
 612             buf.put((byte)0);
 613             int n = ch.write(buf).get();
 614             if (n != 0)
 615                 throw new RuntimeException("0 expected");
 616 
 617             // write all bytes and close connection when done
 618             final ByteBuffer src = genBuffer();
 619             ch.write(src, (Void)null, new CompletionHandler<Integer,Void>() {
 620                 public void completed(Integer result, Void att) {
 621                     if (src.hasRemaining()) {
 622                         ch.write(src, (Void)null, this);
 623                     } else {
 624                         try {
 625                             ch.close();
 626                         } catch (IOException ignore) { }
 627                     }
 628                 }
 629                 public void failed(Throwable exc, Void att) {
 630                 }
 631             });
 632 
 633             // read to EOF or buffer full
 634             ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100);
 635             do {
 636                 n = sc.read(dst);
 637             } while (n > 0);
 638             sc.close();
 639 
 640             // check buffers
 641             src.flip();
 642             dst.flip();
 643             if (!src.equals(dst)) {
 644                 throw new RuntimeException("Contents differ");
 645             }
 646 
 647             // check write fails with ClosedChannelException
 648             try {
 649                 ch.read(dst).get();
 650                 throw new RuntimeException("ExecutionException expected");
 651             } catch (ExecutionException x) {
 652                 if (!(x.getCause() instanceof ClosedChannelException))
 653                     throw new RuntimeException("Cause of ClosedChannelException expected",
 654                             x.getCause());
 655             }
 656         }
 657     }
 658 
 659     // exercise gathering write
 660     static void testWrite2() throws Exception {
 661         System.out.println("-- write (2) --");
 662 
 663         try (Server server = new Server()) {
 664             final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 665             ch.connect(server.address()).get();
 666             SocketChannel sc = server.accept();
 667 
 668             // number of bytes written
 669             final AtomicLong bytesWritten = new AtomicLong(0);
 670 
 671             // write buffers (should complete immediately)
 672             ByteBuffer[] srcs = genBuffers(1);
 673             final CountDownLatch l1 = new CountDownLatch(1);
 674             ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null,
 675                 new CompletionHandler<Long,Void>() {
 676                     public void completed(Long result, Void att) {
 677                         long n = result;
 678                         if (n <= 0)
 679                             throw new RuntimeException("No bytes read");
 680                         bytesWritten.addAndGet(n);
 681                         l1.countDown();
 682                     }
 683                     public void failed(Throwable exc, Void att) {
 684                     }
 685             });
 686             l1.await();
 687 
 688             // set to true to signal that no more buffers should be written
 689             final AtomicBoolean continueWriting = new AtomicBoolean(true);
 690 
 691             // write until socket buffer is full so as to create the conditions
 692             // for when a write does not complete immediately
 693             srcs = genBuffers(1);
 694             ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null,
 695                 new CompletionHandler<Long,Void>() {
 696                     public void completed(Long result, Void att) {
 697                         long n = result;
 698                         if (n <= 0)
 699                             throw new RuntimeException("No bytes written");
 700                         bytesWritten.addAndGet(n);
 701                         if (continueWriting.get()) {
 702                             ByteBuffer[] srcs = genBuffers(8);
 703                             ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS,
 704                                 (Void)null, this);
 705                         }
 706                     }
 707                     public void failed(Throwable exc, Void att) {
 708                     }
 709             });
 710 
 711             // give time for socket buffer to fill up.
 712             Thread.sleep(5*1000);
 713 
 714             // signal handler to stop further writing
 715             continueWriting.set(false);
 716 
 717             // read until done
 718             ByteBuffer buf = ByteBuffer.allocateDirect(4096);
 719             long total = 0L;
 720             do {
 721                 int n = sc.read(buf);
 722                 if (n <= 0)
 723                     throw new RuntimeException("No bytes read");
 724                 buf.rewind();
 725                 total += n;
 726             } while (total < bytesWritten.get());
 727 
 728             ch.close();
 729             sc.close();
 730         }
 731     }
 732 
 733     static void testShutdown() throws Exception {
 734         System.out.println("-- shutdown --");
 735 
 736         try (Server server = new Server();
 737              AsynchronousSocketChannel ch = AsynchronousSocketChannel.open())
 738         {
 739             ch.connect(server.address()).get();
 740             try (SocketChannel peer = server.accept()) {
 741                 ByteBuffer buf = ByteBuffer.allocateDirect(1000);
 742                 int n;
 743 
 744                 // check read
 745                 ch.shutdownInput();
 746                 n = ch.read(buf).get();
 747                 if (n != -1)
 748                     throw new RuntimeException("-1 expected");
 749                 // check full with full buffer
 750                 buf.put(new byte[100]);
 751                 n = ch.read(buf).get();
 752                 if (n != -1)
 753                     throw new RuntimeException("-1 expected");
 754 
 755                 // check write
 756                 ch.shutdownOutput();
 757                 try {
 758                     ch.write(buf).get();
 759                     throw new RuntimeException("ClosedChannelException expected");
 760                 } catch (ExecutionException x) {
 761                     if (!(x.getCause() instanceof ClosedChannelException))
 762                         throw new RuntimeException("ClosedChannelException expected",
 763                                 x.getCause());
 764                 }
 765             }
 766         }
 767     }
 768 
 769     static void testTimeout() throws Exception {
 770         System.out.println("-- timeouts --");
 771         testTimeout(Integer.MIN_VALUE, TimeUnit.SECONDS);
 772         testTimeout(-1L, TimeUnit.SECONDS);
 773         testTimeout(0L, TimeUnit.SECONDS);
 774         testTimeout(2L, TimeUnit.SECONDS);
 775     }
 776 
 777     static void testTimeout(final long timeout, final TimeUnit unit) throws Exception {
 778         System.out.printf("---- timeout: %d ms%n", unit.toMillis(timeout));
 779         try (Server server = new Server()) {
 780             AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 781             ch.connect(server.address()).get();
 782 
 783             ByteBuffer dst = ByteBuffer.allocate(512);
 784 
 785             final AtomicReference<Throwable> readException = new AtomicReference<Throwable>();
 786 
 787             // this read should timeout if value is > 0
 788             ch.read(dst, timeout, unit, null, new CompletionHandler<Integer,Void>() {
 789                 public void completed(Integer result, Void att) {
 790                     readException.set(new RuntimeException("Should not complete"));
 791                 }
 792                 public void failed(Throwable exc, Void att) {
 793                     readException.set(exc);
 794                 }
 795             });
 796             if (timeout > 0L) {
 797                 // wait for exception
 798                 while (readException.get() == null) {
 799                     Thread.sleep(100);
 800                 }
 801                 if (!(readException.get() instanceof InterruptedByTimeoutException))
 802                     throw new RuntimeException("InterruptedByTimeoutException expected",
 803                             readException.get());
 804 
 805                 // after a timeout then further reading should throw unspecified runtime exception
 806                 boolean exceptionThrown = false;
 807                 try {
 808                     ch.read(dst);
 809                 } catch (RuntimeException x) {
 810                     exceptionThrown = true;
 811                 }
 812                 if (!exceptionThrown)
 813                     throw new RuntimeException("RuntimeException expected after timeout.");
 814             } else {
 815                 Thread.sleep(1000);
 816                 Throwable exc = readException.get();
 817                 if (exc != null)
 818                     throw new RuntimeException(exc);
 819             }
 820 
 821             final AtomicReference<Throwable> writeException = new AtomicReference<Throwable>();
 822 
 823             // write bytes to fill socket buffer
 824             ch.write(genBuffer(), timeout, unit, ch,
 825                 new CompletionHandler<Integer,AsynchronousSocketChannel>()
 826             {
 827                 public void completed(Integer result, AsynchronousSocketChannel ch) {
 828                     ch.write(genBuffer(), timeout, unit, ch, this);
 829                 }
 830                 public void failed(Throwable exc, AsynchronousSocketChannel ch) {
 831                     writeException.set(exc);
 832                 }
 833             });
 834             if (timeout > 0) {
 835                 // wait for exception
 836                 while (writeException.get() == null) {
 837                     Thread.sleep(100);
 838                 }
 839                 if (!(writeException.get() instanceof InterruptedByTimeoutException))
 840                     throw new RuntimeException("InterruptedByTimeoutException expected",
 841                             writeException.get());
 842 
 843                 // after a timeout then further writing should throw unspecified runtime exception
 844                 boolean exceptionThrown = false;
 845                 try {
 846                     ch.write(genBuffer());
 847                 } catch (RuntimeException x) {
 848                     exceptionThrown = true;
 849                 }
 850                 if (!exceptionThrown)
 851                     throw new RuntimeException("RuntimeException expected after timeout.");
 852             } else {
 853                 Thread.sleep(1000);
 854                 Throwable exc = writeException.get();
 855                 if (exc != null)
 856                     throw new RuntimeException(exc);
 857             }
 858 
 859             // clean-up
 860             server.accept().close();
 861             ch.close();
 862         }
 863     }
 864 
 865     // returns ByteBuffer with random bytes
 866     static ByteBuffer genBuffer() {
 867         int size = 1024 + RAND.nextInt(16000);
 868         byte[] buf = new byte[size];
 869         RAND.nextBytes(buf);
 870         boolean useDirect = RAND.nextBoolean();
 871         if (useDirect) {
 872             ByteBuffer bb = ByteBuffer.allocateDirect(buf.length);
 873             bb.put(buf);
 874             bb.flip();
 875             return bb;
 876         } else {
 877             return ByteBuffer.wrap(buf);
 878         }
 879     }
 880 
 881     // return ByteBuffer[] with random bytes
 882     static ByteBuffer[] genBuffers(int max) {
 883         int len = 1;
 884         if (max > 1)
 885             len += RAND.nextInt(max);
 886         ByteBuffer[] bufs = new ByteBuffer[len];
 887         for (int i=0; i<len; i++)
 888             bufs[i] = genBuffer();
 889         return bufs;
 890     }
 891 
 892     // return random SocketAddress
 893     static SocketAddress genSocketAddress() {
 894         StringBuilder sb = new StringBuilder("10.");
 895         sb.append(RAND.nextInt(256));
 896         sb.append('.');
 897         sb.append(RAND.nextInt(256));
 898         sb.append('.');
 899         sb.append(RAND.nextInt(256));
 900         InetAddress rh;
 901         try {
 902             rh = InetAddress.getByName(sb.toString());
 903         } catch (UnknownHostException x) {
 904             throw new InternalError("Should not happen");
 905         }
 906         return new InetSocketAddress(rh, RAND.nextInt(65535)+1);
 907     }
 908 }