/*
 * Copyright (c) 2008, 2011, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
23
/* @test
 * @bug 4607272 6842687 6878369 6944810 7023403
 * @summary Unit test for AsynchronousSocketChannel
 * @run main Basic -skipSlowConnectTest
 * @key randomness
 */
30
31 import java.nio.ByteBuffer;
32 import java.nio.channels.*;
33 import static java.net.StandardSocketOptions.*;
34 import java.net.*;
35 import java.util.Random;
36 import java.util.concurrent.*;
37 import java.util.concurrent.atomic.*;
38 import java.io.Closeable;
39 import java.io.IOException;
40
/**
 * Unit test for {@link AsynchronousSocketChannel}: binding, socket options,
 * connecting, asynchronous close, cancellation, reading, writing, timeouts
 * and shutdown.
 *
 * <p>Each {@code testXxx} method throws a {@link RuntimeException} when a
 * check fails. Failures reported to a completion handler are recorded and
 * rethrown on the main thread so that a failed I/O operation fails the test
 * instead of leaving it blocked on a latch.
 */
public class Basic {
    static final Random rand = new Random();

    // set by the -skipSlowConnectTest argument; skips the connect to a
    // (probably) non-existent host in testConnect
    static boolean skipSlowConnectTest = false;

    /**
     * Parses the arguments and then runs the tests in a fixed order.
     *
     * @param args only {@code -skipSlowConnectTest} is recognized
     * @throws RuntimeException if an argument is not recognized or a check fails
     */
    public static void main(String[] args) throws Exception {
        for (String arg : args) {
            switch (arg) {
                case "-skipSlowConnectTest":
                    skipSlowConnectTest = true;
                    break;
                default:
                    throw new RuntimeException("Unrecognized argument: " + arg);
            }
        }

        testBind();
        testSocketOptions();
        testConnect();
        testCloseWhenPending();
        testCancel();
        testRead1();
        testRead2();
        testRead3();
        testWrite1();
        testWrite2();
        // skip timeout tests until 7052549 is fixed
        if (!System.getProperty("os.name").startsWith("Windows"))
            testTimeout();
        testShutdown();
    }

    /**
     * Polls (every 100ms) until an asynchronous operation has published a
     * Throwable to the given reference, then returns it.
     */
    private static Throwable awaitException(AtomicReference<Throwable> ref)
        throws InterruptedException
    {
        Throwable exc;
        while ((exc = ref.get()) == null) {
            Thread.sleep(100);
        }
        return exc;
    }

    /**
     * Rethrows, on the calling thread, a failure recorded by a completion
     * handler. Does nothing if no failure was recorded.
     */
    private static void throwIfFailed(AtomicReference<Throwable> failure) {
        Throwable exc = failure.get();
        if (exc != null)
            throw new RuntimeException("Asynchronous I/O operation failed", exc);
    }

    /**
     * A listener bound to an ephemeral port. The advertised address combines
     * the local host address with the port that was actually bound.
     */
    static class Server implements Closeable {
        private final ServerSocketChannel ssc;
        private final InetSocketAddress address;

        Server() throws IOException {
            ssc = ServerSocketChannel.open().bind(new InetSocketAddress(0));

            InetAddress lh = InetAddress.getLocalHost();
            int port = ((InetSocketAddress)(ssc.getLocalAddress())).getPort();
            address = new InetSocketAddress(lh, port);
        }

        /** Returns the address that clients should connect to. */
        InetSocketAddress address() {
            return address;
        }

        /** Accepts the next connection. */
        SocketChannel accept() throws IOException {
            return ssc.accept();
        }

        @Override
        public void close() throws IOException {
            ssc.close();
        }
    }

