1 /*
   2  * Copyright (c) 2008, 2016, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /* @test
  25  * @bug 4607272 6842687 6878369 6944810 7023403
  26  * @summary Unit test for AsynchronousSocketChannel
  27  * @run main Basic -skipSlowConnectTest
  28  * @key randomness intermittent
  29  */
  30 
  31 import java.nio.ByteBuffer;
  32 import java.nio.channels.*;
  33 import static java.net.StandardSocketOptions.*;
  34 import java.net.*;
  35 import java.util.Random;
  36 import java.util.concurrent.*;
  37 import java.util.concurrent.atomic.*;
  38 import java.io.Closeable;
  39 import java.io.IOException;
  40 import java.util.Set;
  41 
  42 public class Basic {
  43     static final Random rand = new Random();
  44 
  45     static boolean skipSlowConnectTest = false;
  46 
  47     public static void main(String[] args) throws Exception {
  48         for (String arg: args) {
  49             switch (arg) {
  50             case "-skipSlowConnectTest" :
  51                 skipSlowConnectTest = true;
  52                 break;
  53             default:
  54                 throw new RuntimeException("Unrecognized argument: " + arg);
  55             }
  56         }
  57 
  58         testBind();
  59         testSocketOptions();
  60         testConnect();
  61         testCloseWhenPending();
  62         testCancel();
  63         testRead1();
  64         testRead2();
  65         testRead3();
  66         testWrite1();
  67         testWrite2();
  68         // skip timeout tests until 7052549 is fixed
  69         if (!System.getProperty("os.name").startsWith("Windows"))
  70             testTimeout();
  71         testShutdown();
  72     }
  73 
  74     static class Server implements Closeable {
  75         private final ServerSocketChannel ssc;
  76         private final InetSocketAddress address;
  77 
  78         Server() throws IOException {
  79             ssc = ServerSocketChannel.open().bind(new InetSocketAddress(0));
  80 
  81             InetAddress lh = InetAddress.getLocalHost();
  82             int port = ((InetSocketAddress)(ssc.getLocalAddress())).getPort();
  83             address = new InetSocketAddress(lh, port);
  84         }
  85 
  86         InetSocketAddress address() {
  87             return address;
  88         }
  89 
  90         SocketChannel accept() throws IOException {
  91             return ssc.accept();
  92         }
  93 
  94         public void close() throws IOException {
  95             ssc.close();
  96         }
  97 
  98     }
  99 
 100     static void testBind() throws Exception {
 101         System.out.println("-- bind --");
 102 
 103         try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
 104             if (ch.getLocalAddress() != null)
 105                 throw new RuntimeException("Local address should be 'null'");
 106             ch.bind(new InetSocketAddress(0));
 107 
 108             // check local address after binding
 109             InetSocketAddress local = (InetSocketAddress)ch.getLocalAddress();
 110             if (local.getPort() == 0)
 111                 throw new RuntimeException("Unexpected port");
 112             if (!local.getAddress().isAnyLocalAddress())
 113                 throw new RuntimeException("Not bound to a wildcard address");
 114 
 115             // try to re-bind
 116             try {
 117                 ch.bind(new InetSocketAddress(0));
 118                 throw new RuntimeException("AlreadyBoundException expected");
 119             } catch (AlreadyBoundException x) {
 120             }
 121         }
 122 
 123         // check ClosedChannelException
 124         AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 125         ch.close();
 126         try {
 127             ch.bind(new InetSocketAddress(0));
 128             throw new RuntimeException("ClosedChannelException  expected");
 129         } catch (ClosedChannelException  x) {
 130         }
 131     }
 132 
 133     static void testSocketOptions() throws Exception {
 134         System.out.println("-- socket options --");
 135 
 136         try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
 137             ch.setOption(SO_RCVBUF, 128*1024)
 138               .setOption(SO_SNDBUF, 128*1024)
 139               .setOption(SO_REUSEADDR, true);
 140 
 141             // check SO_SNDBUF/SO_RCVBUF limits
 142             int before, after;
 143             before = ch.getOption(SO_SNDBUF);
 144             after = ch.setOption(SO_SNDBUF, Integer.MAX_VALUE).getOption(SO_SNDBUF);
 145             if (after < before)
 146                 throw new RuntimeException("setOption caused SO_SNDBUF to decrease");
 147             before = ch.getOption(SO_RCVBUF);
 148             after = ch.setOption(SO_RCVBUF, Integer.MAX_VALUE).getOption(SO_RCVBUF);
 149             if (after < before)
 150                 throw new RuntimeException("setOption caused SO_RCVBUF to decrease");
 151 
 152             ch.bind(new InetSocketAddress(0));
 153 
 154             // default values
 155             if (ch.getOption(SO_KEEPALIVE))
 156                 throw new RuntimeException("Default of SO_KEEPALIVE should be 'false'");
 157             if (ch.getOption(TCP_NODELAY))
 158                 throw new RuntimeException("Default of TCP_NODELAY should be 'false'");
 159 
 160             // set and check
 161             if (!ch.setOption(SO_KEEPALIVE, true).getOption(SO_KEEPALIVE))
 162                 throw new RuntimeException("SO_KEEPALIVE did not change");
 163             if (!ch.setOption(TCP_NODELAY, true).getOption(TCP_NODELAY))
 164                 throw new RuntimeException("SO_KEEPALIVE did not change");
 165 
 166             // read others (can't check as actual value is implementation dependent)
 167             ch.getOption(SO_RCVBUF);
 168             ch.getOption(SO_SNDBUF);
 169 
 170             Set<SocketOption<?>> options = ch.supportedOptions();
 171             boolean reuseport = options.contains(SO_REUSEPORT);
 172             if (reuseport) {
 173                 if (ch.getOption(SO_REUSEPORT))
 174                     throw new RuntimeException("Default of SO_REUSEPORT should be 'false'");
 175                 if (!ch.setOption(SO_REUSEPORT, true).getOption(SO_REUSEPORT))
 176                     throw new RuntimeException("SO_REUSEPORT did not change");
 177             }
 178         }
 179     }
 180 
 181     static void testConnect() throws Exception {
 182         System.out.println("-- connect --");
 183 
 184         SocketAddress address;
 185 
 186         try (Server server = new Server()) {
 187             address = server.address();
 188 
 189             // connect to server and check local/remote addresses
 190             try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
 191                 ch.connect(address).get();
 192                 // check local address
 193                 if (ch.getLocalAddress() == null)
 194                     throw new RuntimeException("Not bound to local address");
 195 
 196                 // check remote address
 197                 InetSocketAddress remote = (InetSocketAddress)ch.getRemoteAddress();
 198                 if (remote.getPort() != server.address().getPort())
 199                     throw new RuntimeException("Connected to unexpected port");
 200                 if (!remote.getAddress().equals(server.address().getAddress()))
 201                     throw new RuntimeException("Connected to unexpected address");
 202 
 203                 // try to connect again
 204                 try {
 205                     ch.connect(server.address()).get();
 206                     throw new RuntimeException("AlreadyConnectedException expected");
 207                 } catch (AlreadyConnectedException x) {
 208                 }
 209 
 210                 // clean-up
 211                 server.accept().close();
 212             }
 213 
 214             // check that connect fails with ClosedChannelException
 215             AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 216             ch.close();
 217             try {
 218                 ch.connect(server.address()).get();
 219                 throw new RuntimeException("ExecutionException expected");
 220             } catch (ExecutionException x) {
 221                 if (!(x.getCause() instanceof ClosedChannelException))
 222                     throw new RuntimeException("Cause of ClosedChannelException expected",
 223                             x.getCause());
 224             }
 225             final AtomicReference<Throwable> connectException = new AtomicReference<>();
 226             ch.connect(server.address(), (Void)null, new CompletionHandler<Void,Void>() {
 227                 public void completed(Void result, Void att) {
 228                 }
 229                 public void failed(Throwable exc, Void att) {
 230                     connectException.set(exc);
 231                 }
 232             });
 233             while (connectException.get() == null) {
 234                 Thread.sleep(100);
 235             }
 236             if (!(connectException.get() instanceof ClosedChannelException))
 237                 throw new RuntimeException("ClosedChannelException expected",
 238                         connectException.get());
 239         }
 240 
 241         // test that failure to connect closes the channel
 242         try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
 243             try {
 244                 ch.connect(address).get();
 245             } catch (ExecutionException x) {
 246                 // failed to establish connection
 247                 if (ch.isOpen())
 248                     throw new RuntimeException("Channel should be closed");
 249             }
 250         }
 251 
 252         // repeat test by connecting to a (probably) non-existent host. This
 253         // improves the chance that the connect will not fail immediately.
 254         if (!skipSlowConnectTest) {
 255             try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
 256                 try {
 257                     ch.connect(genSocketAddress()).get();
 258                 } catch (ExecutionException x) {
 259                     // failed to establish connection
 260                     if (ch.isOpen())
 261                         throw new RuntimeException("Channel should be closed");
 262                 }
 263             }
 264         }
 265     }
 266 
 267     static void testCloseWhenPending() throws Exception {
 268         System.out.println("-- asynchronous close when connecting --");
 269 
 270         AsynchronousSocketChannel ch;
 271 
 272         // asynchronous close while connecting
 273         ch = AsynchronousSocketChannel.open();
 274         Future<Void> connectResult = ch.connect(genSocketAddress());
 275 
 276         // give time to initiate the connect (SYN)
 277         Thread.sleep(50);
 278 
 279         // close
 280         ch.close();
 281 
 282         // check that exception is thrown in timely manner
 283         try {
 284             connectResult.get(5, TimeUnit.SECONDS);
 285         } catch (TimeoutException x) {
 286             throw new RuntimeException("AsynchronousCloseException not thrown");
 287         } catch (ExecutionException x) {
 288             // expected
 289         }
 290 
 291         System.out.println("-- asynchronous close when reading --");
 292 
 293         try (Server server = new Server()) {
 294             ch = AsynchronousSocketChannel.open();
 295             ch.connect(server.address()).get();
 296 
 297             ByteBuffer dst = ByteBuffer.allocateDirect(100);
 298             Future<Integer> result = ch.read(dst);
 299 
 300             // attempt a second read - should fail with ReadPendingException
 301             ByteBuffer buf = ByteBuffer.allocateDirect(100);
 302             try {
 303                 ch.read(buf);
 304                 throw new RuntimeException("ReadPendingException expected");
 305             } catch (ReadPendingException x) {
 306             }
 307 
 308             // close channel (should cause initial read to complete)
 309             ch.close();
 310             server.accept().close();
 311 
 312             // check that AsynchronousCloseException is thrown
 313             try {
 314                 result.get();
 315                 throw new RuntimeException("Should not read");
 316             } catch (ExecutionException x) {
 317                 if (!(x.getCause() instanceof AsynchronousCloseException))
 318                     throw new RuntimeException(x);
 319             }
 320 
 321             System.out.println("-- asynchronous close when writing --");
 322 
 323             ch = AsynchronousSocketChannel.open();
 324             ch.connect(server.address()).get();
 325 
 326             final AtomicReference<Throwable> writeException =
 327                 new AtomicReference<Throwable>();
 328 
 329             // write bytes to fill socket buffer
 330             ch.write(genBuffer(), ch, new CompletionHandler<Integer,AsynchronousSocketChannel>() {
 331                 public void completed(Integer result, AsynchronousSocketChannel ch) {
 332                     ch.write(genBuffer(), ch, this);
 333                 }
 334                 public void failed(Throwable x, AsynchronousSocketChannel ch) {
 335                     writeException.set(x);
 336                 }
 337             });
 338 
 339             // give time for socket buffer to fill up.
 340             Thread.sleep(5*1000);
 341 
 342             //  attempt a concurrent write - should fail with WritePendingException
 343             try {
 344                 ch.write(genBuffer());
 345                 throw new RuntimeException("WritePendingException expected");
 346             } catch (WritePendingException x) {
 347             }
 348 
 349             // close channel - should cause initial write to complete
 350             ch.close();
 351             server.accept().close();
 352 
 353             // wait for exception
 354             while (writeException.get() == null) {
 355                 Thread.sleep(100);
 356             }
 357             if (!(writeException.get() instanceof AsynchronousCloseException))
 358                 throw new RuntimeException("AsynchronousCloseException expected",
 359                         writeException.get());
 360         }
 361     }
 362 
 363     static void testCancel() throws Exception {
 364         System.out.println("-- cancel --");
 365 
 366         try (Server server = new Server()) {
 367             for (int i=0; i<2; i++) {
 368                 boolean mayInterruptIfRunning = (i == 0) ? false : true;
 369 
 370                 // establish loopback connection
 371                 AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 372                 ch.connect(server.address()).get();
 373                 SocketChannel peer = server.accept();
 374 
 375                 // start read operation
 376                 ByteBuffer buf = ByteBuffer.allocate(1);
 377                 Future<Integer> res = ch.read(buf);
 378 
 379                 // cancel operation
 380                 boolean cancelled = res.cancel(mayInterruptIfRunning);
 381 
 382                 // check post-conditions
 383                 if (!res.isDone())
 384                     throw new RuntimeException("isDone should return true");
 385                 if (res.isCancelled() != cancelled)
 386                     throw new RuntimeException("isCancelled not consistent");
 387                 try {
 388                     res.get();
 389                     throw new RuntimeException("CancellationException expected");
 390                 } catch (CancellationException x) {
 391                 }
 392                 try {
 393                     res.get(1, TimeUnit.SECONDS);
 394                     throw new RuntimeException("CancellationException expected");
 395                 } catch (CancellationException x) {
 396                 }
 397 
 398                 // check that the cancel doesn't impact writing to the channel
 399                 if (!mayInterruptIfRunning) {
 400                     buf = ByteBuffer.wrap("a".getBytes());
 401                     ch.write(buf).get();
 402                 }
 403 
 404                 ch.close();
 405                 peer.close();
 406             }
 407         }
 408     }
 409 
 410     static void testRead1() throws Exception {
 411         System.out.println("-- read (1) --");
 412 
 413         try (Server server = new Server()) {
 414             final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 415             ch.connect(server.address()).get();
 416 
 417             // read with 0 bytes remaining should complete immediately
 418             ByteBuffer buf = ByteBuffer.allocate(1);
 419             buf.put((byte)0);
 420             int n = ch.read(buf).get();
 421             if (n != 0)
 422                 throw new RuntimeException("0 expected");
 423 
 424             // write bytes and close connection
 425             ByteBuffer src = genBuffer();
 426             try (SocketChannel sc = server.accept()) {
 427                 sc.setOption(SO_SNDBUF, src.remaining());
 428                 while (src.hasRemaining())
 429                     sc.write(src);
 430             }
 431 
 432             // reads should complete immediately
 433             final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100);
 434             final CountDownLatch latch = new CountDownLatch(1);
 435             ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() {
 436                 public void completed(Integer result, Void att) {
 437                     int n = result;
 438                     if (n > 0) {
 439                         ch.read(dst, (Void)null, this);
 440                     } else {
 441                         latch.countDown();
 442                     }
 443                 }
 444                 public void failed(Throwable exc, Void att) {
 445                 }
 446             });
 447 
 448             latch.await();
 449 
 450             // check buffers
 451             src.flip();
 452             dst.flip();
 453             if (!src.equals(dst)) {
 454                 throw new RuntimeException("Contents differ");
 455             }
 456 
 457             // close channel
 458             ch.close();
 459 
 460             // check read fails with ClosedChannelException
 461             try {
 462                 ch.read(dst).get();
 463                 throw new RuntimeException("ExecutionException expected");
 464             } catch (ExecutionException x) {
 465                 if (!(x.getCause() instanceof ClosedChannelException))
 466                     throw new RuntimeException("Cause of ClosedChannelException expected",
 467                             x.getCause());
 468             }
 469         }
 470     }
 471 
 472     static void testRead2() throws Exception {
 473         System.out.println("-- read (2) --");
 474 
 475         try (Server server = new Server()) {
 476             final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 477             ch.connect(server.address()).get();
 478             SocketChannel sc = server.accept();
 479 
 480             ByteBuffer src = genBuffer();
 481 
 482             // read until the buffer is full
 483             final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity());
 484             final CountDownLatch latch = new CountDownLatch(1);
 485             ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() {
 486                 public void completed(Integer result, Void att) {
 487                     if (dst.hasRemaining()) {
 488                         ch.read(dst, (Void)null, this);
 489                     } else {
 490                         latch.countDown();
 491                     }
 492                 }
 493                 public void failed(Throwable exc, Void att) {
 494                 }
 495             });
 496 
 497             // trickle the writing
 498             do {
 499                 int rem = src.remaining();
 500                 int size = (rem <= 100) ? rem : 50 + rand.nextInt(rem - 100);
 501                 ByteBuffer buf = ByteBuffer.allocate(size);
 502                 for (int i=0; i<size; i++)
 503                     buf.put(src.get());
 504                 buf.flip();
 505                 Thread.sleep(50 + rand.nextInt(1500));
 506                 while (buf.hasRemaining())
 507                     sc.write(buf);
 508             } while (src.hasRemaining());
 509 
 510             // wait until ascynrhonous reading has completed
 511             latch.await();
 512 
 513             // check buffers
 514             src.flip();
 515             dst.flip();
 516             if (!src.equals(dst)) {
 517                throw new RuntimeException("Contents differ");
 518             }
 519 
 520             sc.close();
 521             ch.close();
 522         }
 523     }
 524 
 525     // exercise scattering read
 526     static void testRead3() throws Exception {
 527         System.out.println("-- read (3) --");
 528 
 529         try (Server server = new Server()) {
 530             final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 531             ch.connect(server.address()).get();
 532             SocketChannel sc = server.accept();
 533 
 534             ByteBuffer[] dsts = new ByteBuffer[3];
 535             for (int i=0; i<dsts.length; i++) {
 536                 dsts[i] = ByteBuffer.allocateDirect(100);
 537             }
 538 
 539             // scattering read that completes ascynhronously
 540             final CountDownLatch l1 = new CountDownLatch(1);
 541             ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null,
 542                 new CompletionHandler<Long,Void>() {
 543                     public void completed(Long result, Void att) {
 544                         long n = result;
 545                         if (n <= 0)
 546                             throw new RuntimeException("No bytes read");
 547                         l1.countDown();
 548                     }
 549                     public void failed(Throwable exc, Void att) {
 550                     }
 551             });
 552 
 553             // write some bytes
 554             sc.write(genBuffer());
 555 
 556             // read should now complete
 557             l1.await();
 558 
 559             // write more bytes
 560             sc.write(genBuffer());
 561 
 562             // read should complete immediately
 563             for (int i=0; i<dsts.length; i++) {
 564                 dsts[i].rewind();
 565             }
 566 
 567             final CountDownLatch l2 = new CountDownLatch(1);
 568             ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null,
 569                 new CompletionHandler<Long,Void>() {
 570                     public void completed(Long result, Void att) {
 571                         long n = result;
 572                         if (n <= 0)
 573                             throw new RuntimeException("No bytes read");
 574                         l2.countDown();
 575                     }
 576                     public void failed(Throwable exc, Void att) {
 577                     }
 578             });
 579             l2.await();
 580 
 581             ch.close();
 582             sc.close();
 583         }
 584     }
 585 
 586     static void testWrite1() throws Exception {
 587         System.out.println("-- write (1) --");
 588 
 589         try (Server server = new Server()) {
 590             final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 591             ch.connect(server.address()).get();
 592             SocketChannel sc = server.accept();
 593 
 594             // write with 0 bytes remaining should complete immediately
 595             ByteBuffer buf = ByteBuffer.allocate(1);
 596             buf.put((byte)0);
 597             int n = ch.write(buf).get();
 598             if (n != 0)
 599                 throw new RuntimeException("0 expected");
 600 
 601             // write all bytes and close connection when done
 602             final ByteBuffer src = genBuffer();
 603             ch.write(src, (Void)null, new CompletionHandler<Integer,Void>() {
 604                 public void completed(Integer result, Void att) {
 605                     if (src.hasRemaining()) {
 606                         ch.write(src, (Void)null, this);
 607                     } else {
 608                         try {
 609                             ch.close();
 610                         } catch (IOException ignore) { }
 611                     }
 612                 }
 613                 public void failed(Throwable exc, Void att) {
 614                 }
 615             });
 616 
 617             // read to EOF or buffer full
 618             ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100);
 619             do {
 620                 n = sc.read(dst);
 621             } while (n > 0);
 622             sc.close();
 623 
 624             // check buffers
 625             src.flip();
 626             dst.flip();
 627             if (!src.equals(dst)) {
 628                 throw new RuntimeException("Contents differ");
 629             }
 630 
 631             // check write fails with ClosedChannelException
 632             try {
 633                 ch.read(dst).get();
 634                 throw new RuntimeException("ExecutionException expected");
 635             } catch (ExecutionException x) {
 636                 if (!(x.getCause() instanceof ClosedChannelException))
 637                     throw new RuntimeException("Cause of ClosedChannelException expected",
 638                             x.getCause());
 639             }
 640         }
 641     }
 642 
 643     // exercise gathering write
 644     static void testWrite2() throws Exception {
 645         System.out.println("-- write (2) --");
 646 
 647         try (Server server = new Server()) {
 648             final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 649             ch.connect(server.address()).get();
 650             SocketChannel sc = server.accept();
 651 
 652             // number of bytes written
 653             final AtomicLong bytesWritten = new AtomicLong(0);
 654 
 655             // write buffers (should complete immediately)
 656             ByteBuffer[] srcs = genBuffers(1);
 657             final CountDownLatch l1 = new CountDownLatch(1);
 658             ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null,
 659                 new CompletionHandler<Long,Void>() {
 660                     public void completed(Long result, Void att) {
 661                         long n = result;
 662                         if (n <= 0)
 663                             throw new RuntimeException("No bytes read");
 664                         bytesWritten.addAndGet(n);
 665                         l1.countDown();
 666                     }
 667                     public void failed(Throwable exc, Void att) {
 668                     }
 669             });
 670             l1.await();
 671 
 672             // set to true to signal that no more buffers should be written
 673             final AtomicBoolean continueWriting = new AtomicBoolean(true);
 674 
 675             // write until socket buffer is full so as to create the conditions
 676             // for when a write does not complete immediately
 677             srcs = genBuffers(1);
 678             ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null,
 679                 new CompletionHandler<Long,Void>() {
 680                     public void completed(Long result, Void att) {
 681                         long n = result;
 682                         if (n <= 0)
 683                             throw new RuntimeException("No bytes written");
 684                         bytesWritten.addAndGet(n);
 685                         if (continueWriting.get()) {
 686                             ByteBuffer[] srcs = genBuffers(8);
 687                             ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS,
 688                                 (Void)null, this);
 689                         }
 690                     }
 691                     public void failed(Throwable exc, Void att) {
 692                     }
 693             });
 694 
 695             // give time for socket buffer to fill up.
 696             Thread.sleep(5*1000);
 697 
 698             // signal handler to stop further writing
 699             continueWriting.set(false);
 700 
 701             // read until done
 702             ByteBuffer buf = ByteBuffer.allocateDirect(4096);
 703             long total = 0L;
 704             do {
 705                 int n = sc.read(buf);
 706                 if (n <= 0)
 707                     throw new RuntimeException("No bytes read");
 708                 buf.rewind();
 709                 total += n;
 710             } while (total < bytesWritten.get());
 711 
 712             ch.close();
 713             sc.close();
 714         }
 715     }
 716 
 717     static void testShutdown() throws Exception {
 718         System.out.println("-- shutdown--");
 719 
 720         try (Server server = new Server();
 721              AsynchronousSocketChannel ch = AsynchronousSocketChannel.open())
 722         {
 723             ch.connect(server.address()).get();
 724             try (SocketChannel peer = server.accept()) {
 725                 ByteBuffer buf = ByteBuffer.allocateDirect(1000);
 726                 int n;
 727 
 728                 // check read
 729                 ch.shutdownInput();
 730                 n = ch.read(buf).get();
 731                 if (n != -1)
 732                     throw new RuntimeException("-1 expected");
 733                 // check full with full buffer
 734                 buf.put(new byte[100]);
 735                 n = ch.read(buf).get();
 736                 if (n != -1)
 737                     throw new RuntimeException("-1 expected");
 738 
 739                 // check write
 740                 ch.shutdownOutput();
 741                 try {
 742                     ch.write(buf).get();
 743                     throw new RuntimeException("ClosedChannelException expected");
 744                 } catch (ExecutionException x) {
 745                     if (!(x.getCause() instanceof ClosedChannelException))
 746                         throw new RuntimeException("ClosedChannelException expected",
 747                                 x.getCause());
 748                 }
 749             }
 750         }
 751     }
 752 
 753     static void testTimeout() throws Exception {
 754         System.out.println("-- timeouts --");
 755         testTimeout(Integer.MIN_VALUE, TimeUnit.SECONDS);
 756         testTimeout(-1L, TimeUnit.SECONDS);
 757         testTimeout(0L, TimeUnit.SECONDS);
 758         testTimeout(2L, TimeUnit.SECONDS);
 759     }
 760 
 761     static void testTimeout(final long timeout, final TimeUnit unit) throws Exception {
 762         try (Server server = new Server()) {
 763             AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 764             ch.connect(server.address()).get();
 765 
 766             ByteBuffer dst = ByteBuffer.allocate(512);
 767 
 768             final AtomicReference<Throwable> readException = new AtomicReference<Throwable>();
 769 
 770             // this read should timeout if value is > 0
 771             ch.read(dst, timeout, unit, null, new CompletionHandler<Integer,Void>() {
 772                 public void completed(Integer result, Void att) {
 773                     readException.set(new RuntimeException("Should not complete"));
 774                 }
 775                 public void failed(Throwable exc, Void att) {
 776                     readException.set(exc);
 777                 }
 778             });
 779             if (timeout > 0L) {
 780                 // wait for exception
 781                 while (readException.get() == null) {
 782                     Thread.sleep(100);
 783                 }
 784                 if (!(readException.get() instanceof InterruptedByTimeoutException))
 785                     throw new RuntimeException("InterruptedByTimeoutException expected",
 786                             readException.get());
 787 
 788                 // after a timeout then further reading should throw unspecified runtime exception
 789                 boolean exceptionThrown = false;
 790                 try {
 791                     ch.read(dst);
 792                 } catch (RuntimeException x) {
 793                     exceptionThrown = true;
 794                 }
 795                 if (!exceptionThrown)
 796                     throw new RuntimeException("RuntimeException expected after timeout.");
 797             } else {
 798                 Thread.sleep(1000);
 799                 Throwable exc = readException.get();
 800                 if (exc != null)
 801                     throw new RuntimeException(exc);
 802             }
 803 
 804             final AtomicReference<Throwable> writeException = new AtomicReference<Throwable>();
 805 
 806             // write bytes to fill socket buffer
 807             ch.write(genBuffer(), timeout, unit, ch,
 808                 new CompletionHandler<Integer,AsynchronousSocketChannel>()
 809             {
 810                 public void completed(Integer result, AsynchronousSocketChannel ch) {
 811                     ch.write(genBuffer(), timeout, unit, ch, this);
 812                 }
 813                 public void failed(Throwable exc, AsynchronousSocketChannel ch) {
 814                     writeException.set(exc);
 815                 }
 816             });
 817             if (timeout > 0) {
 818                 // wait for exception
 819                 while (writeException.get() == null) {
 820                     Thread.sleep(100);
 821                 }
 822                 if (!(writeException.get() instanceof InterruptedByTimeoutException))
 823                     throw new RuntimeException("InterruptedByTimeoutException expected",
 824                             writeException.get());
 825 
 826                 // after a timeout then further writing should throw unspecified runtime exception
 827                 boolean exceptionThrown = false;
 828                 try {
 829                     ch.write(genBuffer());
 830                 } catch (RuntimeException x) {
 831                     exceptionThrown = true;
 832                 }
 833                 if (!exceptionThrown)
 834                     throw new RuntimeException("RuntimeException expected after timeout.");
 835             } else {
 836                 Thread.sleep(1000);
 837                 Throwable exc = writeException.get();
 838                 if (exc != null)
 839                     throw new RuntimeException(exc);
 840             }
 841 
 842             // clean-up
 843             server.accept().close();
 844             ch.close();
 845         }
 846     }
 847 
 848     // returns ByteBuffer with random bytes
 849     static ByteBuffer genBuffer() {
 850         int size = 1024 + rand.nextInt(16000);
 851         byte[] buf = new byte[size];
 852         rand.nextBytes(buf);
 853         boolean useDirect = rand.nextBoolean();
 854         if (useDirect) {
 855             ByteBuffer bb = ByteBuffer.allocateDirect(buf.length);
 856             bb.put(buf);
 857             bb.flip();
 858             return bb;
 859         } else {
 860             return ByteBuffer.wrap(buf);
 861         }
 862     }
 863 
 864     // return ByteBuffer[] with random bytes
 865     static ByteBuffer[] genBuffers(int max) {
 866         int len = 1;
 867         if (max > 1)
 868             len += rand.nextInt(max);
 869         ByteBuffer[] bufs = new ByteBuffer[len];
 870         for (int i=0; i<len; i++)
 871             bufs[i] = genBuffer();
 872         return bufs;
 873     }
 874 
 875     // return random SocketAddress
 876     static SocketAddress genSocketAddress() {
 877         StringBuilder sb = new StringBuilder("10.");
 878         sb.append(rand.nextInt(256));
 879         sb.append('.');
 880         sb.append(rand.nextInt(256));
 881         sb.append('.');
 882         sb.append(rand.nextInt(256));
 883         InetAddress rh;
 884         try {
 885             rh = InetAddress.getByName(sb.toString());
 886         } catch (UnknownHostException x) {
 887             throw new InternalError("Should not happen");
 888         }
 889         return new InetSocketAddress(rh, rand.nextInt(65535)+1);
 890     }
 891 }