1 /*
   2  * Copyright (c) 2008, 2016, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /* @test
  25  * @bug 4607272 6842687 6878369 6944810 7023403
  26  * @summary Unit test for AsynchronousSocketChannel
  27  * @run main Basic -skipSlowConnectTest
  28  * @key randomness
  29  */
  30 
  31 import java.nio.ByteBuffer;
  32 import java.nio.channels.*;
  33 import static java.net.StandardSocketOptions.*;
  34 import java.net.*;
  35 import java.util.Random;
  36 import java.util.concurrent.*;
  37 import java.util.concurrent.atomic.*;
  38 import java.io.Closeable;
  39 import java.io.IOException;
  40 import java.util.Set;
  41 
  42 public class Basic {
  43     static final Random rand = new Random();
  44 
  45     static boolean skipSlowConnectTest = false;
  46 
  47     public static void main(String[] args) throws Exception {
  48         for (String arg: args) {
  49             switch (arg) {
  50             case "-skipSlowConnectTest" :
  51                 skipSlowConnectTest = true;
  52                 break;
  53             default:
  54                 throw new RuntimeException("Unrecognized argument: " + arg);
  55             }
  56         }
  57 
  58         testBind();
  59         testSocketOptions();
  60         testConnect();
  61         testCloseWhenPending();
  62         testCancel();
  63         testRead1();
  64         testRead2();
  65         testRead3();
  66         testWrite1();
  67         testWrite2();
  68         // skip timeout tests until 7052549 is fixed
  69         if (!System.getProperty("os.name").startsWith("Windows"))
  70             testTimeout();
  71         testShutdown();
  72     }
  73 
  74     static class Server implements Closeable {
  75         private final ServerSocketChannel ssc;
  76         private final InetSocketAddress address;
  77 
  78         Server() throws IOException {
  79             ssc = ServerSocketChannel.open().bind(new InetSocketAddress(0));
  80 
  81             InetAddress lh = InetAddress.getLocalHost();
  82             int port = ((InetSocketAddress)(ssc.getLocalAddress())).getPort();
  83             address = new InetSocketAddress(lh, port);
  84         }
  85 
  86         InetSocketAddress address() {
  87             return address;
  88         }
  89 
  90         SocketChannel accept() throws IOException {
  91             return ssc.accept();
  92         }
  93 
  94         public void close() throws IOException {
  95             ssc.close();
  96         }
  97 
  98     }
  99 
 100     static void testBind() throws Exception {
 101         System.out.println("-- bind --");
 102 
 103         try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
 104             if (ch.getLocalAddress() != null)
 105                 throw new RuntimeException("Local address should be 'null'");
 106             ch.bind(new InetSocketAddress(0));
 107 
 108             // check local address after binding
 109             InetSocketAddress local = (InetSocketAddress)ch.getLocalAddress();
 110             if (local.getPort() == 0)
 111                 throw new RuntimeException("Unexpected port");
 112             if (!local.getAddress().isAnyLocalAddress())
 113                 throw new RuntimeException("Not bound to a wildcard address");
 114 
 115             // try to re-bind
 116             try {
 117                 ch.bind(new InetSocketAddress(0));
 118                 throw new RuntimeException("AlreadyBoundException expected");
 119             } catch (AlreadyBoundException x) {
 120             }
 121         }
 122 
 123         // check ClosedChannelException
 124         AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 125         ch.close();
 126         try {
 127             ch.bind(new InetSocketAddress(0));
 128             throw new RuntimeException("ClosedChannelException  expected");
 129         } catch (ClosedChannelException  x) {
 130         }
 131     }
 132 
 133     static void testSocketOptions() throws Exception {
 134         System.out.println("-- socket options --");
 135 
 136         try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
 137             ch.setOption(SO_RCVBUF, 128*1024)
 138               .setOption(SO_SNDBUF, 128*1024)
 139               .setOption(SO_REUSEADDR, true);
 140 
 141             // check SO_SNDBUF/SO_RCVBUF limits
 142             int before, after;
 143             before = ch.getOption(SO_SNDBUF);
 144             after = ch.setOption(SO_SNDBUF, Integer.MAX_VALUE).getOption(SO_SNDBUF);
 145             if (after < before)
 146                 throw new RuntimeException("setOption caused SO_SNDBUF to decrease");
 147             before = ch.getOption(SO_RCVBUF);
 148             after = ch.setOption(SO_RCVBUF, Integer.MAX_VALUE).getOption(SO_RCVBUF);
 149             if (after < before)
 150                 throw new RuntimeException("setOption caused SO_RCVBUF to decrease");
 151 
 152             ch.bind(new InetSocketAddress(0));
 153 
 154             // default values
 155             if (ch.getOption(SO_KEEPALIVE))
 156                 throw new RuntimeException("Default of SO_KEEPALIVE should be 'false'");
 157             if (ch.getOption(TCP_NODELAY))
 158                 throw new RuntimeException("Default of TCP_NODELAY should be 'false'");
 159 
 160             // set and check
 161             if (!ch.setOption(SO_KEEPALIVE, true).getOption(SO_KEEPALIVE))
 162                 throw new RuntimeException("SO_KEEPALIVE did not change");
 163             if (!ch.setOption(TCP_NODELAY, true).getOption(TCP_NODELAY))
 164                 throw new RuntimeException("SO_KEEPALIVE did not change");
 165 
 166             // read others (can't check as actual value is implementation dependent)
 167             ch.getOption(SO_RCVBUF);
 168             ch.getOption(SO_SNDBUF);
 169             
 170             Set<SocketOption<?>> options = ch.supportedOptions();
 171             boolean quickAck = options.contains(jdk.net.ExtendedSocketOptions.SO_QUICKACK);
 172             if (quickAck) {
 173                 if (ch.getOption(jdk.net.ExtendedSocketOptions.SO_QUICKACK)) {
 174                     throw new RuntimeException("Default of SO_QUICKACK should be 'false'");
 175                 }
 176 
 177                 if (!ch.setOption(jdk.net.ExtendedSocketOptions.SO_QUICKACK, true).
 178                         getOption(jdk.net.ExtendedSocketOptions.SO_QUICKACK)) {
 179                     throw new RuntimeException("SO_QUICKACK did not change");
 180                 }
 181             }
 182         }
 183     }
 184 
 185     static void testConnect() throws Exception {
 186         System.out.println("-- connect --");
 187 
 188         SocketAddress address;
 189 
 190         try (Server server = new Server()) {
 191             address = server.address();
 192 
 193             // connect to server and check local/remote addresses
 194             try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
 195                 ch.connect(address).get();
 196                 // check local address
 197                 if (ch.getLocalAddress() == null)
 198                     throw new RuntimeException("Not bound to local address");
 199 
 200                 // check remote address
 201                 InetSocketAddress remote = (InetSocketAddress)ch.getRemoteAddress();
 202                 if (remote.getPort() != server.address().getPort())
 203                     throw new RuntimeException("Connected to unexpected port");
 204                 if (!remote.getAddress().equals(server.address().getAddress()))
 205                     throw new RuntimeException("Connected to unexpected address");
 206 
 207                 // try to connect again
 208                 try {
 209                     ch.connect(server.address()).get();
 210                     throw new RuntimeException("AlreadyConnectedException expected");
 211                 } catch (AlreadyConnectedException x) {
 212                 }
 213 
 214                 // clean-up
 215                 server.accept().close();
 216             }
 217 
 218             // check that connect fails with ClosedChannelException
 219             AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 220             ch.close();
 221             try {
 222                 ch.connect(server.address()).get();
 223                 throw new RuntimeException("ExecutionException expected");
 224             } catch (ExecutionException x) {
 225                 if (!(x.getCause() instanceof ClosedChannelException))
 226                     throw new RuntimeException("Cause of ClosedChannelException expected");
 227             }
 228             final AtomicReference<Throwable> connectException = new AtomicReference<>();
 229             ch.connect(server.address(), (Void)null, new CompletionHandler<Void,Void>() {
 230                 public void completed(Void result, Void att) {
 231                 }
 232                 public void failed(Throwable exc, Void att) {
 233                     connectException.set(exc);
 234                 }
 235             });
 236             while (connectException.get() == null) {
 237                 Thread.sleep(100);
 238             }
 239             if (!(connectException.get() instanceof ClosedChannelException))
 240                 throw new RuntimeException("ClosedChannelException expected");
 241         }
 242 
 243         // test that failure to connect closes the channel
 244         try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
 245             try {
 246                 ch.connect(address).get();
 247             } catch (ExecutionException x) {
 248                 // failed to establish connection
 249                 if (ch.isOpen())
 250                     throw new RuntimeException("Channel should be closed");
 251             }
 252         }
 253 
 254         // repeat test by connecting to a (probably) non-existent host. This
 255         // improves the chance that the connect will not fail immediately.
 256         if (!skipSlowConnectTest) {
 257             try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
 258                 try {
 259                     ch.connect(genSocketAddress()).get();
 260                 } catch (ExecutionException x) {
 261                     // failed to establish connection
 262                     if (ch.isOpen())
 263                         throw new RuntimeException("Channel should be closed");
 264                 }
 265             }
 266         }
 267     }
 268 
 269     static void testCloseWhenPending() throws Exception {
 270         System.out.println("-- asynchronous close when connecting --");
 271 
 272         AsynchronousSocketChannel ch;
 273 
 274         // asynchronous close while connecting
 275         ch = AsynchronousSocketChannel.open();
 276         Future<Void> connectResult = ch.connect(genSocketAddress());
 277 
 278         // give time to initiate the connect (SYN)
 279         Thread.sleep(50);
 280 
 281         // close
 282         ch.close();
 283 
 284         // check that exception is thrown in timely manner
 285         try {
 286             connectResult.get(5, TimeUnit.SECONDS);
 287         } catch (TimeoutException x) {
 288             throw new RuntimeException("AsynchronousCloseException not thrown");
 289         } catch (ExecutionException x) {
 290             // expected
 291         }
 292 
 293         System.out.println("-- asynchronous close when reading --");
 294 
 295         try (Server server = new Server()) {
 296             ch = AsynchronousSocketChannel.open();
 297             ch.connect(server.address()).get();
 298 
 299             ByteBuffer dst = ByteBuffer.allocateDirect(100);
 300             Future<Integer> result = ch.read(dst);
 301 
 302             // attempt a second read - should fail with ReadPendingException
 303             ByteBuffer buf = ByteBuffer.allocateDirect(100);
 304             try {
 305                 ch.read(buf);
 306                 throw new RuntimeException("ReadPendingException expected");
 307             } catch (ReadPendingException x) {
 308             }
 309 
 310             // close channel (should cause initial read to complete)
 311             ch.close();
 312             server.accept().close();
 313 
 314             // check that AsynchronousCloseException is thrown
 315             try {
 316                 result.get();
 317                 throw new RuntimeException("Should not read");
 318             } catch (ExecutionException x) {
 319                 if (!(x.getCause() instanceof AsynchronousCloseException))
 320                     throw new RuntimeException(x);
 321             }
 322 
 323             System.out.println("-- asynchronous close when writing --");
 324 
 325             ch = AsynchronousSocketChannel.open();
 326             ch.connect(server.address()).get();
 327 
 328             final AtomicReference<Throwable> writeException =
 329                 new AtomicReference<Throwable>();
 330 
 331             // write bytes to fill socket buffer
 332             ch.write(genBuffer(), ch, new CompletionHandler<Integer,AsynchronousSocketChannel>() {
 333                 public void completed(Integer result, AsynchronousSocketChannel ch) {
 334                     ch.write(genBuffer(), ch, this);
 335                 }
 336                 public void failed(Throwable x, AsynchronousSocketChannel ch) {
 337                     writeException.set(x);
 338                 }
 339             });
 340 
 341             // give time for socket buffer to fill up.
 342             Thread.sleep(5*1000);
 343 
 344             //  attempt a concurrent write - should fail with WritePendingException
 345             try {
 346                 ch.write(genBuffer());
 347                 throw new RuntimeException("WritePendingException expected");
 348             } catch (WritePendingException x) {
 349             }
 350 
 351             // close channel - should cause initial write to complete
 352             ch.close();
 353             server.accept().close();
 354 
 355             // wait for exception
 356             while (writeException.get() == null) {
 357                 Thread.sleep(100);
 358             }
 359             if (!(writeException.get() instanceof AsynchronousCloseException))
 360                 throw new RuntimeException("AsynchronousCloseException expected");
 361         }
 362     }
 363 
 364     static void testCancel() throws Exception {
 365         System.out.println("-- cancel --");
 366 
 367         try (Server server = new Server()) {
 368             for (int i=0; i<2; i++) {
 369                 boolean mayInterruptIfRunning = (i == 0) ? false : true;
 370 
 371                 // establish loopback connection
 372                 AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 373                 ch.connect(server.address()).get();
 374                 SocketChannel peer = server.accept();
 375 
 376                 // start read operation
 377                 ByteBuffer buf = ByteBuffer.allocate(1);
 378                 Future<Integer> res = ch.read(buf);
 379 
 380                 // cancel operation
 381                 boolean cancelled = res.cancel(mayInterruptIfRunning);
 382 
 383                 // check post-conditions
 384                 if (!res.isDone())
 385                     throw new RuntimeException("isDone should return true");
 386                 if (res.isCancelled() != cancelled)
 387                     throw new RuntimeException("isCancelled not consistent");
 388                 try {
 389                     res.get();
 390                     throw new RuntimeException("CancellationException expected");
 391                 } catch (CancellationException x) {
 392                 }
 393                 try {
 394                     res.get(1, TimeUnit.SECONDS);
 395                     throw new RuntimeException("CancellationException expected");
 396                 } catch (CancellationException x) {
 397                 }
 398 
 399                 // check that the cancel doesn't impact writing to the channel
 400                 if (!mayInterruptIfRunning) {
 401                     buf = ByteBuffer.wrap("a".getBytes());
 402                     ch.write(buf).get();
 403                 }
 404 
 405                 ch.close();
 406                 peer.close();
 407             }
 408         }
 409     }
 410 
 411     static void testRead1() throws Exception {
 412         System.out.println("-- read (1) --");
 413 
 414         try (Server server = new Server()) {
 415             final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 416             ch.connect(server.address()).get();
 417 
 418             // read with 0 bytes remaining should complete immediately
 419             ByteBuffer buf = ByteBuffer.allocate(1);
 420             buf.put((byte)0);
 421             int n = ch.read(buf).get();
 422             if (n != 0)
 423                 throw new RuntimeException("0 expected");
 424 
 425             // write bytes and close connection
 426             ByteBuffer src = genBuffer();
 427             try (SocketChannel sc = server.accept()) {
 428                 sc.setOption(SO_SNDBUF, src.remaining());
 429                 while (src.hasRemaining())
 430                     sc.write(src);
 431             }
 432 
 433             // reads should complete immediately
 434             final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100);
 435             final CountDownLatch latch = new CountDownLatch(1);
 436             ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() {
 437                 public void completed(Integer result, Void att) {
 438                     int n = result;
 439                     if (n > 0) {
 440                         ch.read(dst, (Void)null, this);
 441                     } else {
 442                         latch.countDown();
 443                     }
 444                 }
 445                 public void failed(Throwable exc, Void att) {
 446                 }
 447             });
 448 
 449             latch.await();
 450 
 451             // check buffers
 452             src.flip();
 453             dst.flip();
 454             if (!src.equals(dst)) {
 455                 throw new RuntimeException("Contents differ");
 456             }
 457 
 458             // close channel
 459             ch.close();
 460 
 461             // check read fails with ClosedChannelException
 462             try {
 463                 ch.read(dst).get();
 464                 throw new RuntimeException("ExecutionException expected");
 465             } catch (ExecutionException x) {
 466                 if (!(x.getCause() instanceof ClosedChannelException))
 467                     throw new RuntimeException("Cause of ClosedChannelException expected");
 468             }
 469         }
 470     }
 471 
 472     static void testRead2() throws Exception {
 473         System.out.println("-- read (2) --");
 474 
 475         try (Server server = new Server()) {
 476             final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 477             ch.connect(server.address()).get();
 478             SocketChannel sc = server.accept();
 479 
 480             ByteBuffer src = genBuffer();
 481 
 482             // read until the buffer is full
 483             final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity());
 484             final CountDownLatch latch = new CountDownLatch(1);
 485             ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() {
 486                 public void completed(Integer result, Void att) {
 487                     if (dst.hasRemaining()) {
 488                         ch.read(dst, (Void)null, this);
 489                     } else {
 490                         latch.countDown();
 491                     }
 492                 }
 493                 public void failed(Throwable exc, Void att) {
 494                 }
 495             });
 496 
 497             // trickle the writing
 498             do {
 499                 int rem = src.remaining();
 500                 int size = (rem <= 100) ? rem : 50 + rand.nextInt(rem - 100);
 501                 ByteBuffer buf = ByteBuffer.allocate(size);
 502                 for (int i=0; i<size; i++)
 503                     buf.put(src.get());
 504                 buf.flip();
 505                 Thread.sleep(50 + rand.nextInt(1500));
 506                 while (buf.hasRemaining())
 507                     sc.write(buf);
 508             } while (src.hasRemaining());
 509 
 510             // wait until ascynrhonous reading has completed
 511             latch.await();
 512 
 513             // check buffers
 514             src.flip();
 515             dst.flip();
 516             if (!src.equals(dst)) {
 517                throw new RuntimeException("Contents differ");
 518             }
 519 
 520             sc.close();
 521             ch.close();
 522         }
 523     }
 524 
 525     // exercise scattering read
 526     static void testRead3() throws Exception {
 527         System.out.println("-- read (3) --");
 528 
 529         try (Server server = new Server()) {
 530             final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 531             ch.connect(server.address()).get();
 532             SocketChannel sc = server.accept();
 533 
 534             ByteBuffer[] dsts = new ByteBuffer[3];
 535             for (int i=0; i<dsts.length; i++) {
 536                 dsts[i] = ByteBuffer.allocateDirect(100);
 537             }
 538 
 539             // scattering read that completes ascynhronously
 540             final CountDownLatch l1 = new CountDownLatch(1);
 541             ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null,
 542                 new CompletionHandler<Long,Void>() {
 543                     public void completed(Long result, Void att) {
 544                         long n = result;
 545                         if (n <= 0)
 546                             throw new RuntimeException("No bytes read");
 547                         l1.countDown();
 548                     }
 549                     public void failed(Throwable exc, Void att) {
 550                     }
 551             });
 552 
 553             // write some bytes
 554             sc.write(genBuffer());
 555 
 556             // read should now complete
 557             l1.await();
 558 
 559             // write more bytes
 560             sc.write(genBuffer());
 561 
 562             // read should complete immediately
 563             for (int i=0; i<dsts.length; i++) {
 564                 dsts[i].rewind();
 565             }
 566 
 567             final CountDownLatch l2 = new CountDownLatch(1);
 568             ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null,
 569                 new CompletionHandler<Long,Void>() {
 570                     public void completed(Long result, Void att) {
 571                         long n = result;
 572                         if (n <= 0)
 573                             throw new RuntimeException("No bytes read");
 574                         l2.countDown();
 575                     }
 576                     public void failed(Throwable exc, Void att) {
 577                     }
 578             });
 579             l2.await();
 580 
 581             ch.close();
 582             sc.close();
 583         }
 584     }
 585 
 586     static void testWrite1() throws Exception {
 587         System.out.println("-- write (1) --");
 588 
 589         try (Server server = new Server()) {
 590             final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 591             ch.connect(server.address()).get();
 592             SocketChannel sc = server.accept();
 593 
 594             // write with 0 bytes remaining should complete immediately
 595             ByteBuffer buf = ByteBuffer.allocate(1);
 596             buf.put((byte)0);
 597             int n = ch.write(buf).get();
 598             if (n != 0)
 599                 throw new RuntimeException("0 expected");
 600 
 601             // write all bytes and close connection when done
 602             final ByteBuffer src = genBuffer();
 603             ch.write(src, (Void)null, new CompletionHandler<Integer,Void>() {
 604                 public void completed(Integer result, Void att) {
 605                     if (src.hasRemaining()) {
 606                         ch.write(src, (Void)null, this);
 607                     } else {
 608                         try {
 609                             ch.close();
 610                         } catch (IOException ignore) { }
 611                     }
 612                 }
 613                 public void failed(Throwable exc, Void att) {
 614                 }
 615             });
 616 
 617             // read to EOF or buffer full
 618             ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100);
 619             do {
 620                 n = sc.read(dst);
 621             } while (n > 0);
 622             sc.close();
 623 
 624             // check buffers
 625             src.flip();
 626             dst.flip();
 627             if (!src.equals(dst)) {
 628                 throw new RuntimeException("Contents differ");
 629             }
 630 
 631             // check write fails with ClosedChannelException
 632             try {
 633                 ch.read(dst).get();
 634                 throw new RuntimeException("ExecutionException expected");
 635             } catch (ExecutionException x) {
 636                 if (!(x.getCause() instanceof ClosedChannelException))
 637                     throw new RuntimeException("Cause of ClosedChannelException expected");
 638             }
 639         }
 640     }
 641 
 642     // exercise gathering write
 643     static void testWrite2() throws Exception {
 644         System.out.println("-- write (2) --");
 645 
 646         try (Server server = new Server()) {
 647             final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 648             ch.connect(server.address()).get();
 649             SocketChannel sc = server.accept();
 650 
 651             // number of bytes written
 652             final AtomicLong bytesWritten = new AtomicLong(0);
 653 
 654             // write buffers (should complete immediately)
 655             ByteBuffer[] srcs = genBuffers(1);
 656             final CountDownLatch l1 = new CountDownLatch(1);
 657             ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null,
 658                 new CompletionHandler<Long,Void>() {
 659                     public void completed(Long result, Void att) {
 660                         long n = result;
 661                         if (n <= 0)
 662                             throw new RuntimeException("No bytes read");
 663                         bytesWritten.addAndGet(n);
 664                         l1.countDown();
 665                     }
 666                     public void failed(Throwable exc, Void att) {
 667                     }
 668             });
 669             l1.await();
 670 
 671             // set to true to signal that no more buffers should be written
 672             final AtomicBoolean continueWriting = new AtomicBoolean(true);
 673 
 674             // write until socket buffer is full so as to create the conditions
 675             // for when a write does not complete immediately
 676             srcs = genBuffers(1);
 677             ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null,
 678                 new CompletionHandler<Long,Void>() {
 679                     public void completed(Long result, Void att) {
 680                         long n = result;
 681                         if (n <= 0)
 682                             throw new RuntimeException("No bytes written");
 683                         bytesWritten.addAndGet(n);
 684                         if (continueWriting.get()) {
 685                             ByteBuffer[] srcs = genBuffers(8);
 686                             ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS,
 687                                 (Void)null, this);
 688                         }
 689                     }
 690                     public void failed(Throwable exc, Void att) {
 691                     }
 692             });
 693 
 694             // give time for socket buffer to fill up.
 695             Thread.sleep(5*1000);
 696 
 697             // signal handler to stop further writing
 698             continueWriting.set(false);
 699 
 700             // read until done
 701             ByteBuffer buf = ByteBuffer.allocateDirect(4096);
 702             long total = 0L;
 703             do {
 704                 int n = sc.read(buf);
 705                 if (n <= 0)
 706                     throw new RuntimeException("No bytes read");
 707                 buf.rewind();
 708                 total += n;
 709             } while (total < bytesWritten.get());
 710 
 711             ch.close();
 712             sc.close();
 713         }
 714     }
 715 
 716     static void testShutdown() throws Exception {
 717         System.out.println("-- shutdown--");
 718 
 719         try (Server server = new Server();
 720              AsynchronousSocketChannel ch = AsynchronousSocketChannel.open())
 721         {
 722             ch.connect(server.address()).get();
 723             try (SocketChannel peer = server.accept()) {
 724                 ByteBuffer buf = ByteBuffer.allocateDirect(1000);
 725                 int n;
 726 
 727                 // check read
 728                 ch.shutdownInput();
 729                 n = ch.read(buf).get();
 730                 if (n != -1)
 731                     throw new RuntimeException("-1 expected");
 732                 // check full with full buffer
 733                 buf.put(new byte[100]);
 734                 n = ch.read(buf).get();
 735                 if (n != -1)
 736                     throw new RuntimeException("-1 expected");
 737 
 738                 // check write
 739                 ch.shutdownOutput();
 740                 try {
 741                     ch.write(buf).get();
 742                     throw new RuntimeException("ClosedChannelException expected");
 743                 } catch (ExecutionException x) {
 744                     if (!(x.getCause() instanceof ClosedChannelException))
 745                         throw new RuntimeException("ClosedChannelException expected");
 746                 }
 747             }
 748         }
 749     }
 750 
 751     static void testTimeout() throws Exception {
 752         System.out.println("-- timeouts --");
 753         testTimeout(Integer.MIN_VALUE, TimeUnit.SECONDS);
 754         testTimeout(-1L, TimeUnit.SECONDS);
 755         testTimeout(0L, TimeUnit.SECONDS);
 756         testTimeout(2L, TimeUnit.SECONDS);
 757     }
 758 
 759     static void testTimeout(final long timeout, final TimeUnit unit) throws Exception {
 760         try (Server server = new Server()) {
 761             AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 762             ch.connect(server.address()).get();
 763 
 764             ByteBuffer dst = ByteBuffer.allocate(512);
 765 
 766             final AtomicReference<Throwable> readException = new AtomicReference<Throwable>();
 767 
 768             // this read should timeout if value is > 0
 769             ch.read(dst, timeout, unit, null, new CompletionHandler<Integer,Void>() {
 770                 public void completed(Integer result, Void att) {
 771                     readException.set(new RuntimeException("Should not complete"));
 772                 }
 773                 public void failed(Throwable exc, Void att) {
 774                     readException.set(exc);
 775                 }
 776             });
 777             if (timeout > 0L) {
 778                 // wait for exception
 779                 while (readException.get() == null) {
 780                     Thread.sleep(100);
 781                 }
 782                 if (!(readException.get() instanceof InterruptedByTimeoutException))
 783                     throw new RuntimeException("InterruptedByTimeoutException expected");
 784 
 785                 // after a timeout then further reading should throw unspecified runtime exception
 786                 boolean exceptionThrown = false;
 787                 try {
 788                     ch.read(dst);
 789                 } catch (RuntimeException x) {
 790                     exceptionThrown = true;
 791                 }
 792                 if (!exceptionThrown)
 793                     throw new RuntimeException("RuntimeException expected after timeout.");
 794             } else {
 795                 Thread.sleep(1000);
 796                 Throwable exc = readException.get();
 797                 if (exc != null)
 798                     throw new RuntimeException(exc);
 799             }
 800 
 801             final AtomicReference<Throwable> writeException = new AtomicReference<Throwable>();
 802 
 803             // write bytes to fill socket buffer
 804             ch.write(genBuffer(), timeout, unit, ch,
 805                 new CompletionHandler<Integer,AsynchronousSocketChannel>()
 806             {
 807                 public void completed(Integer result, AsynchronousSocketChannel ch) {
 808                     ch.write(genBuffer(), timeout, unit, ch, this);
 809                 }
 810                 public void failed(Throwable exc, AsynchronousSocketChannel ch) {
 811                     writeException.set(exc);
 812                 }
 813             });
 814             if (timeout > 0) {
 815                 // wait for exception
 816                 while (writeException.get() == null) {
 817                     Thread.sleep(100);
 818                 }
 819                 if (!(writeException.get() instanceof InterruptedByTimeoutException))
 820                     throw new RuntimeException("InterruptedByTimeoutException expected");
 821 
 822                 // after a timeout then further writing should throw unspecified runtime exception
 823                 boolean exceptionThrown = false;
 824                 try {
 825                     ch.write(genBuffer());
 826                 } catch (RuntimeException x) {
 827                     exceptionThrown = true;
 828                 }
 829                 if (!exceptionThrown)
 830                     throw new RuntimeException("RuntimeException expected after timeout.");
 831             } else {
 832                 Thread.sleep(1000);
 833                 Throwable exc = writeException.get();
 834                 if (exc != null)
 835                     throw new RuntimeException(exc);
 836             }
 837 
 838             // clean-up
 839             server.accept().close();
 840             ch.close();
 841         }
 842     }
 843 
 844     // returns ByteBuffer with random bytes
 845     static ByteBuffer genBuffer() {
 846         int size = 1024 + rand.nextInt(16000);
 847         byte[] buf = new byte[size];
 848         rand.nextBytes(buf);
 849         boolean useDirect = rand.nextBoolean();
 850         if (useDirect) {
 851             ByteBuffer bb = ByteBuffer.allocateDirect(buf.length);
 852             bb.put(buf);
 853             bb.flip();
 854             return bb;
 855         } else {
 856             return ByteBuffer.wrap(buf);
 857         }
 858     }
 859 
 860     // return ByteBuffer[] with random bytes
 861     static ByteBuffer[] genBuffers(int max) {
 862         int len = 1;
 863         if (max > 1)
 864             len += rand.nextInt(max);
 865         ByteBuffer[] bufs = new ByteBuffer[len];
 866         for (int i=0; i<len; i++)
 867             bufs[i] = genBuffer();
 868         return bufs;
 869     }
 870 
 871     // return random SocketAddress
 872     static SocketAddress genSocketAddress() {
 873         StringBuilder sb = new StringBuilder("10.");
 874         sb.append(rand.nextInt(256));
 875         sb.append('.');
 876         sb.append(rand.nextInt(256));
 877         sb.append('.');
 878         sb.append(rand.nextInt(256));
 879         InetAddress rh;
 880         try {
 881             rh = InetAddress.getByName(sb.toString());
 882         } catch (UnknownHostException x) {
 883             throw new InternalError("Should not happen");
 884         }
 885         return new InetSocketAddress(rh, rand.nextInt(65535)+1);
 886     }
 887 }