    /**
     * Checks the local address before and after bind, that re-binding is
     * rejected, and that binding a closed channel is rejected.
     */
    static void testBind() throws Exception {
        System.out.println("-- bind --");

        try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
            if (ch.getLocalAddress() != null)
                throw new RuntimeException("Local address should be 'null'");
            ch.bind(new InetSocketAddress(0));

            // check local address after binding
            InetSocketAddress local = (InetSocketAddress)ch.getLocalAddress();
            if (local.getPort() == 0)
                throw new RuntimeException("Unexpected port");
            if (!local.getAddress().isAnyLocalAddress())
                throw new RuntimeException("Not bound to a wildcard address");

            // try to re-bind
            try {
                ch.bind(new InetSocketAddress(0));
                throw new RuntimeException("AlreadyBoundException expected");
            } catch (AlreadyBoundException expected) {
            }
        }

        // check ClosedChannelException
        AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
        ch.close();
        try {
            ch.bind(new InetSocketAddress(0));
            throw new RuntimeException("ClosedChannelException expected");
        } catch (ClosedChannelException expected) {
        }
    }

    /**
     * Sets and reads back socket options, and checks the defaults of
     * SO_KEEPALIVE and TCP_NODELAY.
     */
    static void testSocketOptions() throws Exception {
        System.out.println("-- socket options --");

        try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
            ch.setOption(SO_RCVBUF, 128*1024)
              .setOption(SO_SNDBUF, 128*1024)
              .setOption(SO_REUSEADDR, true);

            // check SO_SNDBUF/SO_RCVBUF limits
            int before, after;
            before = ch.getOption(SO_SNDBUF);
            after = ch.setOption(SO_SNDBUF, Integer.MAX_VALUE).getOption(SO_SNDBUF);
            if (after < before)
                throw new RuntimeException("setOption caused SO_SNDBUF to decrease");
            before = ch.getOption(SO_RCVBUF);
            after = ch.setOption(SO_RCVBUF, Integer.MAX_VALUE).getOption(SO_RCVBUF);
            if (after < before)
                throw new RuntimeException("setOption caused SO_RCVBUF to decrease");

            ch.bind(new InetSocketAddress(0));

            // default values
            if (ch.getOption(SO_KEEPALIVE))
                throw new RuntimeException("Default of SO_KEEPALIVE should be 'false'");
            if (ch.getOption(TCP_NODELAY))
                throw new RuntimeException("Default of TCP_NODELAY should be 'false'");

            // set and check
            if (!ch.setOption(SO_KEEPALIVE, true).getOption(SO_KEEPALIVE))
                throw new RuntimeException("SO_KEEPALIVE did not change");
            if (!ch.setOption(TCP_NODELAY, true).getOption(TCP_NODELAY))
                throw new RuntimeException("TCP_NODELAY did not change");

            // read others (can't check as actual value is implementation dependent)
            ch.getOption(SO_RCVBUF);
            ch.getOption(SO_SNDBUF);
        }
    }

    /**
     * Checks addresses after a successful connect, that a second connect and
     * a connect on a closed channel are rejected, and that a failed connect
     * leaves the channel closed.
     */
    static void testConnect() throws Exception {
        System.out.println("-- connect --");

        SocketAddress address;

        try (Server server = new Server()) {
            address = server.address();

            // connect to server and check local/remote addresses
            try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
                ch.connect(address).get();
                // check local address
                if (ch.getLocalAddress() == null)
                    throw new RuntimeException("Not bound to local address");

                // check remote address
                InetSocketAddress remote = (InetSocketAddress)ch.getRemoteAddress();
                if (remote.getPort() != server.address().getPort())
                    throw new RuntimeException("Connected to unexpected port");
                if (!remote.getAddress().equals(server.address().getAddress()))
                    throw new RuntimeException("Connected to unexpected address");

                // try to connect again
                try {
                    ch.connect(server.address()).get();
                    throw new RuntimeException("AlreadyConnectedException expected");
                } catch (AlreadyConnectedException expected) {
                }

                // clean-up
                server.accept().close();
            }

            // check that connect fails with ClosedChannelException
            AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
            ch.close();
            try {
                ch.connect(server.address()).get();
                throw new RuntimeException("ExecutionException expected");
            } catch (ExecutionException x) {
                if (!(x.getCause() instanceof ClosedChannelException))
                    throw new RuntimeException("Cause of ClosedChannelException expected");
            }
            final AtomicReference<Throwable> connectException = new AtomicReference<>();
            ch.connect(server.address(), (Void)null, new CompletionHandler<Void,Void>() {
                public void completed(Void result, Void att) {
                    // publish something so the wait below cannot spin forever
                    // if the connect unexpectedly succeeds
                    connectException.set(new RuntimeException("Should not complete"));
                }
                public void failed(Throwable exc, Void att) {
                    connectException.set(exc);
                }
            });
            if (!(awaitException(connectException) instanceof ClosedChannelException))
                throw new RuntimeException("ClosedChannelException expected");
        }

        // test that failure to connect closes the channel (the server has
        // been closed at this point)
        try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
            try {
                ch.connect(address).get();
            } catch (ExecutionException x) {
                // failed to establish connection
                if (ch.isOpen())
                    throw new RuntimeException("Channel should be closed");
            }
        }

        // repeat test by connecting to a (probably) non-existent host. This
        // improves the chance that the connect will not fail immediately.
        if (!skipSlowConnectTest) {
            try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
                try {
                    ch.connect(genSocketAddress()).get();
                } catch (ExecutionException x) {
                    // failed to establish connection
                    if (ch.isOpen())
                        throw new RuntimeException("Channel should be closed");
                }
            }
        }
    }

    /**
     * Closes a channel while a connect, a read and a write are outstanding
     * and checks how each outstanding operation completes. Also checks that
     * a second read or write is rejected while one is pending.
     */
    static void testCloseWhenPending() throws Exception {
        System.out.println("-- asynchronous close when connecting --");

        AsynchronousSocketChannel ch;

        // asynchronous close while connecting
        ch = AsynchronousSocketChannel.open();
        Future<Void> connectResult = ch.connect(genSocketAddress());

        // give time to initiate the connect (SYN)
        Thread.sleep(50);

        // close
        ch.close();

        // check that exception is thrown in timely manner
        try {
            connectResult.get(5, TimeUnit.SECONDS);
        } catch (TimeoutException x) {
            throw new RuntimeException("AsynchronousCloseException not thrown");
        } catch (ExecutionException x) {
            // expected
        }

        System.out.println("-- asynchronous close when reading --");

        try (Server server = new Server()) {
            ch = AsynchronousSocketChannel.open();
            ch.connect(server.address()).get();

            ByteBuffer dst = ByteBuffer.allocateDirect(100);
            Future<Integer> result = ch.read(dst);

            // attempt a second read - should fail with ReadPendingException
            ByteBuffer buf = ByteBuffer.allocateDirect(100);
            try {
                ch.read(buf);
                throw new RuntimeException("ReadPendingException expected");
            } catch (ReadPendingException expected) {
            }

            // close channel (should cause initial read to complete)
            ch.close();
            server.accept().close();

            // check that AsynchronousCloseException is thrown
            try {
                result.get();
                throw new RuntimeException("Should not read");
            } catch (ExecutionException x) {
                if (!(x.getCause() instanceof AsynchronousCloseException))
                    throw new RuntimeException(x);
            }

            System.out.println("-- asynchronous close when writing --");

            ch = AsynchronousSocketChannel.open();
            ch.connect(server.address()).get();

            final AtomicReference<Throwable> writeException =
                new AtomicReference<Throwable>();

            // write bytes to fill socket buffer; the handler keeps issuing
            // writes until one fails
            ch.write(genBuffer(), ch, new CompletionHandler<Integer,AsynchronousSocketChannel>() {
                public void completed(Integer result, AsynchronousSocketChannel channel) {
                    channel.write(genBuffer(), channel, this);
                }
                public void failed(Throwable x, AsynchronousSocketChannel channel) {
                    writeException.set(x);
                }
            });

            // give time for socket buffer to fill up.
            Thread.sleep(5*1000);

            // attempt a concurrent write - should fail with WritePendingException
            try {
                ch.write(genBuffer());
                throw new RuntimeException("WritePendingException expected");
            } catch (WritePendingException expected) {
            }

            // close channel - should cause initial write to complete
            ch.close();
            server.accept().close();

            // wait for exception
            if (!(awaitException(writeException) instanceof AsynchronousCloseException))
                throw new RuntimeException("AsynchronousCloseException expected");
        }
    }

    /**
     * Cancels a pending read, once with each value of mayInterruptIfRunning,
     * and checks the state of the Future afterwards.
     */
    static void testCancel() throws Exception {
        System.out.println("-- cancel --");

        try (Server server = new Server()) {
            for (int i=0; i<2; i++) {
                boolean mayInterruptIfRunning = (i != 0);

                // establish loopback connection
                try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
                    ch.connect(server.address()).get();
                    try (SocketChannel peer = server.accept()) {
                        // start read operation
                        ByteBuffer buf = ByteBuffer.allocate(1);
                        Future<Integer> res = ch.read(buf);

                        // cancel operation
                        boolean cancelled = res.cancel(mayInterruptIfRunning);

                        // check post-conditions
                        if (!res.isDone())
                            throw new RuntimeException("isDone should return true");
                        if (res.isCancelled() != cancelled)
                            throw new RuntimeException("isCancelled not consistent");
                        try {
                            res.get();
                            throw new RuntimeException("CancellationException expected");
                        } catch (CancellationException expected) {
                        }
                        try {
                            res.get(1, TimeUnit.SECONDS);
                            throw new RuntimeException("CancellationException expected");
                        } catch (CancellationException expected) {
                        }

                        // check that the cancel doesn't impact writing to the channel
                        if (!mayInterruptIfRunning) {
                            buf = ByteBuffer.wrap("a".getBytes());
                            ch.write(buf).get();
                        }
                    }
                }
            }
        }
    }

    /**
     * Reads to end-of-stream after the peer has written its bytes and closed
     * the connection, then checks that a read on the closed channel fails.
     */
    static void testRead1() throws Exception {
        System.out.println("-- read (1) --");

        try (Server server = new Server();
             AsynchronousSocketChannel ch = AsynchronousSocketChannel.open())
        {
            ch.connect(server.address()).get();

            // read with 0 bytes remaining should complete immediately
            ByteBuffer buf = ByteBuffer.allocate(1);
            buf.put((byte)0);
            int n = ch.read(buf).get();
            if (n != 0)
                throw new RuntimeException("0 expected");

            // write bytes and close connection
            ByteBuffer src = genBuffer();
            try (SocketChannel sc = server.accept()) {
                sc.setOption(SO_SNDBUF, src.remaining());
                while (src.hasRemaining())
                    sc.write(src);
            }

            // reads should complete immediately
            final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100);
            final CountDownLatch latch = new CountDownLatch(1);
            final AtomicReference<Throwable> failure = new AtomicReference<>();
            ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() {
                public void completed(Integer result, Void att) {
                    if (result > 0) {
                        ch.read(dst, (Void)null, this);
                    } else {
                        latch.countDown();
                    }
                }
                public void failed(Throwable exc, Void att) {
                    // release the main thread so the failure is reported
                    failure.compareAndSet(null, exc);
                    latch.countDown();
                }
            });

            latch.await();
            throwIfFailed(failure);

            // check buffers
            src.flip();
            dst.flip();
            if (!src.equals(dst)) {
                throw new RuntimeException("Contents differ");
            }

            // close channel
            ch.close();

            // check read fails with ClosedChannelException
            try {
                ch.read(dst).get();
                throw new RuntimeException("ExecutionException expected");
            } catch (ExecutionException x) {
                if (!(x.getCause() instanceof ClosedChannelException))
                    throw new RuntimeException("Cause of ClosedChannelException expected");
            }
        }
    }

    /**
     * Reads until the destination buffer is full while the peer trickles the
     * bytes out in randomly sized pieces with random pauses.
     */
    static void testRead2() throws Exception {
        System.out.println("-- read (2) --");

        try (Server server = new Server();
             AsynchronousSocketChannel ch = AsynchronousSocketChannel.open())
        {
            ch.connect(server.address()).get();
            try (SocketChannel sc = server.accept()) {
                ByteBuffer src = genBuffer();

                // read until the buffer is full
                final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity());
                final CountDownLatch latch = new CountDownLatch(1);
                final AtomicReference<Throwable> failure = new AtomicReference<>();
                ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() {
                    public void completed(Integer result, Void att) {
                        if (dst.hasRemaining()) {
                            ch.read(dst, (Void)null, this);
                        } else {
                            latch.countDown();
                        }
                    }
                    public void failed(Throwable exc, Void att) {
                        // release the main thread so the failure is reported
                        failure.compareAndSet(null, exc);
                        latch.countDown();
                    }
                });

                // trickle the writing
                do {
                    int rem = src.remaining();
                    int size = (rem <= 100) ? rem : 50 + rand.nextInt(rem - 100);
                    ByteBuffer buf = ByteBuffer.allocate(size);
                    for (int i=0; i<size; i++)
                        buf.put(src.get());
                    buf.flip();
                    Thread.sleep(50 + rand.nextInt(1500));
                    while (buf.hasRemaining())
                        sc.write(buf);
                } while (src.hasRemaining());

                // wait until asynchronous reading has completed
                latch.await();
                throwIfFailed(failure);

                // check buffers
                src.flip();
                dst.flip();
                if (!src.equals(dst)) {
                    throw new RuntimeException("Contents differ");
                }
            }
        }
    }

    /**
     * Exercises the scattering read: one read that is issued before the peer
     * writes, and one that is issued after the peer has written.
     */
    static void testRead3() throws Exception {
        System.out.println("-- read (3) --");

        try (Server server = new Server();
             AsynchronousSocketChannel ch = AsynchronousSocketChannel.open())
        {
            ch.connect(server.address()).get();
            try (SocketChannel sc = server.accept()) {
                ByteBuffer[] dsts = new ByteBuffer[3];
                for (int i=0; i<dsts.length; i++) {
                    dsts[i] = ByteBuffer.allocateDirect(100);
                }

                final AtomicReference<Throwable> failure = new AtomicReference<>();

                // scattering read that completes asynchronously
                final CountDownLatch l1 = new CountDownLatch(1);
                ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null,
                    new CompletionHandler<Long,Void>() {
                        public void completed(Long result, Void att) {
                            // record rather than throw: an exception thrown
                            // here would leave the latch unreleased
                            if (result <= 0)
                                failure.compareAndSet(null,
                                    new RuntimeException("No bytes read"));
                            l1.countDown();
                        }
                        public void failed(Throwable exc, Void att) {
                            failure.compareAndSet(null, exc);
                            l1.countDown();
                        }
                });

                // write some bytes
                sc.write(genBuffer());

                // read should now complete
                l1.await();
                throwIfFailed(failure);

                // write more bytes
                sc.write(genBuffer());

                // read should complete immediately
                for (int i=0; i<dsts.length; i++) {
                    dsts[i].rewind();
                }

                final CountDownLatch l2 = new CountDownLatch(1);
                ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null,
                    new CompletionHandler<Long,Void>() {
                        public void completed(Long result, Void att) {
                            if (result <= 0)
                                failure.compareAndSet(null,
                                    new RuntimeException("No bytes read"));
                            l2.countDown();
                        }
                        public void failed(Throwable exc, Void att) {
                            failure.compareAndSet(null, exc);
                            l2.countDown();
                        }
                });
                l2.await();
                throwIfFailed(failure);
            }
        }
    }

    /**
     * Writes a buffer with a self-rescheduling handler that closes the
     * channel when done, checks that the peer receives the same bytes, then
     * checks that a write on the closed channel fails.
     */
    static void testWrite1() throws Exception {
        System.out.println("-- write (1) --");

        try (Server server = new Server();
             AsynchronousSocketChannel ch = AsynchronousSocketChannel.open())
        {
            ch.connect(server.address()).get();
            SocketChannel sc = server.accept();

            // write with 0 bytes remaining should complete immediately
            ByteBuffer buf = ByteBuffer.allocate(1);
            buf.put((byte)0);
            int n = ch.write(buf).get();
            if (n != 0)
                throw new RuntimeException("0 expected");

            // write all bytes and close connection when done
            final ByteBuffer src = genBuffer();
            final AtomicReference<Throwable> failure = new AtomicReference<>();
            ch.write(src, (Void)null, new CompletionHandler<Integer,Void>() {
                public void completed(Integer result, Void att) {
                    if (src.hasRemaining()) {
                        ch.write(src, (Void)null, this);
                    } else {
                        try {
                            ch.close();
                        } catch (IOException ignore) { }
                    }
                }
                public void failed(Throwable exc, Void att) {
                    // close so that the peer's read loop below sees EOF
                    // instead of blocking forever; the failure is rethrown
                    // on the main thread
                    failure.compareAndSet(null, exc);
                    try {
                        ch.close();
                    } catch (IOException ignore) { }
                }
            });

            // read to EOF or buffer full
            ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100);
            do {
                n = sc.read(dst);
            } while (n > 0);
            sc.close();
            throwIfFailed(failure);

            // check buffers
            src.flip();
            dst.flip();
            if (!src.equals(dst)) {
                throw new RuntimeException("Contents differ");
            }

            // check write fails with ClosedChannelException (src has bytes
            // remaining again after the flip above)
            try {
                ch.write(src).get();
                throw new RuntimeException("ExecutionException expected");
            } catch (ExecutionException x) {
                if (!(x.getCause() instanceof ClosedChannelException))
                    throw new RuntimeException("Cause of ClosedChannelException expected");
            }
        }
    }

    /**
     * Exercises the gathering write: one write that should complete
     * immediately, then repeated writes intended to fill the socket buffer,
     * after which the peer drains everything that was written.
     */
    static void testWrite2() throws Exception {
        System.out.println("-- write (2) --");

        try (Server server = new Server();
             AsynchronousSocketChannel ch = AsynchronousSocketChannel.open())
        {
            ch.connect(server.address()).get();
            try (SocketChannel sc = server.accept()) {
                // number of bytes written
                final AtomicLong bytesWritten = new AtomicLong(0);
                final AtomicReference<Throwable> failure = new AtomicReference<>();

                // write buffers (should complete immediately)
                ByteBuffer[] srcs = genBuffers(1);
                final CountDownLatch l1 = new CountDownLatch(1);
                ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null,
                    new CompletionHandler<Long,Void>() {
                        public void completed(Long result, Void att) {
                            long n = result;
                            if (n <= 0) {
                                failure.compareAndSet(null,
                                    new RuntimeException("No bytes written"));
                            } else {
                                bytesWritten.addAndGet(n);
                            }
                            l1.countDown();
                        }
                        public void failed(Throwable exc, Void att) {
                            failure.compareAndSet(null, exc);
                            l1.countDown();
                        }
                });
                l1.await();
                throwIfFailed(failure);

                // set to false to signal that no more buffers should be written
                final AtomicBoolean continueWriting = new AtomicBoolean(true);

                // write until socket buffer is full so as to create the conditions
                // for when a write does not complete immediately
                srcs = genBuffers(1);
                ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null,
                    new CompletionHandler<Long,Void>() {
                        public void completed(Long result, Void att) {
                            long n = result;
                            if (n <= 0) {
                                failure.compareAndSet(null,
                                    new RuntimeException("No bytes written"));
                                return;
                            }
                            bytesWritten.addAndGet(n);
                            if (continueWriting.get()) {
                                ByteBuffer[] more = genBuffers(8);
                                ch.write(more, 0, more.length, 0L, TimeUnit.SECONDS,
                                    (Void)null, this);
                            }
                        }
                        public void failed(Throwable exc, Void att) {
                            failure.compareAndSet(null, exc);
                        }
                });

                // give time for socket buffer to fill up.
                Thread.sleep(5*1000);

                // signal handler to stop further writing
                continueWriting.set(false);

                // read until done
                ByteBuffer buf = ByteBuffer.allocateDirect(4096);
                long total = 0L;
                do {
                    int n = sc.read(buf);
                    if (n <= 0)
                        throw new RuntimeException("No bytes read");
                    buf.rewind();
                    total += n;
                } while (total < bytesWritten.get());

                // checked before the channels are closed so that a failure
                // caused by the close itself is not reported
                throwIfFailed(failure);
            }
        }
    }

    /**
     * Checks that reads return -1 after shutdownInput and that writes fail
     * with ClosedChannelException after shutdownOutput.
     */
    static void testShutdown() throws Exception {
        System.out.println("-- shutdown--");

        try (Server server = new Server();
             AsynchronousSocketChannel ch = AsynchronousSocketChannel.open())
        {
            ch.connect(server.address()).get();
            try (SocketChannel peer = server.accept()) {
                ByteBuffer buf = ByteBuffer.allocateDirect(1000);
                int n;

                // check read
                ch.shutdownInput();
                n = ch.read(buf).get();
                if (n != -1)
                    throw new RuntimeException("-1 expected");
                // check read again after putting bytes into the buffer
                buf.put(new byte[100]);
                n = ch.read(buf).get();
                if (n != -1)
                    throw new RuntimeException("-1 expected");

                // check write
                ch.shutdownOutput();
                try {
                    ch.write(buf).get();
                    throw new RuntimeException("ClosedChannelException expected");
                } catch (ExecutionException x) {
                    if (!(x.getCause() instanceof ClosedChannelException))
                        throw new RuntimeException("ClosedChannelException expected");
                }
            }
        }
    }

    /** Runs the timeout test with negative, zero and positive timeouts. */
    static void testTimeout() throws Exception {
        System.out.println("-- timeouts --");
        testTimeout(Integer.MIN_VALUE, TimeUnit.SECONDS);
        testTimeout(-1L, TimeUnit.SECONDS);
        testTimeout(0L, TimeUnit.SECONDS);
        testTimeout(2L, TimeUnit.SECONDS);
    }

    /**
     * Issues a read and then repeated writes with the given timeout. When the
     * timeout is positive each must fail with InterruptedByTimeoutException
     * and a further read/write must throw a RuntimeException; otherwise no
     * failure may be reported within one second.
     *
     * @param timeout the timeout passed to the read and write operations
     * @param unit    the unit of {@code timeout}
     */
    static void testTimeout(final long timeout, final TimeUnit unit) throws Exception {
        try (Server server = new Server();
             AsynchronousSocketChannel ch = AsynchronousSocketChannel.open())
        {
            ch.connect(server.address()).get();

            ByteBuffer dst = ByteBuffer.allocate(512);

            final AtomicReference<Throwable> readException = new AtomicReference<Throwable>();

            // this read should timeout if value is > 0
            ch.read(dst, timeout, unit, null, new CompletionHandler<Integer,Void>() {
                public void completed(Integer result, Void att) {
                    readException.set(new RuntimeException("Should not complete"));
                }
                public void failed(Throwable exc, Void att) {
                    readException.set(exc);
                }
            });
            if (timeout > 0L) {
                // wait for exception
                if (!(awaitException(readException) instanceof InterruptedByTimeoutException))
                    throw new RuntimeException("InterruptedByTimeoutException expected");

                // after a timeout then further reading should throw unspecified runtime exception
                boolean exceptionThrown = false;
                try {
                    ch.read(dst);
                } catch (RuntimeException x) {
                    exceptionThrown = true;
                }
                if (!exceptionThrown)
                    throw new RuntimeException("RuntimeException expected after timeout.");
            } else {
                Thread.sleep(1000);
                Throwable exc = readException.get();
                if (exc != null)
                    throw new RuntimeException(exc);
            }

            final AtomicReference<Throwable> writeException = new AtomicReference<Throwable>();

            // write bytes to fill socket buffer
            ch.write(genBuffer(), timeout, unit, ch,
                new CompletionHandler<Integer,AsynchronousSocketChannel>()
            {
                public void completed(Integer result, AsynchronousSocketChannel channel) {
                    channel.write(genBuffer(), timeout, unit, channel, this);
                }
                public void failed(Throwable exc, AsynchronousSocketChannel channel) {
                    writeException.set(exc);
                }
            });
            if (timeout > 0) {
                // wait for exception
                if (!(awaitException(writeException) instanceof InterruptedByTimeoutException))
                    throw new RuntimeException("InterruptedByTimeoutException expected");

                // after a timeout then further writing should throw unspecified runtime exception
                boolean exceptionThrown = false;
                try {
                    ch.write(genBuffer());
                } catch (RuntimeException x) {
                    exceptionThrown = true;
                }
                if (!exceptionThrown)
                    throw new RuntimeException("RuntimeException expected after timeout.");
            } else {
                Thread.sleep(1000);
                Throwable exc = writeException.get();
                if (exc != null)
                    throw new RuntimeException(exc);
            }

            // clean-up
            server.accept().close();
        }
    }

    /**
     * Returns a buffer of between 1024 and 17023 random bytes, positioned at
     * zero and ready to be read from. A direct or a heap buffer is chosen at
     * random.
     */
    static ByteBuffer genBuffer() {
        int size = 1024 + rand.nextInt(16000);
        byte[] buf = new byte[size];
        rand.nextBytes(buf);
        boolean useDirect = rand.nextBoolean();
        if (useDirect) {
            ByteBuffer bb = ByteBuffer.allocateDirect(buf.length);
            bb.put(buf);
            bb.flip();
            return bb;
        } else {
            return ByteBuffer.wrap(buf);
        }
    }

    /**
     * Returns an array of random buffers (see {@link #genBuffer()}).
     *
     * @param max upper bound on the array length; the array always has at
     *            least one element
     */
    static ByteBuffer[] genBuffers(int max) {
        int len = 1;
        if (max > 1)
            len += rand.nextInt(max);
        ByteBuffer[] bufs = new ByteBuffer[len];
        for (int i=0; i<len; i++)
            bufs[i] = genBuffer();
        return bufs;
    }

    /**
     * Returns a socket address with a random address in 10.0.0.0/8 and a
     * random port between 1 and 65535.
     */
    static SocketAddress genSocketAddress() {
        StringBuilder sb = new StringBuilder("10.");
        sb.append(rand.nextInt(256));
        sb.append('.');
        sb.append(rand.nextInt(256));
        sb.append('.');
        sb.append(rand.nextInt(256));
        InetAddress rh;
        try {
            rh = InetAddress.getByName(sb.toString());
        } catch (UnknownHostException x) {
            // keep the cause so an unexpected failure can be diagnosed
            InternalError err = new InternalError("Should not happen");
            err.initCause(x);
            throw err;
        }
        return new InetSocketAddress(rh, rand.nextInt(65535)+1);
    }
}
--- EOF ---