1 /*
   2  * Copyright (c) 2008, 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /* @test
  25  * @bug 4607272 6842687 6878369 6944810 7023403
  26  * @summary Unit test for AsynchronousSocketChannel
  27  * @run main Basic -skipSlowConnectTest
  28  * @key randomness intermittent
  29  */
  30 
  31 import java.io.Closeable;
  32 import java.io.IOException;
  33 import java.net.*;
  34 import static java.net.StandardSocketOptions.*;
  35 import java.nio.ByteBuffer;
  36 import java.nio.channels.*;
  37 import java.util.Random;
  38 import java.util.Set;
  39 import java.util.concurrent.*;
  40 import java.util.concurrent.atomic.*;
  41 
  42 public class Basic {
    // Shared, unseeded source of randomness used by the tests in this class.
    static final Random rand = new Random();

    // Set to true by the "-skipSlowConnectTest" command-line argument; when
    // set, testConnect skips the connect to a (probably) non-existent host.
    static boolean skipSlowConnectTest = false;
  46 
  47     public static void main(String[] args) throws Exception {
  48         for (String arg: args) {
  49             switch (arg) {
  50             case "-skipSlowConnectTest" :
  51                 skipSlowConnectTest = true;
  52                 break;
  53             default:
  54                 throw new RuntimeException("Unrecognized argument: " + arg);
  55             }
  56         }
  57 
  58         testBind();
  59         testSocketOptions();
  60         testConnect();
  61         testCloseWhenPending();
  62         testCancel();
  63         testRead1();
  64         testRead2();
  65         testRead3();
  66         testWrite1();
  67         testWrite2();
  68         // skip timeout tests until 7052549 is fixed
  69         if (!System.getProperty("os.name").startsWith("Windows"))
  70             testTimeout();
  71         testShutdown();
  72     }
  73 
  74     static class Server implements Closeable {
  75         private final ServerSocketChannel ssc;
  76         private final InetSocketAddress address;
  77 
  78         Server() throws IOException {
  79             ssc = ServerSocketChannel.open().bind(new InetSocketAddress(0));
  80 
  81             InetAddress lh = InetAddress.getLocalHost();
  82             int port = ((InetSocketAddress)(ssc.getLocalAddress())).getPort();
  83             address = new InetSocketAddress(lh, port);
  84         }
  85 
  86         InetSocketAddress address() {
  87             return address;
  88         }
  89 
  90         SocketChannel accept() throws IOException {
  91             return ssc.accept();
  92         }
  93 
  94         public void close() throws IOException {
  95             ssc.close();
  96         }
  97 
  98     }
  99 
 100     static void testBind() throws Exception {
 101         System.out.println("-- bind --");
 102 
 103         try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
 104             if (ch.getLocalAddress() != null)
 105                 throw new RuntimeException("Local address should be 'null'");
 106             ch.bind(new InetSocketAddress(0));
 107 
 108             // check local address after binding
 109             InetSocketAddress local = (InetSocketAddress)ch.getLocalAddress();
 110             if (local.getPort() == 0)
 111                 throw new RuntimeException("Unexpected port");
 112             if (!local.getAddress().isAnyLocalAddress())
 113                 throw new RuntimeException("Not bound to a wildcard address");
 114 
 115             // try to re-bind
 116             try {
 117                 ch.bind(new InetSocketAddress(0));
 118                 throw new RuntimeException("AlreadyBoundException expected");
 119             } catch (AlreadyBoundException x) {
 120             }
 121         }
 122 
 123         // check ClosedChannelException
 124         AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 125         ch.close();
 126         try {
 127             ch.bind(new InetSocketAddress(0));
 128             throw new RuntimeException("ClosedChannelException  expected");
 129         } catch (ClosedChannelException  x) {
 130         }
 131     }
 132 
 133     static void testSocketOptions() throws Exception {
 134         System.out.println("-- socket options --");
 135 
 136         try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
 137             ch.setOption(SO_RCVBUF, 128*1024)
 138               .setOption(SO_SNDBUF, 128*1024)
 139               .setOption(SO_REUSEADDR, true);
 140 
 141             // check SO_SNDBUF/SO_RCVBUF limits
 142             int before, after;
 143             before = ch.getOption(SO_SNDBUF);
 144             after = ch.setOption(SO_SNDBUF, Integer.MAX_VALUE).getOption(SO_SNDBUF);
 145             if (after < before)
 146                 throw new RuntimeException("setOption caused SO_SNDBUF to decrease");
 147             before = ch.getOption(SO_RCVBUF);
 148             after = ch.setOption(SO_RCVBUF, Integer.MAX_VALUE).getOption(SO_RCVBUF);
 149             if (after < before)
 150                 throw new RuntimeException("setOption caused SO_RCVBUF to decrease");
 151 
 152             ch.bind(new InetSocketAddress(0));
 153 
 154             // default values
 155             if (ch.getOption(SO_KEEPALIVE))
 156                 throw new RuntimeException("Default of SO_KEEPALIVE should be 'false'");
 157             if (ch.getOption(TCP_NODELAY))
 158                 throw new RuntimeException("Default of TCP_NODELAY should be 'false'");
 159 
 160             // set and check
 161             if (!ch.setOption(SO_KEEPALIVE, true).getOption(SO_KEEPALIVE))
 162                 throw new RuntimeException("SO_KEEPALIVE did not change");
 163             if (!ch.setOption(TCP_NODELAY, true).getOption(TCP_NODELAY))
 164                 throw new RuntimeException("SO_KEEPALIVE did not change");
 165 
 166             // read others (can't check as actual value is implementation dependent)
 167             ch.getOption(SO_RCVBUF);
 168             ch.getOption(SO_SNDBUF);
 169 
 170             Set<SocketOption<?>> options = ch.supportedOptions();
 171             boolean reuseport = options.contains(SO_REUSEPORT);
 172             if (reuseport) {
 173                 if (ch.getOption(SO_REUSEPORT))
 174                     throw new RuntimeException("Default of SO_REUSEPORT should be 'false'");
 175                 if (!ch.setOption(SO_REUSEPORT, true).getOption(SO_REUSEPORT))
 176                     throw new RuntimeException("SO_REUSEPORT did not change");
 177             }
 178         }
 179     }
 180 
 181     static void testConnect() throws Exception {
 182         System.out.println("-- connect --");
 183 
 184         SocketAddress address;
 185 
 186         try (Server server = new Server()) {
 187             address = server.address();
 188 
 189             // connect to server and check local/remote addresses
 190             try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
 191                 ch.connect(address).get();
 192                 // check local address
 193                 if (ch.getLocalAddress() == null)
 194                     throw new RuntimeException("Not bound to local address");
 195 
 196                 // check remote address
 197                 InetSocketAddress remote = (InetSocketAddress)ch.getRemoteAddress();
 198                 if (remote.getPort() != server.address().getPort())
 199                     throw new RuntimeException("Connected to unexpected port");
 200                 if (!remote.getAddress().equals(server.address().getAddress()))
 201                     throw new RuntimeException("Connected to unexpected address");
 202 
 203                 // try to connect again
 204                 try {
 205                     ch.connect(server.address()).get();
 206                     throw new RuntimeException("AlreadyConnectedException expected");
 207                 } catch (AlreadyConnectedException x) {
 208                 }
 209 
 210                 // clean-up
 211                 server.accept().close();
 212             }
 213 
 214             // check that connect fails with ClosedChannelException
 215             AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 216             ch.close();
 217             try {
 218                 ch.connect(server.address()).get();
 219                 throw new RuntimeException("ExecutionException expected");
 220             } catch (ExecutionException x) {
 221                 if (!(x.getCause() instanceof ClosedChannelException))
 222                     throw new RuntimeException("Cause of ClosedChannelException expected",
 223                             x.getCause());
 224             }
 225             final AtomicReference<Throwable> connectException = new AtomicReference<>();
 226             ch.connect(server.address(), (Void)null, new CompletionHandler<Void,Void>() {
 227                 public void completed(Void result, Void att) {
 228                 }
 229                 public void failed(Throwable exc, Void att) {
 230                     connectException.set(exc);
 231                 }
 232             });
 233             while (connectException.get() == null) {
 234                 Thread.sleep(100);
 235             }
 236             if (!(connectException.get() instanceof ClosedChannelException))
 237                 throw new RuntimeException("ClosedChannelException expected",
 238                         connectException.get());
 239         }
 240 
 241         // test that failure to connect closes the channel
 242         try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
 243             try {
 244                 ch.connect(address).get();
 245             } catch (ExecutionException x) {
 246                 // failed to establish connection
 247                 if (ch.isOpen())
 248                     throw new RuntimeException("Channel should be closed");
 249             }
 250         }
 251 
 252         // repeat test by connecting to a (probably) non-existent host. This
 253         // improves the chance that the connect will not fail immediately.
 254         if (!skipSlowConnectTest) {
 255             try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
 256                 try {
 257                     ch.connect(genSocketAddress()).get();
 258                 } catch (ExecutionException x) {
 259                     // failed to establish connection
 260                     if (ch.isOpen())
 261                         throw new RuntimeException("Channel should be closed");
 262                 }
 263             }
 264         }
 265     }
 266 
 267     static void testCloseWhenPending() throws Exception {
 268         System.out.println("-- asynchronous close when connecting --");
 269 
 270         AsynchronousSocketChannel ch;
 271 
 272         // asynchronous close while connecting
 273         ch = AsynchronousSocketChannel.open();
 274         Future<Void> connectResult = ch.connect(genSocketAddress());
 275 
 276         // give time to initiate the connect (SYN)
 277         Thread.sleep(50);
 278 
 279         // close
 280         ch.close();
 281 
 282         // check that exception is thrown in timely manner
 283         try {
 284             connectResult.get(5, TimeUnit.SECONDS);
 285         } catch (TimeoutException x) {
 286             throw new RuntimeException("AsynchronousCloseException not thrown");
 287         } catch (ExecutionException x) {
 288             // expected
 289         }
 290 
 291         System.out.println("-- asynchronous close when reading --");
 292 
 293         try (Server server = new Server()) {
 294             ch = AsynchronousSocketChannel.open();
 295             ch.connect(server.address()).get();
 296 
 297             ByteBuffer dst = ByteBuffer.allocateDirect(100);
 298             Future<Integer> result = ch.read(dst);
 299 
 300             // attempt a second read - should fail with ReadPendingException
 301             ByteBuffer buf = ByteBuffer.allocateDirect(100);
 302             try {
 303                 ch.read(buf);
 304                 throw new RuntimeException("ReadPendingException expected");
 305             } catch (ReadPendingException x) {
 306             }
 307 
 308             // close channel (should cause initial read to complete)
 309             ch.close();
 310             server.accept().close();
 311 
 312             // check that AsynchronousCloseException is thrown
 313             try {
 314                 result.get();
 315                 throw new RuntimeException("Should not read");
 316             } catch (ExecutionException x) {
 317                 if (!(x.getCause() instanceof AsynchronousCloseException))
 318                     throw new RuntimeException(x);
 319             }
 320 
 321             System.out.println("-- asynchronous close when writing --");
 322 
 323             ch = AsynchronousSocketChannel.open();
 324             ch.connect(server.address()).get();
 325 
 326             final AtomicReference<Throwable> writeException =
 327                 new AtomicReference<Throwable>();
 328 
 329             // write bytes to fill socket buffer
 330             final AtomicInteger numCompleted = new AtomicInteger();
 331             ch.write(genBuffer(), ch, new CompletionHandler<Integer,AsynchronousSocketChannel>() {
 332                 public void completed(Integer result, AsynchronousSocketChannel ch) {
 333                     numCompleted.incrementAndGet();
 334                     ch.write(genBuffer(), ch, this);
 335                 }
 336                 public void failed(Throwable x, AsynchronousSocketChannel ch) {
 337                     writeException.set(x);
 338                 }
 339             });
 340 
 341             // give time for socket buffer to fill up -
 342             // take pauses until the handler is no longer being invoked
 343             // because all writes are being pended which guarantees that
 344             // the internal channel state indicates it is writing
 345             int prevNumCompleted = numCompleted.get();
 346             do {
 347                 Thread.sleep(1000);
 348                 if (numCompleted.get() == prevNumCompleted) {
 349                     break;
 350                 }
 351                 prevNumCompleted = numCompleted.get();
 352             } while (true);
 353 
 354             // attempt a concurrent write - should fail with WritePendingException
 355             try {
 356                 ch.write(genBuffer());
 357                 throw new RuntimeException("WritePendingException expected");
 358             } catch (WritePendingException x) {
 359             }
 360 
 361             // close channel - should cause initial write to complete
 362             ch.close();
 363             server.accept().close();
 364 
 365             // wait for exception
 366             while (writeException.get() == null) {
 367                 Thread.sleep(100);
 368             }
 369             if (!(writeException.get() instanceof AsynchronousCloseException))
 370                 throw new RuntimeException("AsynchronousCloseException expected",
 371                         writeException.get());
 372         }
 373     }
 374 
 375     static void testCancel() throws Exception {
 376         System.out.println("-- cancel --");
 377 
 378         try (Server server = new Server()) {
 379             for (int i=0; i<2; i++) {
 380                 boolean mayInterruptIfRunning = (i == 0) ? false : true;
 381 
 382                 // establish loopback connection
 383                 AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 384                 ch.connect(server.address()).get();
 385                 SocketChannel peer = server.accept();
 386 
 387                 // start read operation
 388                 ByteBuffer buf = ByteBuffer.allocate(1);
 389                 Future<Integer> res = ch.read(buf);
 390 
 391                 // cancel operation
 392                 boolean cancelled = res.cancel(mayInterruptIfRunning);
 393 
 394                 // check post-conditions
 395                 if (!res.isDone())
 396                     throw new RuntimeException("isDone should return true");
 397                 if (res.isCancelled() != cancelled)
 398                     throw new RuntimeException("isCancelled not consistent");
 399                 try {
 400                     res.get();
 401                     throw new RuntimeException("CancellationException expected");
 402                 } catch (CancellationException x) {
 403                 }
 404                 try {
 405                     res.get(1, TimeUnit.SECONDS);
 406                     throw new RuntimeException("CancellationException expected");
 407                 } catch (CancellationException x) {
 408                 }
 409 
 410                 // check that the cancel doesn't impact writing to the channel
 411                 if (!mayInterruptIfRunning) {
 412                     buf = ByteBuffer.wrap("a".getBytes());
 413                     ch.write(buf).get();
 414                 }
 415 
 416                 ch.close();
 417                 peer.close();
 418             }
 419         }
 420     }
 421 
 422     static void testRead1() throws Exception {
 423         System.out.println("-- read (1) --");
 424 
 425         try (Server server = new Server()) {
 426             final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 427             ch.connect(server.address()).get();
 428 
 429             // read with 0 bytes remaining should complete immediately
 430             ByteBuffer buf = ByteBuffer.allocate(1);
 431             buf.put((byte)0);
 432             int n = ch.read(buf).get();
 433             if (n != 0)
 434                 throw new RuntimeException("0 expected");
 435 
 436             // write bytes and close connection
 437             ByteBuffer src = genBuffer();
 438             try (SocketChannel sc = server.accept()) {
 439                 sc.setOption(SO_SNDBUF, src.remaining());
 440                 while (src.hasRemaining())
 441                     sc.write(src);
 442             }
 443 
 444             // reads should complete immediately
 445             final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100);
 446             final CountDownLatch latch = new CountDownLatch(1);
 447             ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() {
 448                 public void completed(Integer result, Void att) {
 449                     int n = result;
 450                     if (n > 0) {
 451                         ch.read(dst, (Void)null, this);
 452                     } else {
 453                         latch.countDown();
 454                     }
 455                 }
 456                 public void failed(Throwable exc, Void att) {
 457                 }
 458             });
 459 
 460             latch.await();
 461 
 462             // check buffers
 463             src.flip();
 464             dst.flip();
 465             if (!src.equals(dst)) {
 466                 throw new RuntimeException("Contents differ");
 467             }
 468 
 469             // close channel
 470             ch.close();
 471 
 472             // check read fails with ClosedChannelException
 473             try {
 474                 ch.read(dst).get();
 475                 throw new RuntimeException("ExecutionException expected");
 476             } catch (ExecutionException x) {
 477                 if (!(x.getCause() instanceof ClosedChannelException))
 478                     throw new RuntimeException("Cause of ClosedChannelException expected",
 479                             x.getCause());
 480             }
 481         }
 482     }
 483 
 484     static void testRead2() throws Exception {
 485         System.out.println("-- read (2) --");
 486 
 487         try (Server server = new Server()) {
 488             final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 489             ch.connect(server.address()).get();
 490             SocketChannel sc = server.accept();
 491 
 492             ByteBuffer src = genBuffer();
 493 
 494             // read until the buffer is full
 495             final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity());
 496             final CountDownLatch latch = new CountDownLatch(1);
 497             ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() {
 498                 public void completed(Integer result, Void att) {
 499                     if (dst.hasRemaining()) {
 500                         ch.read(dst, (Void)null, this);
 501                     } else {
 502                         latch.countDown();
 503                     }
 504                 }
 505                 public void failed(Throwable exc, Void att) {
 506                 }
 507             });
 508 
 509             // trickle the writing
 510             do {
 511                 int rem = src.remaining();
 512                 int size = (rem <= 100) ? rem : 50 + rand.nextInt(rem - 100);
 513                 ByteBuffer buf = ByteBuffer.allocate(size);
 514                 for (int i=0; i<size; i++)
 515                     buf.put(src.get());
 516                 buf.flip();
 517                 Thread.sleep(50 + rand.nextInt(1500));
 518                 while (buf.hasRemaining())
 519                     sc.write(buf);
 520             } while (src.hasRemaining());
 521 
 522             // wait until ascynrhonous reading has completed
 523             latch.await();
 524 
 525             // check buffers
 526             src.flip();
 527             dst.flip();
 528             if (!src.equals(dst)) {
 529                throw new RuntimeException("Contents differ");
 530             }
 531 
 532             sc.close();
 533             ch.close();
 534         }
 535     }
 536 
 537     // exercise scattering read
 538     static void testRead3() throws Exception {
 539         System.out.println("-- read (3) --");
 540 
 541         try (Server server = new Server()) {
 542             final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 543             ch.connect(server.address()).get();
 544             SocketChannel sc = server.accept();
 545 
 546             ByteBuffer[] dsts = new ByteBuffer[3];
 547             for (int i=0; i<dsts.length; i++) {
 548                 dsts[i] = ByteBuffer.allocateDirect(100);
 549             }
 550 
 551             // scattering read that completes ascynhronously
 552             final CountDownLatch l1 = new CountDownLatch(1);
 553             ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null,
 554                 new CompletionHandler<Long,Void>() {
 555                     public void completed(Long result, Void att) {
 556                         long n = result;
 557                         if (n <= 0)
 558                             throw new RuntimeException("No bytes read");
 559                         l1.countDown();
 560                     }
 561                     public void failed(Throwable exc, Void att) {
 562                     }
 563             });
 564 
 565             // write some bytes
 566             sc.write(genBuffer());
 567 
 568             // read should now complete
 569             l1.await();
 570 
 571             // write more bytes
 572             sc.write(genBuffer());
 573 
 574             // read should complete immediately
 575             for (int i=0; i<dsts.length; i++) {
 576                 dsts[i].rewind();
 577             }
 578 
 579             final CountDownLatch l2 = new CountDownLatch(1);
 580             ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null,
 581                 new CompletionHandler<Long,Void>() {
 582                     public void completed(Long result, Void att) {
 583                         long n = result;
 584                         if (n <= 0)
 585                             throw new RuntimeException("No bytes read");
 586                         l2.countDown();
 587                     }
 588                     public void failed(Throwable exc, Void att) {
 589                     }
 590             });
 591             l2.await();
 592 
 593             ch.close();
 594             sc.close();
 595         }
 596     }
 597 
 598     static void testWrite1() throws Exception {
 599         System.out.println("-- write (1) --");
 600 
 601         try (Server server = new Server()) {
 602             final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 603             ch.connect(server.address()).get();
 604             SocketChannel sc = server.accept();
 605 
 606             // write with 0 bytes remaining should complete immediately
 607             ByteBuffer buf = ByteBuffer.allocate(1);
 608             buf.put((byte)0);
 609             int n = ch.write(buf).get();
 610             if (n != 0)
 611                 throw new RuntimeException("0 expected");
 612 
 613             // write all bytes and close connection when done
 614             final ByteBuffer src = genBuffer();
 615             ch.write(src, (Void)null, new CompletionHandler<Integer,Void>() {
 616                 public void completed(Integer result, Void att) {
 617                     if (src.hasRemaining()) {
 618                         ch.write(src, (Void)null, this);
 619                     } else {
 620                         try {
 621                             ch.close();
 622                         } catch (IOException ignore) { }
 623                     }
 624                 }
 625                 public void failed(Throwable exc, Void att) {
 626                 }
 627             });
 628 
 629             // read to EOF or buffer full
 630             ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100);
 631             do {
 632                 n = sc.read(dst);
 633             } while (n > 0);
 634             sc.close();
 635 
 636             // check buffers
 637             src.flip();
 638             dst.flip();
 639             if (!src.equals(dst)) {
 640                 throw new RuntimeException("Contents differ");
 641             }
 642 
 643             // check write fails with ClosedChannelException
 644             try {
 645                 ch.read(dst).get();
 646                 throw new RuntimeException("ExecutionException expected");
 647             } catch (ExecutionException x) {
 648                 if (!(x.getCause() instanceof ClosedChannelException))
 649                     throw new RuntimeException("Cause of ClosedChannelException expected",
 650                             x.getCause());
 651             }
 652         }
 653     }
 654 
 655     // exercise gathering write
 656     static void testWrite2() throws Exception {
 657         System.out.println("-- write (2) --");
 658 
 659         try (Server server = new Server()) {
 660             final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 661             ch.connect(server.address()).get();
 662             SocketChannel sc = server.accept();
 663 
 664             // number of bytes written
 665             final AtomicLong bytesWritten = new AtomicLong(0);
 666 
 667             // write buffers (should complete immediately)
 668             ByteBuffer[] srcs = genBuffers(1);
 669             final CountDownLatch l1 = new CountDownLatch(1);
 670             ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null,
 671                 new CompletionHandler<Long,Void>() {
 672                     public void completed(Long result, Void att) {
 673                         long n = result;
 674                         if (n <= 0)
 675                             throw new RuntimeException("No bytes read");
 676                         bytesWritten.addAndGet(n);
 677                         l1.countDown();
 678                     }
 679                     public void failed(Throwable exc, Void att) {
 680                     }
 681             });
 682             l1.await();
 683 
 684             // set to true to signal that no more buffers should be written
 685             final AtomicBoolean continueWriting = new AtomicBoolean(true);
 686 
 687             // write until socket buffer is full so as to create the conditions
 688             // for when a write does not complete immediately
 689             srcs = genBuffers(1);
 690             ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null,
 691                 new CompletionHandler<Long,Void>() {
 692                     public void completed(Long result, Void att) {
 693                         long n = result;
 694                         if (n <= 0)
 695                             throw new RuntimeException("No bytes written");
 696                         bytesWritten.addAndGet(n);
 697                         if (continueWriting.get()) {
 698                             ByteBuffer[] srcs = genBuffers(8);
 699                             ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS,
 700                                 (Void)null, this);
 701                         }
 702                     }
 703                     public void failed(Throwable exc, Void att) {
 704                     }
 705             });
 706 
 707             // give time for socket buffer to fill up.
 708             Thread.sleep(5*1000);
 709 
 710             // signal handler to stop further writing
 711             continueWriting.set(false);
 712 
 713             // read until done
 714             ByteBuffer buf = ByteBuffer.allocateDirect(4096);
 715             long total = 0L;
 716             do {
 717                 int n = sc.read(buf);
 718                 if (n <= 0)
 719                     throw new RuntimeException("No bytes read");
 720                 buf.rewind();
 721                 total += n;
 722             } while (total < bytesWritten.get());
 723 
 724             ch.close();
 725             sc.close();
 726         }
 727     }
 728 
 729     static void testShutdown() throws Exception {
 730         System.out.println("-- shutdown --");
 731 
 732         try (Server server = new Server();
 733              AsynchronousSocketChannel ch = AsynchronousSocketChannel.open())
 734         {
 735             ch.connect(server.address()).get();
 736             try (SocketChannel peer = server.accept()) {
 737                 ByteBuffer buf = ByteBuffer.allocateDirect(1000);
 738                 int n;
 739 
 740                 // check read
 741                 ch.shutdownInput();
 742                 n = ch.read(buf).get();
 743                 if (n != -1)
 744                     throw new RuntimeException("-1 expected");
 745                 // check full with full buffer
 746                 buf.put(new byte[100]);
 747                 n = ch.read(buf).get();
 748                 if (n != -1)
 749                     throw new RuntimeException("-1 expected");
 750 
 751                 // check write
 752                 ch.shutdownOutput();
 753                 try {
 754                     ch.write(buf).get();
 755                     throw new RuntimeException("ClosedChannelException expected");
 756                 } catch (ExecutionException x) {
 757                     if (!(x.getCause() instanceof ClosedChannelException))
 758                         throw new RuntimeException("ClosedChannelException expected",
 759                                 x.getCause());
 760                 }
 761             }
 762         }
 763     }
 764 
 765     static void testTimeout() throws Exception {
 766         System.out.println("-- timeouts --");
 767         testTimeout(Integer.MIN_VALUE, TimeUnit.SECONDS);
 768         testTimeout(-1L, TimeUnit.SECONDS);
 769         testTimeout(0L, TimeUnit.SECONDS);
 770         testTimeout(2L, TimeUnit.SECONDS);
 771     }
 772 
 773     static void testTimeout(final long timeout, final TimeUnit unit) throws Exception {
 774         try (Server server = new Server()) {
 775             AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 776             ch.connect(server.address()).get();
 777 
 778             ByteBuffer dst = ByteBuffer.allocate(512);
 779 
 780             final AtomicReference<Throwable> readException = new AtomicReference<Throwable>();
 781 
 782             // this read should timeout if value is > 0
 783             ch.read(dst, timeout, unit, null, new CompletionHandler<Integer,Void>() {
 784                 public void completed(Integer result, Void att) {
 785                     readException.set(new RuntimeException("Should not complete"));
 786                 }
 787                 public void failed(Throwable exc, Void att) {
 788                     readException.set(exc);
 789                 }
 790             });
 791             if (timeout > 0L) {
 792                 // wait for exception
 793                 while (readException.get() == null) {
 794                     Thread.sleep(100);
 795                 }
 796                 if (!(readException.get() instanceof InterruptedByTimeoutException))
 797                     throw new RuntimeException("InterruptedByTimeoutException expected",
 798                             readException.get());
 799 
 800                 // after a timeout then further reading should throw unspecified runtime exception
 801                 boolean exceptionThrown = false;
 802                 try {
 803                     ch.read(dst);
 804                 } catch (RuntimeException x) {
 805                     exceptionThrown = true;
 806                 }
 807                 if (!exceptionThrown)
 808                     throw new RuntimeException("RuntimeException expected after timeout.");
 809             } else {
 810                 Thread.sleep(1000);
 811                 Throwable exc = readException.get();
 812                 if (exc != null)
 813                     throw new RuntimeException(exc);
 814             }
 815 
 816             final AtomicReference<Throwable> writeException = new AtomicReference<Throwable>();
 817 
 818             // write bytes to fill socket buffer
 819             ch.write(genBuffer(), timeout, unit, ch,
 820                 new CompletionHandler<Integer,AsynchronousSocketChannel>()
 821             {
 822                 public void completed(Integer result, AsynchronousSocketChannel ch) {
 823                     ch.write(genBuffer(), timeout, unit, ch, this);
 824                 }
 825                 public void failed(Throwable exc, AsynchronousSocketChannel ch) {
 826                     writeException.set(exc);
 827                 }
 828             });
 829             if (timeout > 0) {
 830                 // wait for exception
 831                 while (writeException.get() == null) {
 832                     Thread.sleep(100);
 833                 }
 834                 if (!(writeException.get() instanceof InterruptedByTimeoutException))
 835                     throw new RuntimeException("InterruptedByTimeoutException expected",
 836                             writeException.get());
 837 
 838                 // after a timeout then further writing should throw unspecified runtime exception
 839                 boolean exceptionThrown = false;
 840                 try {
 841                     ch.write(genBuffer());
 842                 } catch (RuntimeException x) {
 843                     exceptionThrown = true;
 844                 }
 845                 if (!exceptionThrown)
 846                     throw new RuntimeException("RuntimeException expected after timeout.");
 847             } else {
 848                 Thread.sleep(1000);
 849                 Throwable exc = writeException.get();
 850                 if (exc != null)
 851                     throw new RuntimeException(exc);
 852             }
 853 
 854             // clean-up
 855             server.accept().close();
 856             ch.close();
 857         }
 858     }
 859 
 860     // returns ByteBuffer with random bytes
 861     static ByteBuffer genBuffer() {
 862         int size = 1024 + rand.nextInt(16000);
 863         byte[] buf = new byte[size];
 864         rand.nextBytes(buf);
 865         boolean useDirect = rand.nextBoolean();
 866         if (useDirect) {
 867             ByteBuffer bb = ByteBuffer.allocateDirect(buf.length);
 868             bb.put(buf);
 869             bb.flip();
 870             return bb;
 871         } else {
 872             return ByteBuffer.wrap(buf);
 873         }
 874     }
 875 
 876     // return ByteBuffer[] with random bytes
 877     static ByteBuffer[] genBuffers(int max) {
 878         int len = 1;
 879         if (max > 1)
 880             len += rand.nextInt(max);
 881         ByteBuffer[] bufs = new ByteBuffer[len];
 882         for (int i=0; i<len; i++)
 883             bufs[i] = genBuffer();
 884         return bufs;
 885     }
 886 
 887     // return random SocketAddress
 888     static SocketAddress genSocketAddress() {
 889         StringBuilder sb = new StringBuilder("10.");
 890         sb.append(rand.nextInt(256));
 891         sb.append('.');
 892         sb.append(rand.nextInt(256));
 893         sb.append('.');
 894         sb.append(rand.nextInt(256));
 895         InetAddress rh;
 896         try {
 897             rh = InetAddress.getByName(sb.toString());
 898         } catch (UnknownHostException x) {
 899             throw new InternalError("Should not happen");
 900         }
 901         return new InetSocketAddress(rh, rand.nextInt(65535)+1);
 902     }
 903 }