1 /*
   2  * Copyright (c) 2008, 2011, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /* @test
  25  * @bug 4607272 6842687 6878369 6944810 7023403
  26  * @summary Unit test for AsynchronousSocketChannel
  27  * @run main Basic -skipSlowConnectTest
  28  * @key randomness
  29  */
  30 
  31 import java.nio.ByteBuffer;
  32 import java.nio.channels.*;
  33 import static java.net.StandardSocketOptions.*;
  34 import java.net.*;
  35 import java.util.Random;
  36 import java.util.concurrent.*;
  37 import java.util.concurrent.atomic.*;
  38 import java.io.Closeable;
  39 import java.io.IOException;
  40 
  41 public class Basic {
  42     static final Random rand = new Random();
  43 
  44     static boolean skipSlowConnectTest = false;
  45 
  46     public static void main(String[] args) throws Exception {
  47         for (String arg: args) {
  48             switch (arg) {
  49             case "-skipSlowConnectTest" :
  50                 skipSlowConnectTest = true;
  51                 break;
  52             default:
  53                 throw new RuntimeException("Unrecognized argument: " + arg);
  54             }
  55         }
  56 
  57         testBind();
  58         testSocketOptions();
  59         testConnect();
  60         testCloseWhenPending();
  61         testCancel();
  62         testRead1();
  63         testRead2();
  64         testRead3();
  65         testWrite1();
  66         testWrite2();
  67         // skip timeout tests until 7052549 is fixed
  68         if (!System.getProperty("os.name").startsWith("Windows"))
  69             testTimeout();
  70         testShutdown();
  71     }
  72 
  73     static class Server implements Closeable {
  74         private final ServerSocketChannel ssc;
  75         private final InetSocketAddress address;
  76 
  77         Server() throws IOException {
  78             ssc = ServerSocketChannel.open().bind(new InetSocketAddress(0));
  79 
  80             InetAddress lh = InetAddress.getLocalHost();
  81             int port = ((InetSocketAddress)(ssc.getLocalAddress())).getPort();
  82             address = new InetSocketAddress(lh, port);
  83         }
  84 
  85         InetSocketAddress address() {
  86             return address;
  87         }
  88 
  89         SocketChannel accept() throws IOException {
  90             return ssc.accept();
  91         }
  92 
  93         public void close() throws IOException {
  94             ssc.close();
  95         }
  96 
  97     }
  98 
  99     static void testBind() throws Exception {
 100         System.out.println("-- bind --");
 101 
 102         try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
 103             if (ch.getLocalAddress() != null)
 104                 throw new RuntimeException("Local address should be 'null'");
 105             ch.bind(new InetSocketAddress(0));
 106 
 107             // check local address after binding
 108             InetSocketAddress local = (InetSocketAddress)ch.getLocalAddress();
 109             if (local.getPort() == 0)
 110                 throw new RuntimeException("Unexpected port");
 111             if (!local.getAddress().isAnyLocalAddress())
 112                 throw new RuntimeException("Not bound to a wildcard address");
 113 
 114             // try to re-bind
 115             try {
 116                 ch.bind(new InetSocketAddress(0));
 117                 throw new RuntimeException("AlreadyBoundException expected");
 118             } catch (AlreadyBoundException x) {
 119             }
 120         }
 121 
 122         // check ClosedChannelException
 123         AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 124         ch.close();
 125         try {
 126             ch.bind(new InetSocketAddress(0));
 127             throw new RuntimeException("ClosedChannelException  expected");
 128         } catch (ClosedChannelException  x) {
 129         }
 130     }
 131 
 132     static void testSocketOptions() throws Exception {
 133         System.out.println("-- socket options --");
 134 
 135         try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
 136             ch.setOption(SO_RCVBUF, 128*1024)
 137               .setOption(SO_SNDBUF, 128*1024)
 138               .setOption(SO_REUSEADDR, true);
 139 
 140             // check SO_SNDBUF/SO_RCVBUF limits
 141             int before, after;
 142             before = ch.getOption(SO_SNDBUF);
 143             after = ch.setOption(SO_SNDBUF, Integer.MAX_VALUE).getOption(SO_SNDBUF);
 144             if (after < before)
 145                 throw new RuntimeException("setOption caused SO_SNDBUF to decrease");
 146             before = ch.getOption(SO_RCVBUF);
 147             after = ch.setOption(SO_RCVBUF, Integer.MAX_VALUE).getOption(SO_RCVBUF);
 148             if (after < before)
 149                 throw new RuntimeException("setOption caused SO_RCVBUF to decrease");
 150 
 151             ch.bind(new InetSocketAddress(0));
 152 
 153             // default values
 154             if (ch.getOption(SO_KEEPALIVE))
 155                 throw new RuntimeException("Default of SO_KEEPALIVE should be 'false'");
 156             if (ch.getOption(TCP_NODELAY))
 157                 throw new RuntimeException("Default of TCP_NODELAY should be 'false'");
 158 
 159             // set and check
 160             if (!ch.setOption(SO_KEEPALIVE, true).getOption(SO_KEEPALIVE))
 161                 throw new RuntimeException("SO_KEEPALIVE did not change");
 162             if (!ch.setOption(TCP_NODELAY, true).getOption(TCP_NODELAY))
 163                 throw new RuntimeException("SO_KEEPALIVE did not change");
 164 
 165             // read others (can't check as actual value is implementation dependent)
 166             ch.getOption(SO_RCVBUF);
 167             ch.getOption(SO_SNDBUF);
 168         }
 169     }
 170 
 171     static void testConnect() throws Exception {
 172         System.out.println("-- connect --");
 173 
 174         SocketAddress address;
 175 
 176         try (Server server = new Server()) {
 177             address = server.address();
 178 
 179             // connect to server and check local/remote addresses
 180             try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
 181                 ch.connect(address).get();
 182                 // check local address
 183                 if (ch.getLocalAddress() == null)
 184                     throw new RuntimeException("Not bound to local address");
 185 
 186                 // check remote address
 187                 InetSocketAddress remote = (InetSocketAddress)ch.getRemoteAddress();
 188                 if (remote.getPort() != server.address().getPort())
 189                     throw new RuntimeException("Connected to unexpected port");
 190                 if (!remote.getAddress().equals(server.address().getAddress()))
 191                     throw new RuntimeException("Connected to unexpected address");
 192 
 193                 // try to connect again
 194                 try {
 195                     ch.connect(server.address()).get();
 196                     throw new RuntimeException("AlreadyConnectedException expected");
 197                 } catch (AlreadyConnectedException x) {
 198                 }
 199 
 200                 // clean-up
 201                 server.accept().close();
 202             }
 203 
 204             // check that connect fails with ClosedChannelException
 205             AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 206             ch.close();
 207             try {
 208                 ch.connect(server.address()).get();
 209                 throw new RuntimeException("ExecutionException expected");
 210             } catch (ExecutionException x) {
 211                 if (!(x.getCause() instanceof ClosedChannelException))
 212                     throw new RuntimeException("Cause of ClosedChannelException expected");
 213             }
 214             final AtomicReference<Throwable> connectException = new AtomicReference<>();
 215             ch.connect(server.address(), (Void)null, new CompletionHandler<Void,Void>() {
 216                 public void completed(Void result, Void att) {
 217                 }
 218                 public void failed(Throwable exc, Void att) {
 219                     connectException.set(exc);
 220                 }
 221             });
 222             while (connectException.get() == null) {
 223                 Thread.sleep(100);
 224             }
 225             if (!(connectException.get() instanceof ClosedChannelException))
 226                 throw new RuntimeException("ClosedChannelException expected");
 227         }
 228 
 229         // test that failure to connect closes the channel
 230         try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
 231             try {
 232                 ch.connect(address).get();
 233             } catch (ExecutionException x) {
 234                 // failed to establish connection
 235                 if (ch.isOpen())
 236                     throw new RuntimeException("Channel should be closed");
 237             }
 238         }
 239 
 240         // repeat test by connecting to a (probably) non-existent host. This
 241         // improves the chance that the connect will not fail immediately.
 242         if (!skipSlowConnectTest) {
 243             try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
 244                 try {
 245                     ch.connect(genSocketAddress()).get();
 246                 } catch (ExecutionException x) {
 247                     // failed to establish connection
 248                     if (ch.isOpen())
 249                         throw new RuntimeException("Channel should be closed");
 250                 }
 251             }
 252         }
 253     }
 254 
 255     static void testCloseWhenPending() throws Exception {
 256         System.out.println("-- asynchronous close when connecting --");
 257 
 258         AsynchronousSocketChannel ch;
 259 
 260         // asynchronous close while connecting
 261         ch = AsynchronousSocketChannel.open();
 262         Future<Void> connectResult = ch.connect(genSocketAddress());
 263 
 264         // give time to initiate the connect (SYN)
 265         Thread.sleep(50);
 266 
 267         // close
 268         ch.close();
 269 
 270         // check that exception is thrown in timely manner
 271         try {
 272             connectResult.get(5, TimeUnit.SECONDS);
 273         } catch (TimeoutException x) {
 274             throw new RuntimeException("AsynchronousCloseException not thrown");
 275         } catch (ExecutionException x) {
 276             // expected
 277         }
 278 
 279         System.out.println("-- asynchronous close when reading --");
 280 
 281         try (Server server = new Server()) {
 282             ch = AsynchronousSocketChannel.open();
 283             ch.connect(server.address()).get();
 284 
 285             ByteBuffer dst = ByteBuffer.allocateDirect(100);
 286             Future<Integer> result = ch.read(dst);
 287 
 288             // attempt a second read - should fail with ReadPendingException
 289             ByteBuffer buf = ByteBuffer.allocateDirect(100);
 290             try {
 291                 ch.read(buf);
 292                 throw new RuntimeException("ReadPendingException expected");
 293             } catch (ReadPendingException x) {
 294             }
 295 
 296             // close channel (should cause initial read to complete)
 297             ch.close();
 298             server.accept().close();
 299 
 300             // check that AsynchronousCloseException is thrown
 301             try {
 302                 result.get();
 303                 throw new RuntimeException("Should not read");
 304             } catch (ExecutionException x) {
 305                 if (!(x.getCause() instanceof AsynchronousCloseException))
 306                     throw new RuntimeException(x);
 307             }
 308 
 309             System.out.println("-- asynchronous close when writing --");
 310 
 311             ch = AsynchronousSocketChannel.open();
 312             ch.connect(server.address()).get();
 313 
 314             final AtomicReference<Throwable> writeException =
 315                 new AtomicReference<Throwable>();
 316 
 317             // write bytes to fill socket buffer
 318             ch.write(genBuffer(), ch, new CompletionHandler<Integer,AsynchronousSocketChannel>() {
 319                 public void completed(Integer result, AsynchronousSocketChannel ch) {
 320                     ch.write(genBuffer(), ch, this);
 321                 }
 322                 public void failed(Throwable x, AsynchronousSocketChannel ch) {
 323                     writeException.set(x);
 324                 }
 325             });
 326 
 327             // give time for socket buffer to fill up.
 328             Thread.sleep(5*1000);
 329 
 330             //  attempt a concurrent write - should fail with WritePendingException
 331             try {
 332                 ch.write(genBuffer());
 333                 throw new RuntimeException("WritePendingException expected");
 334             } catch (WritePendingException x) {
 335             }
 336 
 337             // close channel - should cause initial write to complete
 338             ch.close();
 339             server.accept().close();
 340 
 341             // wait for exception
 342             while (writeException.get() == null) {
 343                 Thread.sleep(100);
 344             }
 345             if (!(writeException.get() instanceof AsynchronousCloseException))
 346                 throw new RuntimeException("AsynchronousCloseException expected");
 347         }
 348     }
 349 
 350     static void testCancel() throws Exception {
 351         System.out.println("-- cancel --");
 352 
 353         try (Server server = new Server()) {
 354             for (int i=0; i<2; i++) {
 355                 boolean mayInterruptIfRunning = (i == 0) ? false : true;
 356 
 357                 // establish loopback connection
 358                 AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 359                 ch.connect(server.address()).get();
 360                 SocketChannel peer = server.accept();
 361 
 362                 // start read operation
 363                 ByteBuffer buf = ByteBuffer.allocate(1);
 364                 Future<Integer> res = ch.read(buf);
 365 
 366                 // cancel operation
 367                 boolean cancelled = res.cancel(mayInterruptIfRunning);
 368 
 369                 // check post-conditions
 370                 if (!res.isDone())
 371                     throw new RuntimeException("isDone should return true");
 372                 if (res.isCancelled() != cancelled)
 373                     throw new RuntimeException("isCancelled not consistent");
 374                 try {
 375                     res.get();
 376                     throw new RuntimeException("CancellationException expected");
 377                 } catch (CancellationException x) {
 378                 }
 379                 try {
 380                     res.get(1, TimeUnit.SECONDS);
 381                     throw new RuntimeException("CancellationException expected");
 382                 } catch (CancellationException x) {
 383                 }
 384 
 385                 // check that the cancel doesn't impact writing to the channel
 386                 if (!mayInterruptIfRunning) {
 387                     buf = ByteBuffer.wrap("a".getBytes());
 388                     ch.write(buf).get();
 389                 }
 390 
 391                 ch.close();
 392                 peer.close();
 393             }
 394         }
 395     }
 396 
 397     static void testRead1() throws Exception {
 398         System.out.println("-- read (1) --");
 399 
 400         try (Server server = new Server()) {
 401             final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 402             ch.connect(server.address()).get();
 403 
 404             // read with 0 bytes remaining should complete immediately
 405             ByteBuffer buf = ByteBuffer.allocate(1);
 406             buf.put((byte)0);
 407             int n = ch.read(buf).get();
 408             if (n != 0)
 409                 throw new RuntimeException("0 expected");
 410 
 411             // write bytes and close connection
 412             ByteBuffer src = genBuffer();
 413             try (SocketChannel sc = server.accept()) {
 414                 sc.setOption(SO_SNDBUF, src.remaining());
 415                 while (src.hasRemaining())
 416                     sc.write(src);
 417             }
 418 
 419             // reads should complete immediately
 420             final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100);
 421             final CountDownLatch latch = new CountDownLatch(1);
 422             ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() {
 423                 public void completed(Integer result, Void att) {
 424                     int n = result;
 425                     if (n > 0) {
 426                         ch.read(dst, (Void)null, this);
 427                     } else {
 428                         latch.countDown();
 429                     }
 430                 }
 431                 public void failed(Throwable exc, Void att) {
 432                 }
 433             });
 434 
 435             latch.await();
 436 
 437             // check buffers
 438             src.flip();
 439             dst.flip();
 440             if (!src.equals(dst)) {
 441                 throw new RuntimeException("Contents differ");
 442             }
 443 
 444             // close channel
 445             ch.close();
 446 
 447             // check read fails with ClosedChannelException
 448             try {
 449                 ch.read(dst).get();
 450                 throw new RuntimeException("ExecutionException expected");
 451             } catch (ExecutionException x) {
 452                 if (!(x.getCause() instanceof ClosedChannelException))
 453                     throw new RuntimeException("Cause of ClosedChannelException expected");
 454             }
 455         }
 456     }
 457 
 458     static void testRead2() throws Exception {
 459         System.out.println("-- read (2) --");
 460 
 461         try (Server server = new Server()) {
 462             final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 463             ch.connect(server.address()).get();
 464             SocketChannel sc = server.accept();
 465 
 466             ByteBuffer src = genBuffer();
 467 
 468             // read until the buffer is full
 469             final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity());
 470             final CountDownLatch latch = new CountDownLatch(1);
 471             ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() {
 472                 public void completed(Integer result, Void att) {
 473                     if (dst.hasRemaining()) {
 474                         ch.read(dst, (Void)null, this);
 475                     } else {
 476                         latch.countDown();
 477                     }
 478                 }
 479                 public void failed(Throwable exc, Void att) {
 480                 }
 481             });
 482 
 483             // trickle the writing
 484             do {
 485                 int rem = src.remaining();
 486                 int size = (rem <= 100) ? rem : 50 + rand.nextInt(rem - 100);
 487                 ByteBuffer buf = ByteBuffer.allocate(size);
 488                 for (int i=0; i<size; i++)
 489                     buf.put(src.get());
 490                 buf.flip();
 491                 Thread.sleep(50 + rand.nextInt(1500));
 492                 while (buf.hasRemaining())
 493                     sc.write(buf);
 494             } while (src.hasRemaining());
 495 
 496             // wait until ascynrhonous reading has completed
 497             latch.await();
 498 
 499             // check buffers
 500             src.flip();
 501             dst.flip();
 502             if (!src.equals(dst)) {
 503                throw new RuntimeException("Contents differ");
 504             }
 505 
 506             sc.close();
 507             ch.close();
 508         }
 509     }
 510 
 511     // exercise scattering read
 512     static void testRead3() throws Exception {
 513         System.out.println("-- read (3) --");
 514 
 515         try (Server server = new Server()) {
 516             final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 517             ch.connect(server.address()).get();
 518             SocketChannel sc = server.accept();
 519 
 520             ByteBuffer[] dsts = new ByteBuffer[3];
 521             for (int i=0; i<dsts.length; i++) {
 522                 dsts[i] = ByteBuffer.allocateDirect(100);
 523             }
 524 
 525             // scattering read that completes ascynhronously
 526             final CountDownLatch l1 = new CountDownLatch(1);
 527             ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null,
 528                 new CompletionHandler<Long,Void>() {
 529                     public void completed(Long result, Void att) {
 530                         long n = result;
 531                         if (n <= 0)
 532                             throw new RuntimeException("No bytes read");
 533                         l1.countDown();
 534                     }
 535                     public void failed(Throwable exc, Void att) {
 536                     }
 537             });
 538 
 539             // write some bytes
 540             sc.write(genBuffer());
 541 
 542             // read should now complete
 543             l1.await();
 544 
 545             // write more bytes
 546             sc.write(genBuffer());
 547 
 548             // read should complete immediately
 549             for (int i=0; i<dsts.length; i++) {
 550                 dsts[i].rewind();
 551             }
 552 
 553             final CountDownLatch l2 = new CountDownLatch(1);
 554             ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null,
 555                 new CompletionHandler<Long,Void>() {
 556                     public void completed(Long result, Void att) {
 557                         long n = result;
 558                         if (n <= 0)
 559                             throw new RuntimeException("No bytes read");
 560                         l2.countDown();
 561                     }
 562                     public void failed(Throwable exc, Void att) {
 563                     }
 564             });
 565             l2.await();
 566 
 567             ch.close();
 568             sc.close();
 569         }
 570     }
 571 
 572     static void testWrite1() throws Exception {
 573         System.out.println("-- write (1) --");
 574 
 575         try (Server server = new Server()) {
 576             final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 577             ch.connect(server.address()).get();
 578             SocketChannel sc = server.accept();
 579 
 580             // write with 0 bytes remaining should complete immediately
 581             ByteBuffer buf = ByteBuffer.allocate(1);
 582             buf.put((byte)0);
 583             int n = ch.write(buf).get();
 584             if (n != 0)
 585                 throw new RuntimeException("0 expected");
 586 
 587             // write all bytes and close connection when done
 588             final ByteBuffer src = genBuffer();
 589             ch.write(src, (Void)null, new CompletionHandler<Integer,Void>() {
 590                 public void completed(Integer result, Void att) {
 591                     if (src.hasRemaining()) {
 592                         ch.write(src, (Void)null, this);
 593                     } else {
 594                         try {
 595                             ch.close();
 596                         } catch (IOException ignore) { }
 597                     }
 598                 }
 599                 public void failed(Throwable exc, Void att) {
 600                 }
 601             });
 602 
 603             // read to EOF or buffer full
 604             ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100);
 605             do {
 606                 n = sc.read(dst);
 607             } while (n > 0);
 608             sc.close();
 609 
 610             // check buffers
 611             src.flip();
 612             dst.flip();
 613             if (!src.equals(dst)) {
 614                 throw new RuntimeException("Contents differ");
 615             }
 616 
 617             // check write fails with ClosedChannelException
 618             try {
 619                 ch.read(dst).get();
 620                 throw new RuntimeException("ExecutionException expected");
 621             } catch (ExecutionException x) {
 622                 if (!(x.getCause() instanceof ClosedChannelException))
 623                     throw new RuntimeException("Cause of ClosedChannelException expected");
 624             }
 625         }
 626     }
 627 
 628     // exercise gathering write
 629     static void testWrite2() throws Exception {
 630         System.out.println("-- write (2) --");
 631 
 632         try (Server server = new Server()) {
 633             final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 634             ch.connect(server.address()).get();
 635             SocketChannel sc = server.accept();
 636 
 637             // number of bytes written
 638             final AtomicLong bytesWritten = new AtomicLong(0);
 639 
 640             // write buffers (should complete immediately)
 641             ByteBuffer[] srcs = genBuffers(1);
 642             final CountDownLatch l1 = new CountDownLatch(1);
 643             ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null,
 644                 new CompletionHandler<Long,Void>() {
 645                     public void completed(Long result, Void att) {
 646                         long n = result;
 647                         if (n <= 0)
 648                             throw new RuntimeException("No bytes read");
 649                         bytesWritten.addAndGet(n);
 650                         l1.countDown();
 651                     }
 652                     public void failed(Throwable exc, Void att) {
 653                     }
 654             });
 655             l1.await();
 656 
 657             // set to true to signal that no more buffers should be written
 658             final AtomicBoolean continueWriting = new AtomicBoolean(true);
 659 
 660             // write until socket buffer is full so as to create the conditions
 661             // for when a write does not complete immediately
 662             srcs = genBuffers(1);
 663             ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null,
 664                 new CompletionHandler<Long,Void>() {
 665                     public void completed(Long result, Void att) {
 666                         long n = result;
 667                         if (n <= 0)
 668                             throw new RuntimeException("No bytes written");
 669                         bytesWritten.addAndGet(n);
 670                         if (continueWriting.get()) {
 671                             ByteBuffer[] srcs = genBuffers(8);
 672                             ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS,
 673                                 (Void)null, this);
 674                         }
 675                     }
 676                     public void failed(Throwable exc, Void att) {
 677                     }
 678             });
 679 
 680             // give time for socket buffer to fill up.
 681             Thread.sleep(5*1000);
 682 
 683             // signal handler to stop further writing
 684             continueWriting.set(false);
 685 
 686             // read until done
 687             ByteBuffer buf = ByteBuffer.allocateDirect(4096);
 688             long total = 0L;
 689             do {
 690                 int n = sc.read(buf);
 691                 if (n <= 0)
 692                     throw new RuntimeException("No bytes read");
 693                 buf.rewind();
 694                 total += n;
 695             } while (total < bytesWritten.get());
 696 
 697             ch.close();
 698             sc.close();
 699         }
 700     }
 701 
 702     static void testShutdown() throws Exception {
 703         System.out.println("-- shutdown--");
 704 
 705         try (Server server = new Server();
 706              AsynchronousSocketChannel ch = AsynchronousSocketChannel.open())
 707         {
 708             ch.connect(server.address()).get();
 709             try (SocketChannel peer = server.accept()) {
 710                 ByteBuffer buf = ByteBuffer.allocateDirect(1000);
 711                 int n;
 712 
 713                 // check read
 714                 ch.shutdownInput();
 715                 n = ch.read(buf).get();
 716                 if (n != -1)
 717                     throw new RuntimeException("-1 expected");
 718                 // check full with full buffer
 719                 buf.put(new byte[100]);
 720                 n = ch.read(buf).get();
 721                 if (n != -1)
 722                     throw new RuntimeException("-1 expected");
 723 
 724                 // check write
 725                 ch.shutdownOutput();
 726                 try {
 727                     ch.write(buf).get();
 728                     throw new RuntimeException("ClosedChannelException expected");
 729                 } catch (ExecutionException x) {
 730                     if (!(x.getCause() instanceof ClosedChannelException))
 731                         throw new RuntimeException("ClosedChannelException expected");
 732                 }
 733             }
 734         }
 735     }
 736 
 737     static void testTimeout() throws Exception {
 738         System.out.println("-- timeouts --");
 739         testTimeout(Integer.MIN_VALUE, TimeUnit.SECONDS);
 740         testTimeout(-1L, TimeUnit.SECONDS);
 741         testTimeout(0L, TimeUnit.SECONDS);
 742         testTimeout(2L, TimeUnit.SECONDS);
 743     }
 744 
 745     static void testTimeout(final long timeout, final TimeUnit unit) throws Exception {
 746         try (Server server = new Server()) {
 747             AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 748             ch.connect(server.address()).get();
 749 
 750             ByteBuffer dst = ByteBuffer.allocate(512);
 751 
 752             final AtomicReference<Throwable> readException = new AtomicReference<Throwable>();
 753 
 754             // this read should timeout if value is > 0
 755             ch.read(dst, timeout, unit, null, new CompletionHandler<Integer,Void>() {
 756                 public void completed(Integer result, Void att) {
 757                     readException.set(new RuntimeException("Should not complete"));
 758                 }
 759                 public void failed(Throwable exc, Void att) {
 760                     readException.set(exc);
 761                 }
 762             });
 763             if (timeout > 0L) {
 764                 // wait for exception
 765                 while (readException.get() == null) {
 766                     Thread.sleep(100);
 767                 }
 768                 if (!(readException.get() instanceof InterruptedByTimeoutException))
 769                     throw new RuntimeException("InterruptedByTimeoutException expected");
 770 
 771                 // after a timeout then further reading should throw unspecified runtime exception
 772                 boolean exceptionThrown = false;
 773                 try {
 774                     ch.read(dst);
 775                 } catch (RuntimeException x) {
 776                     exceptionThrown = true;
 777                 }
 778                 if (!exceptionThrown)
 779                     throw new RuntimeException("RuntimeException expected after timeout.");
 780             } else {
 781                 Thread.sleep(1000);
 782                 Throwable exc = readException.get();
 783                 if (exc != null)
 784                     throw new RuntimeException(exc);
 785             }
 786 
 787             final AtomicReference<Throwable> writeException = new AtomicReference<Throwable>();
 788 
 789             // write bytes to fill socket buffer
 790             ch.write(genBuffer(), timeout, unit, ch,
 791                 new CompletionHandler<Integer,AsynchronousSocketChannel>()
 792             {
 793                 public void completed(Integer result, AsynchronousSocketChannel ch) {
 794                     ch.write(genBuffer(), timeout, unit, ch, this);
 795                 }
 796                 public void failed(Throwable exc, AsynchronousSocketChannel ch) {
 797                     writeException.set(exc);
 798                 }
 799             });
 800             if (timeout > 0) {
 801                 // wait for exception
 802                 while (writeException.get() == null) {
 803                     Thread.sleep(100);
 804                 }
 805                 if (!(writeException.get() instanceof InterruptedByTimeoutException))
 806                     throw new RuntimeException("InterruptedByTimeoutException expected");
 807 
 808                 // after a timeout then further writing should throw unspecified runtime exception
 809                 boolean exceptionThrown = false;
 810                 try {
 811                     ch.write(genBuffer());
 812                 } catch (RuntimeException x) {
 813                     exceptionThrown = true;
 814                 }
 815                 if (!exceptionThrown)
 816                     throw new RuntimeException("RuntimeException expected after timeout.");
 817             } else {
 818                 Thread.sleep(1000);
 819                 Throwable exc = writeException.get();
 820                 if (exc != null)
 821                     throw new RuntimeException(exc);
 822             }
 823 
 824             // clean-up
 825             server.accept().close();
 826             ch.close();
 827         }
 828     }
 829 
 830     // returns ByteBuffer with random bytes
 831     static ByteBuffer genBuffer() {
 832         int size = 1024 + rand.nextInt(16000);
 833         byte[] buf = new byte[size];
 834         rand.nextBytes(buf);
 835         boolean useDirect = rand.nextBoolean();
 836         if (useDirect) {
 837             ByteBuffer bb = ByteBuffer.allocateDirect(buf.length);
 838             bb.put(buf);
 839             bb.flip();
 840             return bb;
 841         } else {
 842             return ByteBuffer.wrap(buf);
 843         }
 844     }
 845 
 846     // return ByteBuffer[] with random bytes
 847     static ByteBuffer[] genBuffers(int max) {
 848         int len = 1;
 849         if (max > 1)
 850             len += rand.nextInt(max);
 851         ByteBuffer[] bufs = new ByteBuffer[len];
 852         for (int i=0; i<len; i++)
 853             bufs[i] = genBuffer();
 854         return bufs;
 855     }
 856 
 857     // return random SocketAddress
 858     static SocketAddress genSocketAddress() {
 859         StringBuilder sb = new StringBuilder("10.");
 860         sb.append(rand.nextInt(256));
 861         sb.append('.');
 862         sb.append(rand.nextInt(256));
 863         sb.append('.');
 864         sb.append(rand.nextInt(256));
 865         InetAddress rh;
 866         try {
 867             rh = InetAddress.getByName(sb.toString());
 868         } catch (UnknownHostException x) {
 869             throw new InternalError("Should not happen");
 870         }
 871         return new InetSocketAddress(rh, rand.nextInt(65535)+1);
 872     }
 873 }