1 /*
   2  * Copyright (c) 2008, 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /* @test
  25  * @bug 4607272 6842687 6878369 6944810 7023403
  26  * @summary Unit test for AsynchronousSocketChannel
  27  * @run main Basic -skipSlowConnectTest
  28  * @key randomness intermittent
  29  */
  30 
  31 import java.io.Closeable;
  32 import java.io.IOException;
  33 import java.net.*;
  34 import static java.net.StandardSocketOptions.*;
  35 import java.nio.ByteBuffer;
  36 import java.nio.channels.*;
  37 import java.util.Random;
  38 import java.util.Set;
  39 import java.util.concurrent.*;
  40 import java.util.concurrent.atomic.*;
  41 
  42 public class Basic {
    // Shared random number generator for the tests; created without an explicit
    // seed. Within the visible code it is used by testRead2 to pick chunk sizes
    // and pauses.
    static final Random rand = new Random();

    // Set to true by the "-skipSlowConnectTest" command-line argument. When true,
    // testConnect skips the connect attempt to a (probably) non-existent host.
    static boolean skipSlowConnectTest = false;
  46 
  47     public static void main(String[] args) throws Exception {
  48         for (String arg: args) {
  49             switch (arg) {
  50             case "-skipSlowConnectTest" :
  51                 skipSlowConnectTest = true;
  52                 break;
  53             default:
  54                 throw new RuntimeException("Unrecognized argument: " + arg);
  55             }
  56         }
  57 
  58         testBind();
  59         testSocketOptions();
  60         testConnect();
  61         testCloseWhenPending();
  62         testCancel();
  63         testRead1();
  64         testRead2();
  65         testRead3();
  66         testWrite1();
  67         testWrite2();
  68         // skip timeout tests until 7052549 is fixed
  69         if (!System.getProperty("os.name").startsWith("Windows"))
  70             testTimeout();
  71         testShutdown();
  72     }
  73 
  74     static class Server implements Closeable {
  75         private final ServerSocketChannel ssc;
  76         private final InetSocketAddress address;
  77 
  78         Server() throws IOException {
  79             ssc = ServerSocketChannel.open().bind(new InetSocketAddress(0));
  80 
  81             InetAddress lh = InetAddress.getLocalHost();
  82             int port = ((InetSocketAddress)(ssc.getLocalAddress())).getPort();
  83             address = new InetSocketAddress(lh, port);
  84         }
  85 
  86         InetSocketAddress address() {
  87             return address;
  88         }
  89 
  90         SocketChannel accept() throws IOException {
  91             return ssc.accept();
  92         }
  93 
  94         public void close() throws IOException {
  95             ssc.close();
  96         }
  97 
  98     }
  99 
 100     static void testBind() throws Exception {
 101         System.out.println("-- bind --");
 102 
 103         try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
 104             if (ch.getLocalAddress() != null)
 105                 throw new RuntimeException("Local address should be 'null'");
 106             ch.bind(new InetSocketAddress(0));
 107 
 108             // check local address after binding
 109             InetSocketAddress local = (InetSocketAddress)ch.getLocalAddress();
 110             if (local.getPort() == 0)
 111                 throw new RuntimeException("Unexpected port");
 112             if (!local.getAddress().isAnyLocalAddress())
 113                 throw new RuntimeException("Not bound to a wildcard address");
 114 
 115             // try to re-bind
 116             try {
 117                 ch.bind(new InetSocketAddress(0));
 118                 throw new RuntimeException("AlreadyBoundException expected");
 119             } catch (AlreadyBoundException x) {
 120             }
 121         }
 122 
 123         // check ClosedChannelException
 124         AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 125         ch.close();
 126         try {
 127             ch.bind(new InetSocketAddress(0));
 128             throw new RuntimeException("ClosedChannelException  expected");
 129         } catch (ClosedChannelException  x) {
 130         }
 131     }
 132 
 133     static void testSocketOptions() throws Exception {
 134         System.out.println("-- socket options --");
 135 
 136         try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
 137             ch.setOption(SO_RCVBUF, 128*1024)
 138               .setOption(SO_SNDBUF, 128*1024)
 139               .setOption(SO_REUSEADDR, true);
 140 
 141             // check SO_SNDBUF/SO_RCVBUF limits
 142             int before, after;
 143             before = ch.getOption(SO_SNDBUF);
 144             after = ch.setOption(SO_SNDBUF, Integer.MAX_VALUE).getOption(SO_SNDBUF);
 145             if (after < before)
 146                 throw new RuntimeException("setOption caused SO_SNDBUF to decrease");
 147             before = ch.getOption(SO_RCVBUF);
 148             after = ch.setOption(SO_RCVBUF, Integer.MAX_VALUE).getOption(SO_RCVBUF);
 149             if (after < before)
 150                 throw new RuntimeException("setOption caused SO_RCVBUF to decrease");
 151 
 152             ch.bind(new InetSocketAddress(0));
 153 
 154             // default values
 155             if (ch.getOption(SO_KEEPALIVE))
 156                 throw new RuntimeException("Default of SO_KEEPALIVE should be 'false'");
 157             if (ch.getOption(TCP_NODELAY))
 158                 throw new RuntimeException("Default of TCP_NODELAY should be 'false'");
 159 
 160             // set and check
 161             if (!ch.setOption(SO_KEEPALIVE, true).getOption(SO_KEEPALIVE))
 162                 throw new RuntimeException("SO_KEEPALIVE did not change");
 163             if (!ch.setOption(TCP_NODELAY, true).getOption(TCP_NODELAY))
 164                 throw new RuntimeException("SO_KEEPALIVE did not change");
 165 
 166             // read others (can't check as actual value is implementation dependent)
 167             ch.getOption(SO_RCVBUF);
 168             ch.getOption(SO_SNDBUF);
 169 
 170             Set<SocketOption<?>> options = ch.supportedOptions();
 171             boolean reuseport = options.contains(SO_REUSEPORT);
 172             if (reuseport) {
 173                 if (ch.getOption(SO_REUSEPORT))
 174                     throw new RuntimeException("Default of SO_REUSEPORT should be 'false'");
 175                 if (!ch.setOption(SO_REUSEPORT, true).getOption(SO_REUSEPORT))
 176                     throw new RuntimeException("SO_REUSEPORT did not change");
 177             }
 178         }
 179     }
 180 
 181     static void testConnect() throws Exception {
 182         System.out.println("-- connect --");
 183 
 184         SocketAddress address;
 185 
 186         try (Server server = new Server()) {
 187             address = server.address();
 188 
 189             // connect to server and check local/remote addresses
 190             try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
 191                 ch.connect(address).get();
 192                 // check local address
 193                 if (ch.getLocalAddress() == null)
 194                     throw new RuntimeException("Not bound to local address");
 195 
 196                 // check remote address
 197                 InetSocketAddress remote = (InetSocketAddress)ch.getRemoteAddress();
 198                 if (remote.getPort() != server.address().getPort())
 199                     throw new RuntimeException("Connected to unexpected port");
 200                 if (!remote.getAddress().equals(server.address().getAddress()))
 201                     throw new RuntimeException("Connected to unexpected address");
 202 
 203                 // try to connect again
 204                 try {
 205                     ch.connect(server.address()).get();
 206                     throw new RuntimeException("AlreadyConnectedException expected");
 207                 } catch (AlreadyConnectedException x) {
 208                 }
 209 
 210                 // clean-up
 211                 server.accept().close();
 212             }
 213 
 214             // check that connect fails with ClosedChannelException
 215             AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 216             ch.close();
 217             try {
 218                 ch.connect(server.address()).get();
 219                 throw new RuntimeException("ExecutionException expected");
 220             } catch (ExecutionException x) {
 221                 if (!(x.getCause() instanceof ClosedChannelException))
 222                     throw new RuntimeException("Cause of ClosedChannelException expected",
 223                             x.getCause());
 224             }
 225             final AtomicReference<Throwable> connectException = new AtomicReference<>();
 226             ch.connect(server.address(), (Void)null, new CompletionHandler<Void,Void>() {
 227                 public void completed(Void result, Void att) {
 228                 }
 229                 public void failed(Throwable exc, Void att) {
 230                     connectException.set(exc);
 231                 }
 232             });
 233             while (connectException.get() == null) {
 234                 Thread.sleep(100);
 235             }
 236             if (!(connectException.get() instanceof ClosedChannelException))
 237                 throw new RuntimeException("ClosedChannelException expected",
 238                         connectException.get());
 239         }
 240 
 241         // test that failure to connect closes the channel
 242         try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
 243             try {
 244                 ch.connect(address).get();
 245             } catch (ExecutionException x) {
 246                 // failed to establish connection
 247                 if (ch.isOpen())
 248                     throw new RuntimeException("Channel should be closed");
 249             }
 250         }
 251 
 252         // repeat test by connecting to a (probably) non-existent host. This
 253         // improves the chance that the connect will not fail immediately.
 254         if (!skipSlowConnectTest) {
 255             try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
 256                 try {
 257                     ch.connect(genSocketAddress()).get();
 258                 } catch (ExecutionException x) {
 259                     // failed to establish connection
 260                     if (ch.isOpen())
 261                         throw new RuntimeException("Channel should be closed");
 262                 }
 263             }
 264         }
 265     }
 266 
 267     static void testCloseWhenPending() throws Exception {
 268         System.out.println("-- asynchronous close when connecting --");
 269 
 270         AsynchronousSocketChannel ch;
 271 
 272         // asynchronous close while connecting
 273         ch = AsynchronousSocketChannel.open();
 274         Future<Void> connectResult = ch.connect(genSocketAddress());
 275 
 276         // give time to initiate the connect (SYN)
 277         Thread.sleep(50);
 278 
 279         // close
 280         ch.close();
 281 
 282         // check that exception is thrown in timely manner
 283         try {
 284             connectResult.get(5, TimeUnit.SECONDS);
 285         } catch (TimeoutException x) {
 286             throw new RuntimeException("AsynchronousCloseException not thrown");
 287         } catch (ExecutionException x) {
 288             // expected
 289         }
 290 
 291         System.out.println("-- asynchronous close when reading --");
 292 
 293         try (Server server = new Server()) {
 294             ch = AsynchronousSocketChannel.open();
 295             ch.connect(server.address()).get();
 296 
 297             ByteBuffer dst = ByteBuffer.allocateDirect(100);
 298             Future<Integer> result = ch.read(dst);
 299 
 300             // attempt a second read - should fail with ReadPendingException
 301             ByteBuffer buf = ByteBuffer.allocateDirect(100);
 302             try {
 303                 ch.read(buf);
 304                 throw new RuntimeException("ReadPendingException expected");
 305             } catch (ReadPendingException x) {
 306             }
 307 
 308             // close channel (should cause initial read to complete)
 309             ch.close();
 310             server.accept().close();
 311 
 312             // check that AsynchronousCloseException is thrown
 313             try {
 314                 result.get();
 315                 throw new RuntimeException("Should not read");
 316             } catch (ExecutionException x) {
 317                 if (!(x.getCause() instanceof AsynchronousCloseException))
 318                     throw new RuntimeException(x);
 319             }
 320 
 321             System.out.println("-- asynchronous close when writing --");
 322 
 323             ch = AsynchronousSocketChannel.open();
 324             ch.connect(server.address()).get();
 325 
 326             final AtomicReference<Throwable> writeException =
 327                 new AtomicReference<Throwable>();
 328 
 329             // write bytes to fill socket buffer
 330             final AtomicLong completedTime = new AtomicLong();
 331             ch.write(genBuffer(), ch, new CompletionHandler<Integer,AsynchronousSocketChannel>() {
 332                 public void completed(Integer result, AsynchronousSocketChannel ch) {
 333                     completedTime.set(System.nanoTime());
 334                     ch.write(genBuffer(), ch, this);
 335                 }
 336                 public void failed(Throwable x, AsynchronousSocketChannel ch) {
 337                     writeException.set(x);
 338                 }
 339             });
 340 
 341             // give time for socket buffer to fill up -
 342             // take pauses until the handler is no longer being invoked
 343             // because all writes are being pended which guarantees that
 344             // the internal channel state indicates it is writing
 345             long t0 = System.nanoTime();
 346             long previousCompletedTime = completedTime.get();
 347             do {
 348                 Thread.sleep(1000);
 349                 if (completedTime.get() == previousCompletedTime) {
 350                     break;
 351                 }
 352                 previousCompletedTime = completedTime.get();
 353             } while (true);
 354 
 355             // attempt a concurrent write - should fail with WritePendingException
 356             try {
 357                 ch.write(genBuffer());
 358                 throw new RuntimeException("WritePendingException expected");
 359             } catch (WritePendingException x) {
 360             }
 361 
 362             // close channel - should cause initial write to complete
 363             ch.close();
 364             server.accept().close();
 365 
 366             // wait for exception
 367             while (writeException.get() == null) {
 368                 Thread.sleep(100);
 369             }
 370             if (!(writeException.get() instanceof AsynchronousCloseException))
 371                 throw new RuntimeException("AsynchronousCloseException expected",
 372                         writeException.get());
 373         }
 374     }
 375 
 376     static void testCancel() throws Exception {
 377         System.out.println("-- cancel --");
 378 
 379         try (Server server = new Server()) {
 380             for (int i=0; i<2; i++) {
 381                 boolean mayInterruptIfRunning = (i == 0) ? false : true;
 382 
 383                 // establish loopback connection
 384                 AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 385                 ch.connect(server.address()).get();
 386                 SocketChannel peer = server.accept();
 387 
 388                 // start read operation
 389                 ByteBuffer buf = ByteBuffer.allocate(1);
 390                 Future<Integer> res = ch.read(buf);
 391 
 392                 // cancel operation
 393                 boolean cancelled = res.cancel(mayInterruptIfRunning);
 394 
 395                 // check post-conditions
 396                 if (!res.isDone())
 397                     throw new RuntimeException("isDone should return true");
 398                 if (res.isCancelled() != cancelled)
 399                     throw new RuntimeException("isCancelled not consistent");
 400                 try {
 401                     res.get();
 402                     throw new RuntimeException("CancellationException expected");
 403                 } catch (CancellationException x) {
 404                 }
 405                 try {
 406                     res.get(1, TimeUnit.SECONDS);
 407                     throw new RuntimeException("CancellationException expected");
 408                 } catch (CancellationException x) {
 409                 }
 410 
 411                 // check that the cancel doesn't impact writing to the channel
 412                 if (!mayInterruptIfRunning) {
 413                     buf = ByteBuffer.wrap("a".getBytes());
 414                     ch.write(buf).get();
 415                 }
 416 
 417                 ch.close();
 418                 peer.close();
 419             }
 420         }
 421     }
 422 
 423     static void testRead1() throws Exception {
 424         System.out.println("-- read (1) --");
 425 
 426         try (Server server = new Server()) {
 427             final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 428             ch.connect(server.address()).get();
 429 
 430             // read with 0 bytes remaining should complete immediately
 431             ByteBuffer buf = ByteBuffer.allocate(1);
 432             buf.put((byte)0);
 433             int n = ch.read(buf).get();
 434             if (n != 0)
 435                 throw new RuntimeException("0 expected");
 436 
 437             // write bytes and close connection
 438             ByteBuffer src = genBuffer();
 439             try (SocketChannel sc = server.accept()) {
 440                 sc.setOption(SO_SNDBUF, src.remaining());
 441                 while (src.hasRemaining())
 442                     sc.write(src);
 443             }
 444 
 445             // reads should complete immediately
 446             final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100);
 447             final CountDownLatch latch = new CountDownLatch(1);
 448             ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() {
 449                 public void completed(Integer result, Void att) {
 450                     int n = result;
 451                     if (n > 0) {
 452                         ch.read(dst, (Void)null, this);
 453                     } else {
 454                         latch.countDown();
 455                     }
 456                 }
 457                 public void failed(Throwable exc, Void att) {
 458                 }
 459             });
 460 
 461             latch.await();
 462 
 463             // check buffers
 464             src.flip();
 465             dst.flip();
 466             if (!src.equals(dst)) {
 467                 throw new RuntimeException("Contents differ");
 468             }
 469 
 470             // close channel
 471             ch.close();
 472 
 473             // check read fails with ClosedChannelException
 474             try {
 475                 ch.read(dst).get();
 476                 throw new RuntimeException("ExecutionException expected");
 477             } catch (ExecutionException x) {
 478                 if (!(x.getCause() instanceof ClosedChannelException))
 479                     throw new RuntimeException("Cause of ClosedChannelException expected",
 480                             x.getCause());
 481             }
 482         }
 483     }
 484 
 485     static void testRead2() throws Exception {
 486         System.out.println("-- read (2) --");
 487 
 488         try (Server server = new Server()) {
 489             final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 490             ch.connect(server.address()).get();
 491             SocketChannel sc = server.accept();
 492 
 493             ByteBuffer src = genBuffer();
 494 
 495             // read until the buffer is full
 496             final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity());
 497             final CountDownLatch latch = new CountDownLatch(1);
 498             ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() {
 499                 public void completed(Integer result, Void att) {
 500                     if (dst.hasRemaining()) {
 501                         ch.read(dst, (Void)null, this);
 502                     } else {
 503                         latch.countDown();
 504                     }
 505                 }
 506                 public void failed(Throwable exc, Void att) {
 507                 }
 508             });
 509 
 510             // trickle the writing
 511             do {
 512                 int rem = src.remaining();
 513                 int size = (rem <= 100) ? rem : 50 + rand.nextInt(rem - 100);
 514                 ByteBuffer buf = ByteBuffer.allocate(size);
 515                 for (int i=0; i<size; i++)
 516                     buf.put(src.get());
 517                 buf.flip();
 518                 Thread.sleep(50 + rand.nextInt(1500));
 519                 while (buf.hasRemaining())
 520                     sc.write(buf);
 521             } while (src.hasRemaining());
 522 
 523             // wait until ascynrhonous reading has completed
 524             latch.await();
 525 
 526             // check buffers
 527             src.flip();
 528             dst.flip();
 529             if (!src.equals(dst)) {
 530                throw new RuntimeException("Contents differ");
 531             }
 532 
 533             sc.close();
 534             ch.close();
 535         }
 536     }
 537 
 538     // exercise scattering read
 539     static void testRead3() throws Exception {
 540         System.out.println("-- read (3) --");
 541 
 542         try (Server server = new Server()) {
 543             final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 544             ch.connect(server.address()).get();
 545             SocketChannel sc = server.accept();
 546 
 547             ByteBuffer[] dsts = new ByteBuffer[3];
 548             for (int i=0; i<dsts.length; i++) {
 549                 dsts[i] = ByteBuffer.allocateDirect(100);
 550             }
 551 
 552             // scattering read that completes ascynhronously
 553             final CountDownLatch l1 = new CountDownLatch(1);
 554             ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null,
 555                 new CompletionHandler<Long,Void>() {
 556                     public void completed(Long result, Void att) {
 557                         long n = result;
 558                         if (n <= 0)
 559                             throw new RuntimeException("No bytes read");
 560                         l1.countDown();
 561                     }
 562                     public void failed(Throwable exc, Void att) {
 563                     }
 564             });
 565 
 566             // write some bytes
 567             sc.write(genBuffer());
 568 
 569             // read should now complete
 570             l1.await();
 571 
 572             // write more bytes
 573             sc.write(genBuffer());
 574 
 575             // read should complete immediately
 576             for (int i=0; i<dsts.length; i++) {
 577                 dsts[i].rewind();
 578             }
 579 
 580             final CountDownLatch l2 = new CountDownLatch(1);
 581             ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null,
 582                 new CompletionHandler<Long,Void>() {
 583                     public void completed(Long result, Void att) {
 584                         long n = result;
 585                         if (n <= 0)
 586                             throw new RuntimeException("No bytes read");
 587                         l2.countDown();
 588                     }
 589                     public void failed(Throwable exc, Void att) {
 590                     }
 591             });
 592             l2.await();
 593 
 594             ch.close();
 595             sc.close();
 596         }
 597     }
 598 
 599     static void testWrite1() throws Exception {
 600         System.out.println("-- write (1) --");
 601 
 602         try (Server server = new Server()) {
 603             final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 604             ch.connect(server.address()).get();
 605             SocketChannel sc = server.accept();
 606 
 607             // write with 0 bytes remaining should complete immediately
 608             ByteBuffer buf = ByteBuffer.allocate(1);
 609             buf.put((byte)0);
 610             int n = ch.write(buf).get();
 611             if (n != 0)
 612                 throw new RuntimeException("0 expected");
 613 
 614             // write all bytes and close connection when done
 615             final ByteBuffer src = genBuffer();
 616             ch.write(src, (Void)null, new CompletionHandler<Integer,Void>() {
 617                 public void completed(Integer result, Void att) {
 618                     if (src.hasRemaining()) {
 619                         ch.write(src, (Void)null, this);
 620                     } else {
 621                         try {
 622                             ch.close();
 623                         } catch (IOException ignore) { }
 624                     }
 625                 }
 626                 public void failed(Throwable exc, Void att) {
 627                 }
 628             });
 629 
 630             // read to EOF or buffer full
 631             ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100);
 632             do {
 633                 n = sc.read(dst);
 634             } while (n > 0);
 635             sc.close();
 636 
 637             // check buffers
 638             src.flip();
 639             dst.flip();
 640             if (!src.equals(dst)) {
 641                 throw new RuntimeException("Contents differ");
 642             }
 643 
 644             // check write fails with ClosedChannelException
 645             try {
 646                 ch.read(dst).get();
 647                 throw new RuntimeException("ExecutionException expected");
 648             } catch (ExecutionException x) {
 649                 if (!(x.getCause() instanceof ClosedChannelException))
 650                     throw new RuntimeException("Cause of ClosedChannelException expected",
 651                             x.getCause());
 652             }
 653         }
 654     }
 655 
 656     // exercise gathering write
 657     static void testWrite2() throws Exception {
 658         System.out.println("-- write (2) --");
 659 
 660         try (Server server = new Server()) {
 661             final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 662             ch.connect(server.address()).get();
 663             SocketChannel sc = server.accept();
 664 
 665             // number of bytes written
 666             final AtomicLong bytesWritten = new AtomicLong(0);
 667 
 668             // write buffers (should complete immediately)
 669             ByteBuffer[] srcs = genBuffers(1);
 670             final CountDownLatch l1 = new CountDownLatch(1);
 671             ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null,
 672                 new CompletionHandler<Long,Void>() {
 673                     public void completed(Long result, Void att) {
 674                         long n = result;
 675                         if (n <= 0)
 676                             throw new RuntimeException("No bytes read");
 677                         bytesWritten.addAndGet(n);
 678                         l1.countDown();
 679                     }
 680                     public void failed(Throwable exc, Void att) {
 681                     }
 682             });
 683             l1.await();
 684 
 685             // set to true to signal that no more buffers should be written
 686             final AtomicBoolean continueWriting = new AtomicBoolean(true);
 687 
 688             // write until socket buffer is full so as to create the conditions
 689             // for when a write does not complete immediately
 690             srcs = genBuffers(1);
 691             ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null,
 692                 new CompletionHandler<Long,Void>() {
 693                     public void completed(Long result, Void att) {
 694                         long n = result;
 695                         if (n <= 0)
 696                             throw new RuntimeException("No bytes written");
 697                         bytesWritten.addAndGet(n);
 698                         if (continueWriting.get()) {
 699                             ByteBuffer[] srcs = genBuffers(8);
 700                             ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS,
 701                                 (Void)null, this);
 702                         }
 703                     }
 704                     public void failed(Throwable exc, Void att) {
 705                     }
 706             });
 707 
 708             // give time for socket buffer to fill up.
 709             Thread.sleep(5*1000);
 710 
 711             // signal handler to stop further writing
 712             continueWriting.set(false);
 713 
 714             // read until done
 715             ByteBuffer buf = ByteBuffer.allocateDirect(4096);
 716             long total = 0L;
 717             do {
 718                 int n = sc.read(buf);
 719                 if (n <= 0)
 720                     throw new RuntimeException("No bytes read");
 721                 buf.rewind();
 722                 total += n;
 723             } while (total < bytesWritten.get());
 724 
 725             ch.close();
 726             sc.close();
 727         }
 728     }
 729 
 730     static void testShutdown() throws Exception {
 731         System.out.println("-- shutdown--");
 732 
 733         try (Server server = new Server();
 734              AsynchronousSocketChannel ch = AsynchronousSocketChannel.open())
 735         {
 736             ch.connect(server.address()).get();
 737             try (SocketChannel peer = server.accept()) {
 738                 ByteBuffer buf = ByteBuffer.allocateDirect(1000);
 739                 int n;
 740 
 741                 // check read
 742                 ch.shutdownInput();
 743                 n = ch.read(buf).get();
 744                 if (n != -1)
 745                     throw new RuntimeException("-1 expected");
 746                 // check full with full buffer
 747                 buf.put(new byte[100]);
 748                 n = ch.read(buf).get();
 749                 if (n != -1)
 750                     throw new RuntimeException("-1 expected");
 751 
 752                 // check write
 753                 ch.shutdownOutput();
 754                 try {
 755                     ch.write(buf).get();
 756                     throw new RuntimeException("ClosedChannelException expected");
 757                 } catch (ExecutionException x) {
 758                     if (!(x.getCause() instanceof ClosedChannelException))
 759                         throw new RuntimeException("ClosedChannelException expected",
 760                                 x.getCause());
 761                 }
 762             }
 763         }
 764     }
 765 
 766     static void testTimeout() throws Exception {
 767         System.out.println("-- timeouts --");
 768         testTimeout(Integer.MIN_VALUE, TimeUnit.SECONDS);
 769         testTimeout(-1L, TimeUnit.SECONDS);
 770         testTimeout(0L, TimeUnit.SECONDS);
 771         testTimeout(2L, TimeUnit.SECONDS);
 772     }
 773 
 774     static void testTimeout(final long timeout, final TimeUnit unit) throws Exception {
 775         try (Server server = new Server()) {
 776             AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 777             ch.connect(server.address()).get();
 778 
 779             ByteBuffer dst = ByteBuffer.allocate(512);
 780 
 781             final AtomicReference<Throwable> readException = new AtomicReference<Throwable>();
 782 
 783             // this read should timeout if value is > 0
 784             ch.read(dst, timeout, unit, null, new CompletionHandler<Integer,Void>() {
 785                 public void completed(Integer result, Void att) {
 786                     readException.set(new RuntimeException("Should not complete"));
 787                 }
 788                 public void failed(Throwable exc, Void att) {
 789                     readException.set(exc);
 790                 }
 791             });
 792             if (timeout > 0L) {
 793                 // wait for exception
 794                 while (readException.get() == null) {
 795                     Thread.sleep(100);
 796                 }
 797                 if (!(readException.get() instanceof InterruptedByTimeoutException))
 798                     throw new RuntimeException("InterruptedByTimeoutException expected",
 799                             readException.get());
 800 
 801                 // after a timeout then further reading should throw unspecified runtime exception
 802                 boolean exceptionThrown = false;
 803                 try {
 804                     ch.read(dst);
 805                 } catch (RuntimeException x) {
 806                     exceptionThrown = true;
 807                 }
 808                 if (!exceptionThrown)
 809                     throw new RuntimeException("RuntimeException expected after timeout.");
 810             } else {
 811                 Thread.sleep(1000);
 812                 Throwable exc = readException.get();
 813                 if (exc != null)
 814                     throw new RuntimeException(exc);
 815             }
 816 
 817             final AtomicReference<Throwable> writeException = new AtomicReference<Throwable>();
 818 
 819             // write bytes to fill socket buffer
 820             ch.write(genBuffer(), timeout, unit, ch,
 821                 new CompletionHandler<Integer,AsynchronousSocketChannel>()
 822             {
 823                 public void completed(Integer result, AsynchronousSocketChannel ch) {
 824                     ch.write(genBuffer(), timeout, unit, ch, this);
 825                 }
 826                 public void failed(Throwable exc, AsynchronousSocketChannel ch) {
 827                     writeException.set(exc);
 828                 }
 829             });
 830             if (timeout > 0) {
 831                 // wait for exception
 832                 while (writeException.get() == null) {
 833                     Thread.sleep(100);
 834                 }
 835                 if (!(writeException.get() instanceof InterruptedByTimeoutException))
 836                     throw new RuntimeException("InterruptedByTimeoutException expected",
 837                             writeException.get());
 838 
 839                 // after a timeout then further writing should throw unspecified runtime exception
 840                 boolean exceptionThrown = false;
 841                 try {
 842                     ch.write(genBuffer());
 843                 } catch (RuntimeException x) {
 844                     exceptionThrown = true;
 845                 }
 846                 if (!exceptionThrown)
 847                     throw new RuntimeException("RuntimeException expected after timeout.");
 848             } else {
 849                 Thread.sleep(1000);
 850                 Throwable exc = writeException.get();
 851                 if (exc != null)
 852                     throw new RuntimeException(exc);
 853             }
 854 
 855             // clean-up
 856             server.accept().close();
 857             ch.close();
 858         }
 859     }
 860 
 861     // returns ByteBuffer with random bytes
 862     static ByteBuffer genBuffer() {
 863         int size = 1024 + rand.nextInt(16000);
 864         byte[] buf = new byte[size];
 865         rand.nextBytes(buf);
 866         boolean useDirect = rand.nextBoolean();
 867         if (useDirect) {
 868             ByteBuffer bb = ByteBuffer.allocateDirect(buf.length);
 869             bb.put(buf);
 870             bb.flip();
 871             return bb;
 872         } else {
 873             return ByteBuffer.wrap(buf);
 874         }
 875     }
 876 
 877     // return ByteBuffer[] with random bytes
 878     static ByteBuffer[] genBuffers(int max) {
 879         int len = 1;
 880         if (max > 1)
 881             len += rand.nextInt(max);
 882         ByteBuffer[] bufs = new ByteBuffer[len];
 883         for (int i=0; i<len; i++)
 884             bufs[i] = genBuffer();
 885         return bufs;
 886     }
 887 
 888     // return random SocketAddress
 889     static SocketAddress genSocketAddress() {
 890         StringBuilder sb = new StringBuilder("10.");
 891         sb.append(rand.nextInt(256));
 892         sb.append('.');
 893         sb.append(rand.nextInt(256));
 894         sb.append('.');
 895         sb.append(rand.nextInt(256));
 896         InetAddress rh;
 897         try {
 898             rh = InetAddress.getByName(sb.toString());
 899         } catch (UnknownHostException x) {
 900             throw new InternalError("Should not happen");
 901         }
 902         return new InetSocketAddress(rh, rand.nextInt(65535)+1);
 903     }
 904 }