--- /dev/null 2018-12-11 17:22:14.232000000 +0000 +++ new/test/jdk/jdk/net/RdmaSockets/rsocket/SocketChannel/IOExchanges.java 2018-12-13 19:43:53.037978430 +0000 @@ -0,0 +1,1204 @@ +/* + * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +/* + * @test + * @bug 8195160 + * @summary Tests various combinations of blocking/nonblocking connections + * @requires (os.family == "linux") + * @library ../ /test/lib + * @build RsocketTest + * @run testng/othervm IOExchanges + * @author chegar + */ + +import java.io.IOException; +import java.net.Inet6Address; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.ProtocolFamily; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.nio.ByteBuffer; +import jdk.net.RdmaSockets; +import org.testng.SkipException; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; +import static java.lang.System.out; +import static java.net.StandardProtocolFamily.INET; +import static java.net.StandardProtocolFamily.INET6; +import static java.nio.channels.SelectionKey.OP_ACCEPT; +import static java.nio.channels.SelectionKey.OP_READ; +import static java.nio.channels.SelectionKey.OP_WRITE; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +public class IOExchanges { + + // Whether, or not, to use RDMA channels or regular socket channels. + // For test assertion purposes during development. + static final boolean useRDMA = true; + + static InetAddress addr; + static ProtocolFamily family; + + @BeforeTest + public void setup() throws Exception { + if (useRDMA && !RsocketTest.isRsocketAvailable()) + throw new SkipException("rsocket is not available"); + + addr = InetAddress.getLocalHost(); + family = addr instanceof Inet6Address ? INET6 : INET; + out.printf("local address: %s%n", addr); + out.printf("useRDMA: %b%n", useRDMA); + } + + static SocketChannel openSocketChannel(ProtocolFamily family) + throws IOException + { + return useRDMA ? RdmaSockets.openSocketChannel(family) + : SocketChannel.open(); + } + static ServerSocketChannel openServerSocketChannel(ProtocolFamily family) + throws IOException + { + return useRDMA ? RdmaSockets.openServerSocketChannel(family) + : ServerSocketChannel.open(); + } + static Selector openSelector( ) throws IOException { + return useRDMA ? RdmaSockets.openSelector() : Selector.open(); + } + + /* + The following, non-exhaustive set, of tests exercise different combinations + of blocking and non-blocking accept/connect calls along with I/O + operations, that exchange a single byte. The intent it to test a reasonable + set of blocking and non-blocking scenarios. + + The individual test method names follow their test scenario. + [BAccep|SELNBAccep|SPINNBAccep] - Accept either: + blocking, select-non-blocking, spinning-non-blocking + [BConn|NBConn] - blocking connect / non-blocking connect + [BIO|NBIO] - blocking / non-blocking I/O operations (read/write) + [WR|RW] - connecting thread write/accepting thread reads first, and vice-versa + [Id] - unique test Id + + BAccep_BConn_BIO_WR_1 + BAccep_BConn_BIO_RW_2 + SELNBAccep_BConn_BIO_WR_3 + SELNBAccep_BConn_BIO_RW_4 + SPINNBAccep_BConn_BIO_WR_5 + SPINNBAccep_BConn_BIO_RW_6 + BAccep_NBConn_BIO_WR_7 + BAccep_NBConn_BIO_RW_8 + SELNBAccep_NBConn_BIO_WR_9 + SELNBAccep_NBConn_BIO_RW_10 + SPINNBAccep_NBConn_BIO_WR_11 + SPINNBAccep_NBConn_BIO_RW_12 + + BAccep_BConn_NBIO_WR_1a << Non-Blocking I/O + BAccep_BConn_NBIO_RW_2a + SELNBAccep_BConn_NBIO_WR_3a + SELNBAccep_BConn_NBIO_RW_4a + SPINNBAccep_BConn_NBIO_WR_5a + SPINNBAccep_BConn_NBIO_RW_6a + BAccep_NBConn_NBIO_WR_7a + BAccep_NBConn_NBIO_RW_8a + SELNBAccep_NBConn_NBIO_WR_9a + SELNBAccep_NBConn_NBIO_RW_10a + SPINBAccep_NBConn_NBIO_WR_11a + SPINBAccep_NBConn_NBIO_RW_12a + */ + + + @Test + public void BAccep_BConn_BIO_WR_1() throws Throwable { + try (ServerSocketChannel ssc = openServerSocketChannel(family)) { + ssc.bind(new InetSocketAddress(addr, 0)); + final int port = ssc.socket().getLocalPort(); + + TestThread t = TestThread.of("t1", () -> { + try (SocketChannel sc = openSocketChannel(family)) { + assertTrue(sc.connect(new InetSocketAddress(addr, port))); + ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x01).flip(); + assertEquals(sc.write(bb), 1); + out.printf("wrote: 0x%x%n", bb.get(0)); + assertEquals(sc.read(bb.clear()), -1); + } + }); + t.start(); + + try (SocketChannel sc = ssc.accept()) { + ByteBuffer bb = ByteBuffer.allocate(10); + assertEquals(sc.read(bb), 1); + out.printf("read: 0x%x%n", bb.get(0)); + assertEquals(bb.get(0), 0x01); + } + t.awaitCompletion(); + } + } + + @Test + public void BAccep_BConn_BIO_RW_2() throws Throwable { + try (ServerSocketChannel ssc = openServerSocketChannel(family)) { + ssc.bind(new InetSocketAddress(addr, 0)); + final int port = ssc.socket().getLocalPort(); + + TestThread t = TestThread.of("t2", () -> { + try (SocketChannel sc = openSocketChannel(family)) { + assertTrue(sc.connect(new InetSocketAddress(addr, port))); + ByteBuffer bb = ByteBuffer.allocate(10); + assertEquals(sc.read(bb), 1); + out.printf("read: 0x%x%n", bb.get(0)); + assertEquals(bb.get(0), 0x02); + } + }); + t.start(); + + try (SocketChannel sc = ssc.accept()) { + ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x02).flip(); + assertEquals(sc.write(bb), 1); + out.printf("wrote: 0x%x%n", bb.get(0)); + assertEquals(sc.read(bb.clear()), -1); + } + t.awaitCompletion(); + } + } + + @Test + public void SELNBAccep_BConn_BIO_WR_3() throws Throwable { + try (ServerSocketChannel ssc = openServerSocketChannel(family)) { + ssc.bind(new InetSocketAddress(addr, 0)); + final int port = ssc.socket().getLocalPort(); + + TestThread t = TestThread.of("t3", () -> { + try (SocketChannel sc = openSocketChannel(family)) { + assertTrue(sc.connect(new InetSocketAddress(addr, port))); + ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x03).flip(); + assertEquals(sc.write(bb), 1); + out.printf("wrote: 0x%x%n", bb.get(0)); + assertEquals(sc.read(bb.clear()), -1); + } + }); + t.start(); + + Selector selector = openSelector(); + ssc.configureBlocking(false).register(selector, OP_ACCEPT); + assertEquals(selector.select(), 1); + + try (SocketChannel sc = ssc.accept()) { + ByteBuffer bb = ByteBuffer.allocate(10); + assertEquals(sc.read(bb), 1); + out.printf("read: 0x%x%n", bb.get(0)); + assertEquals(bb.get(0), 0x03); + } + t.awaitCompletion(); + } + } + + @Test + public void SELNBAccep_BConn_BIO_RW_4() throws Throwable { + try (ServerSocketChannel ssc = openServerSocketChannel(family)) { + ssc.bind(new InetSocketAddress(addr, 0)); + final int port = ssc.socket().getLocalPort(); + + TestThread t = TestThread.of("t4", () -> { + try (SocketChannel sc = openSocketChannel(family)) { + assertTrue(sc.connect(new InetSocketAddress(addr, port))); + ByteBuffer bb = ByteBuffer.allocate(10); + assertEquals(sc.read(bb), 1); + out.printf("read: 0x%x%n", bb.get(0)); + assertEquals(bb.get(0), 0x04); + } + }); + t.start(); + + Selector selector = openSelector(); + ssc.configureBlocking(false).register(selector, OP_ACCEPT); + assertEquals(selector.select(), 1); + + try (SocketChannel sc = ssc.accept()) { + ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x04).flip(); + assertEquals(sc.write(bb), 1); + out.printf("wrote: 0x%x%n", bb.get(0)); + assertEquals(sc.read(bb.clear()), -1); + + } + t.awaitCompletion(); + } + } + + @Test + public void SPINNBAccep_BConn_BIO_WR_5() throws Throwable { + try (ServerSocketChannel ssc = openServerSocketChannel(family)) { + ssc.bind(new InetSocketAddress(addr, 0)); + final int port = ssc.socket().getLocalPort(); + + TestThread t = TestThread.of("t5", () -> { + try (SocketChannel sc = openSocketChannel(family)) { + assertTrue(sc.connect(new InetSocketAddress(addr, port))); + ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x05).flip(); + assertEquals(sc.write(bb), 1); + out.printf("wrote: 0x%x%n", bb.get(0)); + assertEquals(sc.read(bb.clear()), -1); + } + }); + t.start(); + + SocketChannel accepted; + for (;;) { + accepted = ssc.accept(); + if (accepted != null) { + out.println("accepted new connection"); + break; + } + Thread.onSpinWait(); + } + + try (SocketChannel sc = accepted) { + ByteBuffer bb = ByteBuffer.allocate(10); + assertEquals(sc.read(bb), 1); + out.printf("read: 0x%x%n", bb.get(0)); + assertEquals(bb.get(0), 0x05); + } + t.awaitCompletion(); + } + } + + @Test + public void SPINNBAccep_BConn_BIO_RW_6() throws Throwable { + try (ServerSocketChannel ssc = openServerSocketChannel(family)) { + ssc.bind(new InetSocketAddress(addr, 0)); + final int port = ssc.socket().getLocalPort(); + + TestThread t = TestThread.of("t6", () -> { + try (SocketChannel sc = openSocketChannel(family)) { + assertTrue(sc.connect(new InetSocketAddress(addr, port))); + ByteBuffer bb = ByteBuffer.allocate(10); + assertEquals(sc.read(bb), 1); + out.printf("read: 0x%x%n", bb.get(0)); + assertEquals(bb.get(0), 0x06); + } + }); + t.start(); + + SocketChannel accepted; + for (;;) { + accepted = ssc.accept(); + if (accepted != null) { + out.println("accepted new connection"); + break; + } + Thread.onSpinWait(); + } + + try (SocketChannel sc = accepted) { + ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x06).flip(); + assertEquals(sc.write(bb), 1); + out.printf("wrote: 0x%x%n", bb.get(0)); + assertEquals(sc.read(bb.clear()), -1); + + } + t.awaitCompletion(); + } + } + + // Similar to the previous six scenarios, but with same-thread + // non-blocking connect. + + @Test + public void BAccep_NBConn_BIO_WR_7() throws Throwable { + try (ServerSocketChannel ssc = openServerSocketChannel(family)) { + ssc.bind(new InetSocketAddress(addr, 0)); + final int port = ssc.socket().getLocalPort(); + + try (SocketChannel sc = openSocketChannel(family)) { + sc.configureBlocking(false); + sc.connect(new InetSocketAddress(addr, port)); + + try (SocketChannel sc2 = ssc.accept()) { + assertTrue(sc.finishConnect()); + sc.configureBlocking(true); + TestThread t = TestThread.of("t7", () -> { + ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x07).flip(); + assertEquals(sc.write(bb), 1); + out.printf("wrote: 0x%x%n", bb.get(0)); + assertEquals(sc.read(bb.clear()), -1); + }); + t.start(); + + ByteBuffer bb = ByteBuffer.allocate(10); + assertEquals(sc2.read(bb), 1); + out.printf("read: 0x%x%n", bb.get(0)); + assertEquals(bb.get(0), 0x07); + sc2.shutdownOutput(); + t.awaitCompletion(); + } + } + } + } + + @Test + public void BAccep_NBConn_BIO_RW_8() throws Throwable { + try (ServerSocketChannel ssc = openServerSocketChannel(family)) { + ssc.bind(new InetSocketAddress(addr, 0)); + final int port = ssc.socket().getLocalPort(); + + try (SocketChannel sc = openSocketChannel(family)) { + sc.configureBlocking(false); + sc.connect(new InetSocketAddress(addr, port)); + + try (SocketChannel sc2 = ssc.accept()) { + assertTrue(sc.finishConnect()); + sc.configureBlocking(true); + TestThread t = TestThread.of("t8", () -> { + ByteBuffer bb = ByteBuffer.allocate(10); + assertEquals(sc.read(bb), 1); + out.printf("read: 0x%x%n", bb.get(0)); + assertEquals(bb.get(0), 0x08); + sc.shutdownOutput(); + }); + t.start(); + + ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x08).flip(); + assertEquals(sc2.write(bb), 1); + out.printf("wrote: 0x%x%n", bb.get(0)); + assertEquals(sc2.read(bb.clear()), -1); + t.awaitCompletion(); + } + } + } + } + + @Test + public void SELNBAccep_NBConn_BIO_WR_9() throws Throwable { + try (ServerSocketChannel ssc = openServerSocketChannel(family)) { + ssc.bind(new InetSocketAddress(addr, 0)); + final int port = ssc.socket().getLocalPort(); + + try (SocketChannel sc = openSocketChannel(family)) { + sc.configureBlocking(false); + sc.connect(new InetSocketAddress(addr, port)); + + Selector selector = openSelector(); + ssc.configureBlocking(false).register(selector, OP_ACCEPT); + assertEquals(selector.select(), 1); + + try (SocketChannel sc2 = ssc.accept()) { + assertTrue(sc.finishConnect()); + sc.configureBlocking(true); + TestThread t = TestThread.of("t9", () -> { + ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x09).flip(); + assertEquals(sc.write(bb), 1); + out.printf("wrote: 0x%x%n", bb.get(0)); + assertEquals(sc.read(bb.clear()), -1); + }); + t.start(); + + ByteBuffer bb = ByteBuffer.allocate(10); + assertEquals(sc2.read(bb), 1); + out.printf("read: 0x%x%n", bb.get(0)); + assertEquals(bb.get(0), 0x09); + sc2.shutdownOutput(); + t.awaitCompletion(); + } + } + } + } + + @Test + public void SELNBAccep_NBConn_BIO_RW_10() throws Throwable { + try (ServerSocketChannel ssc = openServerSocketChannel(family)) { + ssc.bind(new InetSocketAddress(addr, 0)); + final int port = ssc.socket().getLocalPort(); + + try (SocketChannel sc = openSocketChannel(family)) { + sc.configureBlocking(false); + sc.connect(new InetSocketAddress(addr, port)); + + Selector selector = openSelector(); + ssc.configureBlocking(false).register(selector, OP_ACCEPT); + assertEquals(selector.select(), 1); + + try (SocketChannel sc2 = ssc.accept()) { + assertTrue(sc.finishConnect()); + sc.configureBlocking(true); + TestThread t = TestThread.of("t10", () -> { + ByteBuffer bb = ByteBuffer.allocate(10); + assertEquals(sc.read(bb), 1); + out.printf("read: 0x%x%n", bb.get(0)); + assertEquals(bb.get(0), 0x10); + sc.shutdownOutput(); + }); + t.start(); + + ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x10).flip(); + assertEquals(sc2.write(bb), 1); + out.printf("wrote: 0x%x%n", bb.get(0)); + assertEquals(sc2.read(bb.clear()), -1); + t.awaitCompletion(); + } + } + } + } + + @Test + public void SPINNBAccep_NBConn_BIO_WR_11() throws Throwable { + try (ServerSocketChannel ssc = openServerSocketChannel(family)) { + ssc.bind(new InetSocketAddress(addr, 0)); + final int port = ssc.socket().getLocalPort(); + + try (SocketChannel sc = openSocketChannel(family)) { + sc.configureBlocking(false); + sc.connect(new InetSocketAddress(addr, port)); + + SocketChannel accepted; + for (;;) { + accepted = ssc.accept(); + if (accepted != null) { + out.println("accepted new connection"); + break; + } + Thread.onSpinWait(); + } + + try (SocketChannel sc2 = accepted) { + assertTrue(sc.finishConnect()); + sc.configureBlocking(true); + TestThread t = TestThread.of("t11", () -> { + ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x11).flip(); + assertEquals(sc.write(bb), 1); + out.printf("wrote: 0x%x%n", bb.get(0)); + assertEquals(sc.read(bb.clear()), -1); + }); + t.start(); + + ByteBuffer bb = ByteBuffer.allocate(10); + assertEquals(sc2.read(bb), 1); + out.printf("read: 0x%x%n", bb.get(0)); + assertEquals(bb.get(0), 0x11); + sc2.shutdownOutput(); + t.awaitCompletion(); + } + } + } + } + + @Test + public void SPINNBAccep_NBConn_BIO_RW_12() throws Throwable { + try (ServerSocketChannel ssc = openServerSocketChannel(family)) { + ssc.bind(new InetSocketAddress(addr, 0)); + final int port = ssc.socket().getLocalPort(); + + try (SocketChannel sc = openSocketChannel(family)) { + sc.configureBlocking(false); + sc.connect(new InetSocketAddress(addr, port)); + + SocketChannel accepted; + for (;;) { + accepted = ssc.accept(); + if (accepted != null) { + out.println("accepted new connection"); + break; + } + Thread.onSpinWait(); + } + + try (SocketChannel sc2 = accepted) { + assertTrue(sc.finishConnect()); + sc.configureBlocking(true); + TestThread t = TestThread.of("t12", () -> { + ByteBuffer bb = ByteBuffer.allocate(10); + assertEquals(sc.read(bb), 1); + out.printf("read: 0x%x%n", bb.get(0)); + assertEquals(bb.get(0), 0x12); + sc.shutdownOutput(); + }); + t.start(); + + ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x12).flip(); + assertEquals(sc2.write(bb), 1); + out.printf("wrote: 0x%x%n", bb.get(0)); + assertEquals(sc2.read(bb.clear()), -1); + t.awaitCompletion(); + } + } + } + } + + // --- + // Similar to the previous twelve scenarios but with non-blocking IO + // --- + + @Test + public void BAccep_BConn_NBIO_WR_1a() throws Throwable { + try (ServerSocketChannel ssc = openServerSocketChannel(family)) { + ssc.bind(new InetSocketAddress(addr, 0)); + final int port = ssc.socket().getLocalPort(); + + TestThread t = TestThread.of("t1a", () -> { + try (SocketChannel sc = openSocketChannel(family); + Selector selector = openSelector()) { + assertTrue(sc.connect(new InetSocketAddress(addr, port))); + ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x1A).flip(); + sc.configureBlocking(false); + SelectionKey k = sc.register(selector, OP_WRITE); + selector.select(); + int c; + while ((c = sc.write(bb)) < 1); + assertEquals(c, 1); + out.printf("wrote: 0x%x%n", bb.get(0)); + k.interestOps(OP_READ); + selector.select(); + bb.clear(); + while ((c = sc.read(bb)) == 0); + assertEquals(c, -1); + } + }); + t.start(); + + try (SocketChannel sc = ssc.accept()) { + ByteBuffer bb = ByteBuffer.allocate(10); + sc.configureBlocking(false); + try (Selector selector = openSelector()) { + sc.register(selector, OP_READ); + selector.select(); + int c; + while ((c = sc.read(bb)) == 0) ; + assertEquals(c, 1); + out.printf("read: 0x%x%n", bb.get(0)); + assertEquals(bb.get(0), 0x1A); + } + } + t.awaitCompletion(); + } + } + + @Test + public void BAccep_BConn_NBIO_RW_2a() throws Throwable { + try (ServerSocketChannel ssc = openServerSocketChannel(family)) { + ssc.bind(new InetSocketAddress(addr, 0)); + final int port = ssc.socket().getLocalPort(); + + TestThread t = TestThread.of("t2a", () -> { + try (SocketChannel sc = openSocketChannel(family); + Selector selector = openSelector()) { + assertTrue(sc.connect(new InetSocketAddress(addr, port))); + ByteBuffer bb = ByteBuffer.allocate(10); + sc.configureBlocking(false); + sc.register(selector, OP_READ); + selector.select(); + int c; + while ((c = sc.read(bb)) == 0); + assertEquals(c, 1); + out.printf("read: 0x%x%n", bb.get(0)); + assertEquals(bb.get(0), 0x2A); + } + }); + t.start(); + + try (SocketChannel sc = ssc.accept()) { + ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x2A).flip(); + sc.configureBlocking(false); + try (Selector selector = openSelector()) { + SelectionKey k = sc.register(selector, OP_WRITE); + selector.select(); + int c; + while ((c = sc.write(bb)) < 1); + assertEquals(c, 1); + out.printf("wrote: 0x%x%n", bb.get(0)); + k.interestOps(OP_READ); + selector.select(); + bb.clear(); + while ((c = sc.read(bb)) == 0); + assertEquals(c, -1); + } + } + t.awaitCompletion(); + } + } + + @Test + public void SELNBAccep_BConn_NBIO_WR_3a() throws Throwable { + try (ServerSocketChannel ssc = openServerSocketChannel(family)) { + ssc.bind(new InetSocketAddress(addr, 0)); + final int port = ssc.socket().getLocalPort(); + + TestThread t = TestThread.of("t3a", () -> { + try (SocketChannel sc = openSocketChannel(family); + Selector selector = openSelector()) { + assertTrue(sc.connect(new InetSocketAddress(addr, port))); + ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x3A).flip(); + sc.configureBlocking(false); + SelectionKey k = sc.register(selector, OP_WRITE); + selector.select(); + int c; + while ((c = sc.write(bb)) < 1); + assertEquals(c, 1); + out.printf("wrote: 0x%x%n", bb.get(0)); + k.interestOps(OP_READ); + selector.select(); + bb.clear(); + while ((c = sc.read(bb)) == 0); + assertEquals(c, -1); + } + }); + t.start(); + + Selector aselector = openSelector(); + ssc.configureBlocking(false).register(aselector, OP_ACCEPT); + assertEquals(aselector.select(), 1); + + try (SocketChannel sc = ssc.accept()) { + ByteBuffer bb = ByteBuffer.allocate(10); + sc.configureBlocking(false); + try (Selector selector = openSelector()) { + sc.register(selector, OP_READ); + selector.select(); + int c; + while ((c = sc.read(bb)) == 0) ; + assertEquals(c, 1); + out.printf("read: 0x%x%n", bb.get(0)); + assertEquals(bb.get(0), 0x3A); + } + } + t.awaitCompletion(); + } + } + + @Test + public void SELNBAccep_BConn_NBIO_RW_4a() throws Throwable { + try (ServerSocketChannel ssc = openServerSocketChannel(family)) { + ssc.bind(new InetSocketAddress(addr, 0)); + final int port = ssc.socket().getLocalPort(); + + TestThread t = TestThread.of("t4a", () -> { + try (SocketChannel sc = openSocketChannel(family); + Selector selector = openSelector()) { + assertTrue(sc.connect(new InetSocketAddress(addr, port))); + ByteBuffer bb = ByteBuffer.allocate(10); + sc.configureBlocking(false); + sc.register(selector, OP_READ); + selector.select(); + int c; + while ((c = sc.read(bb)) == 0); + assertEquals(c, 1); + out.printf("read: 0x%x%n", bb.get(0)); + assertEquals(bb.get(0), 0x4A); + } + }); + t.start(); + + Selector aselector = openSelector(); + ssc.configureBlocking(false).register(aselector, OP_ACCEPT); + assertEquals(aselector.select(), 1); + + try (SocketChannel sc = ssc.accept()) { + ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x4A).flip(); + sc.configureBlocking(false); + try (Selector selector = openSelector()) { + SelectionKey k = sc.register(selector, OP_WRITE); + selector.select(); + int c; + while ((c = sc.write(bb)) < 1); + assertEquals(c, 1); + out.printf("wrote: 0x%x%n", bb.get(0)); + k.interestOps(OP_READ); + selector.select(); + bb.clear(); + while ((c = sc.read(bb)) == 0); + assertEquals(c, -1); + } + } + t.awaitCompletion(); + } + } + + @Test + public void SPINNBAccep_BConn_NBIO_WR_5a() throws Throwable { + try (ServerSocketChannel ssc = openServerSocketChannel(family)) { + ssc.bind(new InetSocketAddress(addr, 0)); + final int port = ssc.socket().getLocalPort(); + + TestThread t = TestThread.of("t5a", () -> { + try (SocketChannel sc = openSocketChannel(family); + Selector selector = openSelector()) { + assertTrue(sc.connect(new InetSocketAddress(addr, port))); + ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x5A).flip(); + sc.configureBlocking(false); + SelectionKey k = sc.register(selector, OP_WRITE); + selector.select(); + int c; + while ((c = sc.write(bb)) < 1); + assertEquals(c, 1); + out.printf("wrote: 0x%x%n", bb.get(0)); + k.interestOps(OP_READ); + selector.select(); + bb.clear(); + while ((c = sc.read(bb)) == 0); + assertEquals(c, -1); + } + }); + t.start(); + + SocketChannel accepted; + for (;;) { + accepted = ssc.accept(); + if (accepted != null) { + out.println("accepted new connection"); + break; + } + Thread.onSpinWait(); + } + + try (SocketChannel sc = accepted) { + ByteBuffer bb = ByteBuffer.allocate(10); + sc.configureBlocking(false); + try (Selector selector = openSelector()) { + sc.register(selector, OP_READ); + selector.select(); + int c; + while ((c = sc.read(bb)) == 0) ; + assertEquals(c, 1); + out.printf("read: 0x%x%n", bb.get(0)); + assertEquals(bb.get(0), 0x5A); + } + } + t.awaitCompletion(); + } + } + + @Test + public void SPINNBAccep_BConn_NBIO_RW_6a() throws Throwable { + try (ServerSocketChannel ssc = openServerSocketChannel(family)) { + ssc.bind(new InetSocketAddress(addr, 0)); + final int port = ssc.socket().getLocalPort(); + + TestThread t = TestThread.of("t6a", () -> { + try (SocketChannel sc = openSocketChannel(family); + Selector selector = openSelector()) { + assertTrue(sc.connect(new InetSocketAddress(addr, port))); + ByteBuffer bb = ByteBuffer.allocate(10); + sc.configureBlocking(false); + sc.register(selector, OP_READ); + selector.select(); + int c; + while ((c = sc.read(bb)) == 0); + assertEquals(c, 1); + out.printf("read: 0x%x%n", bb.get(0)); + assertEquals(bb.get(0), 0x6A); + } + }); + t.start(); + + SocketChannel accepted; + for (;;) { + accepted = ssc.accept(); + if (accepted != null) { + out.println("accepted new connection"); + break; + } + Thread.onSpinWait(); + } + + try (SocketChannel sc = accepted) { + ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x6A).flip(); + sc.configureBlocking(false); + try (Selector selector = openSelector()) { + SelectionKey k = sc.register(selector, OP_WRITE); + selector.select(); + int c; + while ((c = sc.write(bb)) < 1); + assertEquals(c, 1); + out.printf("wrote: 0x%x%n", bb.get(0)); + k.interestOps(OP_READ); + selector.select(); + bb.clear(); + while ((c = sc.read(bb)) == 0); + assertEquals(c, -1); + } + } + t.awaitCompletion(); + } + } + + // Similar to the previous six scenarios but with same-thread + // non-blocking connect. + + @Test + public void BAccep_NBConn_NBIO_WR_7a() throws Throwable { + try (ServerSocketChannel ssc = openServerSocketChannel(family)) { + ssc.bind(new InetSocketAddress(addr, 0)); + final int port = ssc.socket().getLocalPort(); + + try (SocketChannel sc = openSocketChannel(family)) { + sc.configureBlocking(false); + sc.connect(new InetSocketAddress(addr, port)); + + try (SocketChannel sc2 = ssc.accept()) { + assertTrue(sc.finishConnect()); + TestThread t = TestThread.of("t7a", () -> { + try (Selector selector = openSelector()) { + ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x7A).flip(); + sc.configureBlocking(false); + SelectionKey k = sc.register(selector, OP_WRITE); + selector.select(); + int c; + while ((c = sc.write(bb)) < 1) ; + assertEquals(c, 1); + out.printf("wrote: 0x%x%n", bb.get(0)); + k.interestOps(OP_READ); + selector.select(); + bb.clear(); + while ((c = sc.read(bb)) == 0) ; + assertEquals(c, -1); + } + }); + t.start(); + + ByteBuffer bb = ByteBuffer.allocate(10); + sc2.configureBlocking(false); + try (Selector selector = openSelector()) { + sc2.register(selector, OP_READ); + selector.select(); + int c; + while ((c = sc2.read(bb)) == 0) ; + assertEquals(c, 1); + out.printf("read: 0x%x%n", bb.get(0)); + assertEquals(bb.get(0), 0x7A); + sc2.shutdownOutput(); + } + t.awaitCompletion(); + } + } + } + } + + @Test + public void BAccep_NBConn_NBIO_RW_8a() throws Throwable { + try (ServerSocketChannel ssc = openServerSocketChannel(family)) { + ssc.bind(new InetSocketAddress(addr, 0)); + final int port = ssc.socket().getLocalPort(); + + try (SocketChannel sc = openSocketChannel(family)) { + sc.configureBlocking(false); + sc.connect(new InetSocketAddress(addr, port)); + + try (SocketChannel sc2 = ssc.accept()) { + assertTrue(sc.finishConnect()); + TestThread t = TestThread.of("t8a", () -> { + try (Selector selector = openSelector()) { + ByteBuffer bb = ByteBuffer.allocate(10); + sc.register(selector, OP_READ); + selector.select(); + int c; + while ((c = sc.read(bb)) == 0); + assertEquals(c, 1); + out.printf("read: 0x%x%n", bb.get(0)); + assertEquals(bb.get(0), (byte)0x8A); + sc.shutdownOutput(); + } + }); + t.start(); + + ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x8A).flip(); + sc2.configureBlocking(false); + try (Selector selector = openSelector()) { + SelectionKey k = sc2.register(selector, OP_WRITE); + selector.select(); + int c; + while ((c = sc2.write(bb)) < 1) ; + assertEquals(c, 1); + out.printf("wrote: 0x%x%n", bb.get(0)); + k.interestOps(OP_READ); + selector.select(); + bb.clear(); + while ((c = sc2.read(bb)) == 0) ; + assertEquals(c, -1); + } + t.awaitCompletion(); + } + } + } + } + + @Test + public void SELNBAccep_NBConn_NBIO_WR_9a() throws Throwable { + try (ServerSocketChannel ssc = openServerSocketChannel(family)) { + ssc.bind(new InetSocketAddress(addr, 0)); + final int port = ssc.socket().getLocalPort(); + + try (SocketChannel sc = openSocketChannel(family)) { + sc.configureBlocking(false); + sc.connect(new InetSocketAddress(addr, port)); + + Selector aselector = openSelector(); + ssc.configureBlocking(false).register(aselector, OP_ACCEPT); + assertEquals(aselector.select(), 1); + + try (SocketChannel sc2 = ssc.accept()) { + assertTrue(sc.finishConnect()); + TestThread t = TestThread.of("t9a", () -> { + try (Selector selector = openSelector()) { + ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x9A).flip(); + sc.configureBlocking(false); + SelectionKey k = sc.register(selector, OP_WRITE); + selector.select(); + int c; + while ((c = sc.write(bb)) < 1) ; + assertEquals(c, 1); + out.printf("wrote: 0x%x%n", bb.get(0)); + k.interestOps(OP_READ); + selector.select(); + bb.clear(); + while ((c = sc.read(bb)) == 0) ; + assertEquals(c, -1); + } + }); + t.start(); + + ByteBuffer bb = ByteBuffer.allocate(10); + sc2.configureBlocking(false); + try (Selector selector = openSelector()) { + sc2.register(selector, OP_READ); + selector.select(); + int c; + while ((c = sc2.read(bb)) == 0) ; + assertEquals(c, 1); + out.printf("read: 0x%x%n", bb.get(0)); + assertEquals(bb.get(0), (byte)0x9A); + sc2.shutdownOutput(); + } + t.awaitCompletion(); + } + } + } + } + + @Test + public void SELNBAccep_NBConn_NBIO_RW_10a() throws Throwable { + try (ServerSocketChannel ssc = openServerSocketChannel(family)) { + ssc.bind(new InetSocketAddress(addr, 0)); + final int port = ssc.socket().getLocalPort(); + + try (SocketChannel sc = openSocketChannel(family)) { + sc.configureBlocking(false); + sc.connect(new InetSocketAddress(addr, port)); + + Selector aselector = openSelector(); + ssc.configureBlocking(false).register(aselector, OP_ACCEPT); + assertEquals(aselector.select(), 1); + + try (SocketChannel sc2 = ssc.accept()) { + assertTrue(sc.finishConnect()); + TestThread t = TestThread.of("t10a", () -> { + try (Selector selector = openSelector()) { + ByteBuffer bb = ByteBuffer.allocate(10); + sc.register(selector, OP_READ); + selector.select(); + int c; + while ((c = sc.read(bb)) == 0); + assertEquals(c, 1); + out.printf("read: 0x%x%n", bb.get(0)); + assertEquals(bb.get(0), (byte)0xAA); + sc.shutdownOutput(); + } + }); + t.start(); + + ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0xAA).flip(); + sc2.configureBlocking(false); + try (Selector selector = openSelector()) { + SelectionKey k = sc2.register(selector, OP_WRITE); + selector.select(); + int c; + while ((c = sc2.write(bb)) < 1) ; + assertEquals(c, 1); + out.printf("wrote: 0x%x%n", bb.get(0)); + k.interestOps(OP_READ); + selector.select(); + bb.clear(); + while ((c = sc2.read(bb)) == 0) ; + assertEquals(c, -1); + } + t.awaitCompletion(); + } + } + } + } + + @Test + public void SPINBAccep_NBConn_NBIO_WR_11a() throws Throwable { + try (ServerSocketChannel ssc = openServerSocketChannel(family)) { + ssc.bind(new InetSocketAddress(addr, 0)); + final int port = ssc.socket().getLocalPort(); + + try (SocketChannel sc = openSocketChannel(family)) { + sc.configureBlocking(false); + sc.connect(new InetSocketAddress(addr, port)); + + SocketChannel accepted; + for (;;) { + accepted = ssc.accept(); + if (accepted != null) { + out.println("accepted new connection"); + break; + } + Thread.onSpinWait(); + } + + try (SocketChannel sc2 = accepted) { + assertTrue(sc.finishConnect()); + TestThread t = TestThread.of("t11a", () -> { + try (Selector selector = openSelector()) { + ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0xBA).flip(); + sc.configureBlocking(false); + SelectionKey k = sc.register(selector, OP_WRITE); + selector.select(); + int c; + while ((c = sc.write(bb)) < 1) ; + assertEquals(c, 1); + out.printf("wrote: 0x%x%n", bb.get(0)); + k.interestOps(OP_READ); + selector.select(); + bb.clear(); + while ((c = sc.read(bb)) == 0) ; + assertEquals(c, -1); + } + }); + t.start(); + + ByteBuffer bb = ByteBuffer.allocate(10); + sc2.configureBlocking(false); + try (Selector selector = openSelector()) { + sc2.register(selector, OP_READ); + selector.select(); + int c; + while ((c = sc2.read(bb)) == 0) ; + assertEquals(c, 1); + out.printf("read: 0x%x%n", bb.get(0)); + assertEquals(bb.get(0), (byte)0xBA); + sc2.shutdownOutput(); + } + t.awaitCompletion(); + } + } + } + } + + @Test + public void SPINBAccep_NBConn_NBIO_RW_12a() throws Throwable { + try (ServerSocketChannel ssc = openServerSocketChannel(family)) { + ssc.bind(new InetSocketAddress(addr, 0)); + final int port = ssc.socket().getLocalPort(); + + try (SocketChannel sc = openSocketChannel(family)) { + sc.configureBlocking(false); + sc.connect(new InetSocketAddress(addr, port)); + + SocketChannel accepted; + for (;;) { + accepted = ssc.accept(); + if (accepted != null) { + out.println("accepted new connection"); + break; + } + Thread.onSpinWait(); + } + + try (SocketChannel sc2 = accepted) { + assertTrue(sc.finishConnect()); + TestThread t = TestThread.of("t10a", () -> { + try (Selector selector = openSelector()) { + ByteBuffer bb = ByteBuffer.allocate(10); + sc.register(selector, OP_READ); + selector.select(); + int c; + while ((c = sc.read(bb)) == 0); + assertEquals(c, 1); + out.printf("read: 0x%x%n", bb.get(0)); + assertEquals(bb.get(0), (byte)0xCA); + sc.shutdownOutput(); + } + }); + t.start(); + + ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0xCA).flip(); + sc2.configureBlocking(false); + try (Selector selector = openSelector()) { + SelectionKey k = sc2.register(selector, OP_WRITE); + selector.select(); + int c; + while ((c = sc2.write(bb)) < 1) ; + assertEquals(c, 1); + out.printf("wrote: 0x%x%n", bb.get(0)); + k.interestOps(OP_READ); + selector.select(); + bb.clear(); + while ((c = sc2.read(bb)) == 0) ; + assertEquals(c, -1); + } + t.awaitCompletion(); + } + } + } + } + + // -- + + static class TestThread extends Thread { + private final UncheckedRunnable runnable; + private volatile Throwable throwable; + + TestThread(UncheckedRunnable runnable, String name) { + super(name); + this.runnable = runnable; + } + + @Override + public void run() { + try { + runnable.run(); + } catch (Throwable t) { + out.printf("[%s] caught unexpected: %s%n", getName(), t); + throwable = t; + } + } + + interface UncheckedRunnable { + void run() throws Throwable; + } + + static TestThread of(String name, UncheckedRunnable runnable) { + return new TestThread(runnable, name); + } + + void awaitCompletion() throws Throwable { + this.join(); + if (throwable != null) + throw throwable; + } + } +}