1 /* 2 * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* 25 * @test 26 * @bug 8195160 27 * @summary Tests various combinations of blocking/nonblocking connections 28 * @requires (os.family == "linux") 29 * @library ../ /test/lib 30 * @build RsocketTest 31 * @run testng/othervm IOExchanges 32 * @author chegar 33 */ 34 35 import java.io.IOException; 36 import java.net.Inet6Address; 37 import java.net.InetAddress; 38 import java.net.InetSocketAddress; 39 import java.net.ProtocolFamily; 40 import java.nio.channels.SelectionKey; 41 import java.nio.channels.Selector; 42 import java.nio.channels.ServerSocketChannel; 43 import java.nio.channels.SocketChannel; 44 import java.nio.ByteBuffer; 45 import jdk.net.RdmaSockets; 46 import org.testng.SkipException; 47 import org.testng.annotations.BeforeTest; 48 import org.testng.annotations.Test; 49 import static java.lang.System.out; 50 import static java.net.StandardProtocolFamily.INET; 51 import static java.net.StandardProtocolFamily.INET6; 52 import static java.nio.channels.SelectionKey.OP_ACCEPT; 53 import static java.nio.channels.SelectionKey.OP_READ; 54 import static java.nio.channels.SelectionKey.OP_WRITE; 55 import static org.testng.Assert.assertEquals; 56 import static org.testng.Assert.assertTrue; 57 58 public class IOExchanges { 59 60 // Whether, or not, to use RDMA channels or regular socket channels. 61 // For test assertion purposes during development. 62 static final boolean useRDMA = true; 63 64 static InetAddress addr; 65 static ProtocolFamily family; 66 67 @BeforeTest 68 public void setup() throws Exception { 69 if (useRDMA && !RsocketTest.isRsocketAvailable()) 70 throw new SkipException("rsocket is not available"); 71 72 addr = InetAddress.getLocalHost(); 73 family = addr instanceof Inet6Address ? INET6 : INET; 74 out.printf("local address: %s%n", addr); 75 out.printf("useRDMA: %b%n", useRDMA); 76 } 77 78 static SocketChannel openSocketChannel(ProtocolFamily family) 79 throws IOException 80 { 81 return useRDMA ? RdmaSockets.openSocketChannel(family) 82 : SocketChannel.open(); 83 } 84 static ServerSocketChannel openServerSocketChannel(ProtocolFamily family) 85 throws IOException 86 { 87 return useRDMA ? RdmaSockets.openServerSocketChannel(family) 88 : ServerSocketChannel.open(); 89 } 90 static Selector openSelector( ) throws IOException { 91 return useRDMA ? RdmaSockets.openSelector() : Selector.open(); 92 } 93 94 /* 95 The following, non-exhaustive set, of tests exercise different combinations 96 of blocking and non-blocking accept/connect calls along with I/O 97 operations, that exchange a single byte. The intent it to test a reasonable 98 set of blocking and non-blocking scenarios. 99 100 The individual test method names follow their test scenario. 101 [BAccep|SELNBAccep|SPINNBAccep] - Accept either: 102 blocking, select-non-blocking, spinning-non-blocking 103 [BConn|NBConn] - blocking connect / non-blocking connect 104 [BIO|NBIO] - blocking / non-blocking I/O operations (read/write) 105 [WR|RW] - connecting thread write/accepting thread reads first, and vice-versa 106 [Id] - unique test Id 107 108 BAccep_BConn_BIO_WR_1 109 BAccep_BConn_BIO_RW_2 110 SELNBAccep_BConn_BIO_WR_3 111 SELNBAccep_BConn_BIO_RW_4 112 SPINNBAccep_BConn_BIO_WR_5 113 SPINNBAccep_BConn_BIO_RW_6 114 BAccep_NBConn_BIO_WR_7 115 BAccep_NBConn_BIO_RW_8 116 SELNBAccep_NBConn_BIO_WR_9 117 SELNBAccep_NBConn_BIO_RW_10 118 SPINNBAccep_NBConn_BIO_WR_11 119 SPINNBAccep_NBConn_BIO_RW_12 120 121 BAccep_BConn_NBIO_WR_1a << Non-Blocking I/O 122 BAccep_BConn_NBIO_RW_2a 123 SELNBAccep_BConn_NBIO_WR_3a 124 SELNBAccep_BConn_NBIO_RW_4a 125 SPINNBAccep_BConn_NBIO_WR_5a 126 SPINNBAccep_BConn_NBIO_RW_6a 127 BAccep_NBConn_NBIO_WR_7a 128 BAccep_NBConn_NBIO_RW_8a 129 SELNBAccep_NBConn_NBIO_WR_9a 130 SELNBAccep_NBConn_NBIO_RW_10a 131 SPINBAccep_NBConn_NBIO_WR_11a 132 SPINBAccep_NBConn_NBIO_RW_12a 133 */ 134 135 136 @Test 137 public void BAccep_BConn_BIO_WR_1() throws Throwable { 138 try (ServerSocketChannel ssc = openServerSocketChannel(family)) { 139 ssc.bind(new InetSocketAddress(addr, 0)); 140 final int port = ssc.socket().getLocalPort(); 141 142 TestThread t = TestThread.of("t1", () -> { 143 try (SocketChannel sc = openSocketChannel(family)) { 144 assertTrue(sc.connect(new InetSocketAddress(addr, port))); 145 ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x01).flip(); 146 assertEquals(sc.write(bb), 1); 147 out.printf("wrote: 0x%x%n", bb.get(0)); 148 assertEquals(sc.read(bb.clear()), -1); 149 } 150 }); 151 t.start(); 152 153 try (SocketChannel sc = ssc.accept()) { 154 ByteBuffer bb = ByteBuffer.allocate(10); 155 assertEquals(sc.read(bb), 1); 156 out.printf("read: 0x%x%n", bb.get(0)); 157 assertEquals(bb.get(0), 0x01); 158 } 159 t.awaitCompletion(); 160 } 161 } 162 163 @Test 164 public void BAccep_BConn_BIO_RW_2() throws Throwable { 165 try (ServerSocketChannel ssc = openServerSocketChannel(family)) { 166 ssc.bind(new InetSocketAddress(addr, 0)); 167 final int port = ssc.socket().getLocalPort(); 168 169 TestThread t = TestThread.of("t2", () -> { 170 try (SocketChannel sc = openSocketChannel(family)) { 171 assertTrue(sc.connect(new InetSocketAddress(addr, port))); 172 ByteBuffer bb = ByteBuffer.allocate(10); 173 assertEquals(sc.read(bb), 1); 174 out.printf("read: 0x%x%n", bb.get(0)); 175 assertEquals(bb.get(0), 0x02); 176 } 177 }); 178 t.start(); 179 180 try (SocketChannel sc = ssc.accept()) { 181 ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x02).flip(); 182 assertEquals(sc.write(bb), 1); 183 out.printf("wrote: 0x%x%n", bb.get(0)); 184 assertEquals(sc.read(bb.clear()), -1); 185 } 186 t.awaitCompletion(); 187 } 188 } 189 190 @Test 191 public void SELNBAccep_BConn_BIO_WR_3() throws Throwable { 192 try (ServerSocketChannel ssc = openServerSocketChannel(family); 193 Selector selector = openSelector()) { 194 ssc.bind(new InetSocketAddress(addr, 0)); 195 final int port = ssc.socket().getLocalPort(); 196 197 TestThread t = TestThread.of("t3", () -> { 198 try (SocketChannel sc = openSocketChannel(family)) { 199 assertTrue(sc.connect(new InetSocketAddress(addr, port))); 200 ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x03).flip(); 201 assertEquals(sc.write(bb), 1); 202 out.printf("wrote: 0x%x%n", bb.get(0)); 203 assertEquals(sc.read(bb.clear()), -1); 204 } 205 }); 206 t.start(); 207 208 ssc.configureBlocking(false).register(selector, OP_ACCEPT); 209 assertEquals(selector.select(), 1); 210 211 try (SocketChannel sc = ssc.accept()) { 212 ByteBuffer bb = ByteBuffer.allocate(10); 213 assertEquals(sc.read(bb), 1); 214 out.printf("read: 0x%x%n", bb.get(0)); 215 assertEquals(bb.get(0), 0x03); 216 } 217 t.awaitCompletion(); 218 } 219 } 220 221 @Test 222 public void SELNBAccep_BConn_BIO_RW_4() throws Throwable { 223 try (ServerSocketChannel ssc = openServerSocketChannel(family); 224 Selector selector = openSelector()) { 225 ssc.bind(new InetSocketAddress(addr, 0)); 226 final int port = ssc.socket().getLocalPort(); 227 228 TestThread t = TestThread.of("t4", () -> { 229 try (SocketChannel sc = openSocketChannel(family)) { 230 assertTrue(sc.connect(new InetSocketAddress(addr, port))); 231 ByteBuffer bb = ByteBuffer.allocate(10); 232 assertEquals(sc.read(bb), 1); 233 out.printf("read: 0x%x%n", bb.get(0)); 234 assertEquals(bb.get(0), 0x04); 235 } 236 }); 237 t.start(); 238 239 ssc.configureBlocking(false).register(selector, OP_ACCEPT); 240 assertEquals(selector.select(), 1); 241 242 try (SocketChannel sc = ssc.accept()) { 243 ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x04).flip(); 244 assertEquals(sc.write(bb), 1); 245 out.printf("wrote: 0x%x%n", bb.get(0)); 246 assertEquals(sc.read(bb.clear()), -1); 247 248 } 249 t.awaitCompletion(); 250 } 251 } 252 253 @Test 254 public void SPINNBAccep_BConn_BIO_WR_5() throws Throwable { 255 try (ServerSocketChannel ssc = openServerSocketChannel(family)) { 256 ssc.bind(new InetSocketAddress(addr, 0)); 257 final int port = ssc.socket().getLocalPort(); 258 259 TestThread t = TestThread.of("t5", () -> { 260 try (SocketChannel sc = openSocketChannel(family)) { 261 assertTrue(sc.connect(new InetSocketAddress(addr, port))); 262 ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x05).flip(); 263 assertEquals(sc.write(bb), 1); 264 out.printf("wrote: 0x%x%n", bb.get(0)); 265 assertEquals(sc.read(bb.clear()), -1); 266 } 267 }); 268 t.start(); 269 270 SocketChannel accepted; 271 for (;;) { 272 accepted = ssc.accept(); 273 if (accepted != null) { 274 out.println("accepted new connection"); 275 break; 276 } 277 Thread.onSpinWait(); 278 } 279 280 try (SocketChannel sc = accepted) { 281 ByteBuffer bb = ByteBuffer.allocate(10); 282 assertEquals(sc.read(bb), 1); 283 out.printf("read: 0x%x%n", bb.get(0)); 284 assertEquals(bb.get(0), 0x05); 285 } 286 t.awaitCompletion(); 287 } 288 } 289 290 @Test 291 public void SPINNBAccep_BConn_BIO_RW_6() throws Throwable { 292 try (ServerSocketChannel ssc = openServerSocketChannel(family)) { 293 ssc.bind(new InetSocketAddress(addr, 0)); 294 final int port = ssc.socket().getLocalPort(); 295 296 TestThread t = TestThread.of("t6", () -> { 297 try (SocketChannel sc = openSocketChannel(family)) { 298 assertTrue(sc.connect(new InetSocketAddress(addr, port))); 299 ByteBuffer bb = ByteBuffer.allocate(10); 300 assertEquals(sc.read(bb), 1); 301 out.printf("read: 0x%x%n", bb.get(0)); 302 assertEquals(bb.get(0), 0x06); 303 } 304 }); 305 t.start(); 306 307 SocketChannel accepted; 308 for (;;) { 309 accepted = ssc.accept(); 310 if (accepted != null) { 311 out.println("accepted new connection"); 312 break; 313 } 314 Thread.onSpinWait(); 315 } 316 317 try (SocketChannel sc = accepted) { 318 ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x06).flip(); 319 assertEquals(sc.write(bb), 1); 320 out.printf("wrote: 0x%x%n", bb.get(0)); 321 assertEquals(sc.read(bb.clear()), -1); 322 323 } 324 t.awaitCompletion(); 325 } 326 } 327 328 // Similar to the previous six scenarios, but with same-thread 329 // non-blocking connect. 330 331 @Test 332 public void BAccep_NBConn_BIO_WR_7() throws Throwable { 333 try (ServerSocketChannel ssc = openServerSocketChannel(family)) { 334 ssc.bind(new InetSocketAddress(addr, 0)); 335 final int port = ssc.socket().getLocalPort(); 336 337 try (SocketChannel sc = openSocketChannel(family)) { 338 sc.configureBlocking(false); 339 sc.connect(new InetSocketAddress(addr, port)); 340 341 try (SocketChannel sc2 = ssc.accept()) { 342 assertTrue(sc.finishConnect()); 343 sc.configureBlocking(true); 344 TestThread t = TestThread.of("t7", () -> { 345 ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x07).flip(); 346 assertEquals(sc.write(bb), 1); 347 out.printf("wrote: 0x%x%n", bb.get(0)); 348 assertEquals(sc.read(bb.clear()), -1); 349 }); 350 t.start(); 351 352 ByteBuffer bb = ByteBuffer.allocate(10); 353 assertEquals(sc2.read(bb), 1); 354 out.printf("read: 0x%x%n", bb.get(0)); 355 assertEquals(bb.get(0), 0x07); 356 sc2.shutdownOutput(); 357 t.awaitCompletion(); 358 } 359 } 360 } 361 } 362 363 @Test 364 public void BAccep_NBConn_BIO_RW_8() throws Throwable { 365 try (ServerSocketChannel ssc = openServerSocketChannel(family)) { 366 ssc.bind(new InetSocketAddress(addr, 0)); 367 final int port = ssc.socket().getLocalPort(); 368 369 try (SocketChannel sc = openSocketChannel(family)) { 370 sc.configureBlocking(false); 371 sc.connect(new InetSocketAddress(addr, port)); 372 373 try (SocketChannel sc2 = ssc.accept()) { 374 assertTrue(sc.finishConnect()); 375 sc.configureBlocking(true); 376 TestThread t = TestThread.of("t8", () -> { 377 ByteBuffer bb = ByteBuffer.allocate(10); 378 assertEquals(sc.read(bb), 1); 379 out.printf("read: 0x%x%n", bb.get(0)); 380 assertEquals(bb.get(0), 0x08); 381 sc.shutdownOutput(); 382 }); 383 t.start(); 384 385 ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x08).flip(); 386 assertEquals(sc2.write(bb), 1); 387 out.printf("wrote: 0x%x%n", bb.get(0)); 388 assertEquals(sc2.read(bb.clear()), -1); 389 t.awaitCompletion(); 390 } 391 } 392 } 393 } 394 395 @Test 396 public void SELNBAccep_NBConn_BIO_WR_9() throws Throwable { 397 try (ServerSocketChannel ssc = openServerSocketChannel(family)) { 398 ssc.bind(new InetSocketAddress(addr, 0)); 399 final int port = ssc.socket().getLocalPort(); 400 401 try (SocketChannel sc = openSocketChannel(family); 402 Selector selector = openSelector()) { 403 sc.configureBlocking(false); 404 sc.connect(new InetSocketAddress(addr, port)); 405 406 ssc.configureBlocking(false).register(selector, OP_ACCEPT); 407 assertEquals(selector.select(), 1); 408 409 try (SocketChannel sc2 = ssc.accept()) { 410 assertTrue(sc.finishConnect()); 411 sc.configureBlocking(true); 412 TestThread t = TestThread.of("t9", () -> { 413 ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x09).flip(); 414 assertEquals(sc.write(bb), 1); 415 out.printf("wrote: 0x%x%n", bb.get(0)); 416 assertEquals(sc.read(bb.clear()), -1); 417 }); 418 t.start(); 419 420 ByteBuffer bb = ByteBuffer.allocate(10); 421 assertEquals(sc2.read(bb), 1); 422 out.printf("read: 0x%x%n", bb.get(0)); 423 assertEquals(bb.get(0), 0x09); 424 sc2.shutdownOutput(); 425 t.awaitCompletion(); 426 } 427 } 428 } 429 } 430 431 @Test 432 public void SELNBAccep_NBConn_BIO_RW_10() throws Throwable { 433 try (ServerSocketChannel ssc = openServerSocketChannel(family)) { 434 ssc.bind(new InetSocketAddress(addr, 0)); 435 final int port = ssc.socket().getLocalPort(); 436 437 try (SocketChannel sc = openSocketChannel(family); 438 Selector selector = openSelector()) { 439 sc.configureBlocking(false); 440 sc.connect(new InetSocketAddress(addr, port)); 441 442 ssc.configureBlocking(false).register(selector, OP_ACCEPT); 443 assertEquals(selector.select(), 1); 444 445 try (SocketChannel sc2 = ssc.accept()) { 446 assertTrue(sc.finishConnect()); 447 sc.configureBlocking(true); 448 TestThread t = TestThread.of("t10", () -> { 449 ByteBuffer bb = ByteBuffer.allocate(10); 450 assertEquals(sc.read(bb), 1); 451 out.printf("read: 0x%x%n", bb.get(0)); 452 assertEquals(bb.get(0), 0x10); 453 sc.shutdownOutput(); 454 }); 455 t.start(); 456 457 ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x10).flip(); 458 assertEquals(sc2.write(bb), 1); 459 out.printf("wrote: 0x%x%n", bb.get(0)); 460 assertEquals(sc2.read(bb.clear()), -1); 461 t.awaitCompletion(); 462 } 463 } 464 } 465 } 466 467 @Test 468 public void SPINNBAccep_NBConn_BIO_WR_11() throws Throwable { 469 try (ServerSocketChannel ssc = openServerSocketChannel(family)) { 470 ssc.bind(new InetSocketAddress(addr, 0)); 471 final int port = ssc.socket().getLocalPort(); 472 473 try (SocketChannel sc = openSocketChannel(family)) { 474 sc.configureBlocking(false); 475 sc.connect(new InetSocketAddress(addr, port)); 476 477 SocketChannel accepted; 478 for (;;) { 479 accepted = ssc.accept(); 480 if (accepted != null) { 481 out.println("accepted new connection"); 482 break; 483 } 484 Thread.onSpinWait(); 485 } 486 487 try (SocketChannel sc2 = accepted) { 488 assertTrue(sc.finishConnect()); 489 sc.configureBlocking(true); 490 TestThread t = TestThread.of("t11", () -> { 491 ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x11).flip(); 492 assertEquals(sc.write(bb), 1); 493 out.printf("wrote: 0x%x%n", bb.get(0)); 494 assertEquals(sc.read(bb.clear()), -1); 495 }); 496 t.start(); 497 498 ByteBuffer bb = ByteBuffer.allocate(10); 499 assertEquals(sc2.read(bb), 1); 500 out.printf("read: 0x%x%n", bb.get(0)); 501 assertEquals(bb.get(0), 0x11); 502 sc2.shutdownOutput(); 503 t.awaitCompletion(); 504 } 505 } 506 } 507 } 508 509 @Test 510 public void SPINNBAccep_NBConn_BIO_RW_12() throws Throwable { 511 try (ServerSocketChannel ssc = openServerSocketChannel(family)) { 512 ssc.bind(new InetSocketAddress(addr, 0)); 513 final int port = ssc.socket().getLocalPort(); 514 515 try (SocketChannel sc = openSocketChannel(family)) { 516 sc.configureBlocking(false); 517 sc.connect(new InetSocketAddress(addr, port)); 518 519 SocketChannel accepted; 520 for (;;) { 521 accepted = ssc.accept(); 522 if (accepted != null) { 523 out.println("accepted new connection"); 524 break; 525 } 526 Thread.onSpinWait(); 527 } 528 529 try (SocketChannel sc2 = accepted) { 530 assertTrue(sc.finishConnect()); 531 sc.configureBlocking(true); 532 TestThread t = TestThread.of("t12", () -> { 533 ByteBuffer bb = ByteBuffer.allocate(10); 534 assertEquals(sc.read(bb), 1); 535 out.printf("read: 0x%x%n", bb.get(0)); 536 assertEquals(bb.get(0), 0x12); 537 sc.shutdownOutput(); 538 }); 539 t.start(); 540 541 ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x12).flip(); 542 assertEquals(sc2.write(bb), 1); 543 out.printf("wrote: 0x%x%n", bb.get(0)); 544 assertEquals(sc2.read(bb.clear()), -1); 545 t.awaitCompletion(); 546 } 547 } 548 } 549 } 550 551 // --- 552 // Similar to the previous twelve scenarios but with non-blocking IO 553 // --- 554 555 @Test 556 public void BAccep_BConn_NBIO_WR_1a() throws Throwable { 557 try (ServerSocketChannel ssc = openServerSocketChannel(family)) { 558 ssc.bind(new InetSocketAddress(addr, 0)); 559 final int port = ssc.socket().getLocalPort(); 560 561 TestThread t = TestThread.of("t1a", () -> { 562 try (SocketChannel sc = openSocketChannel(family); 563 Selector selector = openSelector()) { 564 assertTrue(sc.connect(new InetSocketAddress(addr, port))); 565 ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x1A).flip(); 566 sc.configureBlocking(false); 567 SelectionKey k = sc.register(selector, OP_WRITE); 568 selector.select(); 569 int c; 570 while ((c = sc.write(bb)) < 1); 571 assertEquals(c, 1); 572 out.printf("wrote: 0x%x%n", bb.get(0)); 573 k.interestOps(OP_READ); 574 selector.select(); 575 bb.clear(); 576 while ((c = sc.read(bb)) == 0); 577 assertEquals(c, -1); 578 } 579 }); 580 t.start(); 581 582 try (SocketChannel sc = ssc.accept(); 583 Selector selector = openSelector()) { 584 ByteBuffer bb = ByteBuffer.allocate(10); 585 sc.configureBlocking(false); 586 sc.register(selector, OP_READ); 587 selector.select(); 588 int c; 589 while ((c = sc.read(bb)) == 0) ; 590 assertEquals(c, 1); 591 out.printf("read: 0x%x%n", bb.get(0)); 592 assertEquals(bb.get(0), 0x1A); 593 } 594 t.awaitCompletion(); 595 } 596 } 597 598 @Test 599 public void BAccep_BConn_NBIO_RW_2a() throws Throwable { 600 try (ServerSocketChannel ssc = openServerSocketChannel(family)) { 601 ssc.bind(new InetSocketAddress(addr, 0)); 602 final int port = ssc.socket().getLocalPort(); 603 604 TestThread t = TestThread.of("t2a", () -> { 605 try (SocketChannel sc = openSocketChannel(family); 606 Selector selector = openSelector()) { 607 assertTrue(sc.connect(new InetSocketAddress(addr, port))); 608 ByteBuffer bb = ByteBuffer.allocate(10); 609 sc.configureBlocking(false); 610 sc.register(selector, OP_READ); 611 selector.select(); 612 int c; 613 while ((c = sc.read(bb)) == 0); 614 assertEquals(c, 1); 615 out.printf("read: 0x%x%n", bb.get(0)); 616 assertEquals(bb.get(0), 0x2A); 617 } 618 }); 619 t.start(); 620 621 try (SocketChannel sc = ssc.accept(); 622 Selector selector = openSelector()) { 623 ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x2A).flip(); 624 sc.configureBlocking(false); 625 SelectionKey k = sc.register(selector, OP_WRITE); 626 selector.select(); 627 int c; 628 while ((c = sc.write(bb)) < 1); 629 assertEquals(c, 1); 630 out.printf("wrote: 0x%x%n", bb.get(0)); 631 k.interestOps(OP_READ); 632 selector.select(); 633 bb.clear(); 634 while ((c = sc.read(bb)) == 0); 635 assertEquals(c, -1); 636 } 637 t.awaitCompletion(); 638 } 639 } 640 641 @Test 642 public void SELNBAccep_BConn_NBIO_WR_3a() throws Throwable { 643 try (ServerSocketChannel ssc = openServerSocketChannel(family); 644 Selector aselector = openSelector()) { 645 ssc.bind(new InetSocketAddress(addr, 0)); 646 final int port = ssc.socket().getLocalPort(); 647 648 TestThread t = TestThread.of("t3a", () -> { 649 try (SocketChannel sc = openSocketChannel(family); 650 Selector selector = openSelector()) { 651 assertTrue(sc.connect(new InetSocketAddress(addr, port))); 652 ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x3A).flip(); 653 sc.configureBlocking(false); 654 SelectionKey k = sc.register(selector, OP_WRITE); 655 selector.select(); 656 int c; 657 while ((c = sc.write(bb)) < 1); 658 assertEquals(c, 1); 659 out.printf("wrote: 0x%x%n", bb.get(0)); 660 k.interestOps(OP_READ); 661 selector.select(); 662 bb.clear(); 663 while ((c = sc.read(bb)) == 0); 664 assertEquals(c, -1); 665 } 666 }); 667 t.start(); 668 669 ssc.configureBlocking(false).register(aselector, OP_ACCEPT); 670 assertEquals(aselector.select(), 1); 671 672 try (SocketChannel sc = ssc.accept(); 673 Selector selector = openSelector()) { 674 ByteBuffer bb = ByteBuffer.allocate(10); 675 sc.configureBlocking(false); 676 sc.register(selector, OP_READ); 677 selector.select(); 678 int c; 679 while ((c = sc.read(bb)) == 0) ; 680 assertEquals(c, 1); 681 out.printf("read: 0x%x%n", bb.get(0)); 682 assertEquals(bb.get(0), 0x3A); 683 } 684 t.awaitCompletion(); 685 } 686 } 687 688 @Test 689 public void SELNBAccep_BConn_NBIO_RW_4a() throws Throwable { 690 try (ServerSocketChannel ssc = openServerSocketChannel(family); 691 Selector aselector = openSelector()) { 692 ssc.bind(new InetSocketAddress(addr, 0)); 693 final int port = ssc.socket().getLocalPort(); 694 695 TestThread t = TestThread.of("t4a", () -> { 696 try (SocketChannel sc = openSocketChannel(family); 697 Selector selector = openSelector()) { 698 assertTrue(sc.connect(new InetSocketAddress(addr, port))); 699 ByteBuffer bb = ByteBuffer.allocate(10); 700 sc.configureBlocking(false); 701 sc.register(selector, OP_READ); 702 selector.select(); 703 int c; 704 while ((c = sc.read(bb)) == 0); 705 assertEquals(c, 1); 706 out.printf("read: 0x%x%n", bb.get(0)); 707 assertEquals(bb.get(0), 0x4A); 708 } 709 }); 710 t.start(); 711 712 ssc.configureBlocking(false).register(aselector, OP_ACCEPT); 713 assertEquals(aselector.select(), 1); 714 715 try (SocketChannel sc = ssc.accept(); 716 Selector selector = openSelector()) { 717 ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x4A).flip(); 718 sc.configureBlocking(false); 719 SelectionKey k = sc.register(selector, OP_WRITE); 720 selector.select(); 721 int c; 722 while ((c = sc.write(bb)) < 1); 723 assertEquals(c, 1); 724 out.printf("wrote: 0x%x%n", bb.get(0)); 725 k.interestOps(OP_READ); 726 selector.select(); 727 bb.clear(); 728 while ((c = sc.read(bb)) == 0); 729 assertEquals(c, -1); 730 } 731 t.awaitCompletion(); 732 } 733 } 734 735 @Test 736 public void SPINNBAccep_BConn_NBIO_WR_5a() throws Throwable { 737 try (ServerSocketChannel ssc = openServerSocketChannel(family)) { 738 ssc.bind(new InetSocketAddress(addr, 0)); 739 final int port = ssc.socket().getLocalPort(); 740 741 TestThread t = TestThread.of("t5a", () -> { 742 try (SocketChannel sc = openSocketChannel(family); 743 Selector selector = openSelector()) { 744 assertTrue(sc.connect(new InetSocketAddress(addr, port))); 745 ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x5A).flip(); 746 sc.configureBlocking(false); 747 SelectionKey k = sc.register(selector, OP_WRITE); 748 selector.select(); 749 int c; 750 while ((c = sc.write(bb)) < 1); 751 assertEquals(c, 1); 752 out.printf("wrote: 0x%x%n", bb.get(0)); 753 k.interestOps(OP_READ); 754 selector.select(); 755 bb.clear(); 756 while ((c = sc.read(bb)) == 0); 757 assertEquals(c, -1); 758 } 759 }); 760 t.start(); 761 762 SocketChannel accepted; 763 for (;;) { 764 accepted = ssc.accept(); 765 if (accepted != null) { 766 out.println("accepted new connection"); 767 break; 768 } 769 Thread.onSpinWait(); 770 } 771 772 try (SocketChannel sc = accepted; 773 Selector selector = openSelector()) { 774 ByteBuffer bb = ByteBuffer.allocate(10); 775 sc.configureBlocking(false); 776 sc.register(selector, OP_READ); 777 selector.select(); 778 int c; 779 while ((c = sc.read(bb)) == 0) ; 780 assertEquals(c, 1); 781 out.printf("read: 0x%x%n", bb.get(0)); 782 assertEquals(bb.get(0), 0x5A); 783 } 784 t.awaitCompletion(); 785 } 786 } 787 788 @Test 789 public void SPINNBAccep_BConn_NBIO_RW_6a() throws Throwable { 790 try (ServerSocketChannel ssc = openServerSocketChannel(family)) { 791 ssc.bind(new InetSocketAddress(addr, 0)); 792 final int port = ssc.socket().getLocalPort(); 793 794 TestThread t = TestThread.of("t6a", () -> { 795 try (SocketChannel sc = openSocketChannel(family); 796 Selector selector = openSelector()) { 797 assertTrue(sc.connect(new InetSocketAddress(addr, port))); 798 ByteBuffer bb = ByteBuffer.allocate(10); 799 sc.configureBlocking(false); 800 sc.register(selector, OP_READ); 801 selector.select(); 802 int c; 803 while ((c = sc.read(bb)) == 0); 804 assertEquals(c, 1); 805 out.printf("read: 0x%x%n", bb.get(0)); 806 assertEquals(bb.get(0), 0x6A); 807 } 808 }); 809 t.start(); 810 811 SocketChannel accepted; 812 for (;;) { 813 accepted = ssc.accept(); 814 if (accepted != null) { 815 out.println("accepted new connection"); 816 break; 817 } 818 Thread.onSpinWait(); 819 } 820 821 try (SocketChannel sc = accepted; 822 Selector selector = openSelector()) { 823 ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x6A).flip(); 824 sc.configureBlocking(false); 825 SelectionKey k = sc.register(selector, OP_WRITE); 826 selector.select(); 827 int c; 828 while ((c = sc.write(bb)) < 1); 829 assertEquals(c, 1); 830 out.printf("wrote: 0x%x%n", bb.get(0)); 831 k.interestOps(OP_READ); 832 selector.select(); 833 bb.clear(); 834 while ((c = sc.read(bb)) == 0); 835 assertEquals(c, -1); 836 837 } 838 t.awaitCompletion(); 839 } 840 } 841 842 // Similar to the previous six scenarios but with same-thread 843 // non-blocking connect. 844 845 @Test 846 public void BAccep_NBConn_NBIO_WR_7a() throws Throwable { 847 try (ServerSocketChannel ssc = openServerSocketChannel(family)) { 848 ssc.bind(new InetSocketAddress(addr, 0)); 849 final int port = ssc.socket().getLocalPort(); 850 851 try (SocketChannel sc = openSocketChannel(family)) { 852 sc.configureBlocking(false); 853 sc.connect(new InetSocketAddress(addr, port)); 854 855 try (SocketChannel sc2 = ssc.accept()) { 856 assertTrue(sc.finishConnect()); 857 TestThread t = TestThread.of("t7a", () -> { 858 try (Selector selector = openSelector()) { 859 ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x7A).flip(); 860 sc.configureBlocking(false); 861 SelectionKey k = sc.register(selector, OP_WRITE); 862 selector.select(); 863 int c; 864 while ((c = sc.write(bb)) < 1) ; 865 assertEquals(c, 1); 866 out.printf("wrote: 0x%x%n", bb.get(0)); 867 k.interestOps(OP_READ); 868 selector.select(); 869 bb.clear(); 870 while ((c = sc.read(bb)) == 0) ; 871 assertEquals(c, -1); 872 } 873 }); 874 t.start(); 875 876 ByteBuffer bb = ByteBuffer.allocate(10); 877 sc2.configureBlocking(false); 878 try (Selector selector = openSelector()) { 879 sc2.register(selector, OP_READ); 880 selector.select(); 881 int c; 882 while ((c = sc2.read(bb)) == 0) ; 883 assertEquals(c, 1); 884 out.printf("read: 0x%x%n", bb.get(0)); 885 assertEquals(bb.get(0), 0x7A); 886 sc2.shutdownOutput(); 887 } 888 t.awaitCompletion(); 889 } 890 } 891 } 892 } 893 894 @Test 895 public void BAccep_NBConn_NBIO_RW_8a() throws Throwable { 896 try (ServerSocketChannel ssc = openServerSocketChannel(family)) { 897 ssc.bind(new InetSocketAddress(addr, 0)); 898 final int port = ssc.socket().getLocalPort(); 899 900 try (SocketChannel sc = openSocketChannel(family)) { 901 sc.configureBlocking(false); 902 sc.connect(new InetSocketAddress(addr, port)); 903 904 try (SocketChannel sc2 = ssc.accept()) { 905 assertTrue(sc.finishConnect()); 906 TestThread t = TestThread.of("t8a", () -> { 907 try (Selector selector = openSelector()) { 908 ByteBuffer bb = ByteBuffer.allocate(10); 909 sc.register(selector, OP_READ); 910 selector.select(); 911 int c; 912 while ((c = sc.read(bb)) == 0); 913 assertEquals(c, 1); 914 out.printf("read: 0x%x%n", bb.get(0)); 915 assertEquals(bb.get(0), (byte)0x8A); 916 sc.shutdownOutput(); 917 } 918 }); 919 t.start(); 920 921 ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x8A).flip(); 922 sc2.configureBlocking(false); 923 try (Selector selector = openSelector()) { 924 SelectionKey k = sc2.register(selector, OP_WRITE); 925 selector.select(); 926 int c; 927 while ((c = sc2.write(bb)) < 1) ; 928 assertEquals(c, 1); 929 out.printf("wrote: 0x%x%n", bb.get(0)); 930 k.interestOps(OP_READ); 931 selector.select(); 932 bb.clear(); 933 while ((c = sc2.read(bb)) == 0) ; 934 assertEquals(c, -1); 935 } 936 t.awaitCompletion(); 937 } 938 } 939 } 940 } 941 942 @Test 943 public void SELNBAccep_NBConn_NBIO_WR_9a() throws Throwable { 944 try (ServerSocketChannel ssc = openServerSocketChannel(family)) { 945 ssc.bind(new InetSocketAddress(addr, 0)); 946 final int port = ssc.socket().getLocalPort(); 947 948 try (SocketChannel sc = openSocketChannel(family)) { 949 sc.configureBlocking(false); 950 sc.connect(new InetSocketAddress(addr, port)); 951 952 Selector aselector = openSelector(); 953 ssc.configureBlocking(false).register(aselector, OP_ACCEPT); 954 assertEquals(aselector.select(), 1); 955 956 try (SocketChannel sc2 = ssc.accept()) { 957 assertTrue(sc.finishConnect()); 958 TestThread t = TestThread.of("t9a", () -> { 959 try (Selector selector = openSelector()) { 960 ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x9A).flip(); 961 sc.configureBlocking(false); 962 SelectionKey k = sc.register(selector, OP_WRITE); 963 selector.select(); 964 int c; 965 while ((c = sc.write(bb)) < 1) ; 966 assertEquals(c, 1); 967 out.printf("wrote: 0x%x%n", bb.get(0)); 968 k.interestOps(OP_READ); 969 selector.select(); 970 bb.clear(); 971 while ((c = sc.read(bb)) == 0) ; 972 assertEquals(c, -1); 973 } 974 }); 975 t.start(); 976 977 ByteBuffer bb = ByteBuffer.allocate(10); 978 sc2.configureBlocking(false); 979 try (Selector selector = openSelector()) { 980 sc2.register(selector, OP_READ); 981 selector.select(); 982 int c; 983 while ((c = sc2.read(bb)) == 0) ; 984 assertEquals(c, 1); 985 out.printf("read: 0x%x%n", bb.get(0)); 986 assertEquals(bb.get(0), (byte)0x9A); 987 sc2.shutdownOutput(); 988 } 989 t.awaitCompletion(); 990 } 991 } 992 } 993 } 994 995 @Test 996 public void SELNBAccep_NBConn_NBIO_RW_10a() throws Throwable { 997 try (ServerSocketChannel ssc = openServerSocketChannel(family)) { 998 ssc.bind(new InetSocketAddress(addr, 0)); 999 final int port = ssc.socket().getLocalPort(); 1000 1001 try (SocketChannel sc = openSocketChannel(family)) { 1002 sc.configureBlocking(false); 1003 sc.connect(new InetSocketAddress(addr, port)); 1004 1005 Selector aselector = openSelector(); 1006 ssc.configureBlocking(false).register(aselector, OP_ACCEPT); 1007 assertEquals(aselector.select(), 1); 1008 1009 try (SocketChannel sc2 = ssc.accept()) { 1010 assertTrue(sc.finishConnect()); 1011 TestThread t = TestThread.of("t10a", () -> { 1012 try (Selector selector = openSelector()) { 1013 ByteBuffer bb = ByteBuffer.allocate(10); 1014 sc.register(selector, OP_READ); 1015 selector.select(); 1016 int c; 1017 while ((c = sc.read(bb)) == 0); 1018 assertEquals(c, 1); 1019 out.printf("read: 0x%x%n", bb.get(0)); 1020 assertEquals(bb.get(0), (byte)0xAA); 1021 sc.shutdownOutput(); 1022 } 1023 }); 1024 t.start(); 1025 1026 ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0xAA).flip(); 1027 sc2.configureBlocking(false); 1028 try (Selector selector = openSelector()) { 1029 SelectionKey k = sc2.register(selector, OP_WRITE); 1030 selector.select(); 1031 int c; 1032 while ((c = sc2.write(bb)) < 1) ; 1033 assertEquals(c, 1); 1034 out.printf("wrote: 0x%x%n", bb.get(0)); 1035 k.interestOps(OP_READ); 1036 selector.select(); 1037 bb.clear(); 1038 while ((c = sc2.read(bb)) == 0) ; 1039 assertEquals(c, -1); 1040 } 1041 t.awaitCompletion(); 1042 } 1043 } 1044 } 1045 } 1046 1047 @Test 1048 public void SPINBAccep_NBConn_NBIO_WR_11a() throws Throwable { 1049 try (ServerSocketChannel ssc = openServerSocketChannel(family)) { 1050 ssc.bind(new InetSocketAddress(addr, 0)); 1051 final int port = ssc.socket().getLocalPort(); 1052 1053 try (SocketChannel sc = openSocketChannel(family)) { 1054 sc.configureBlocking(false); 1055 sc.connect(new InetSocketAddress(addr, port)); 1056 1057 SocketChannel accepted; 1058 for (;;) { 1059 accepted = ssc.accept(); 1060 if (accepted != null) { 1061 out.println("accepted new connection"); 1062 break; 1063 } 1064 Thread.onSpinWait(); 1065 } 1066 1067 try (SocketChannel sc2 = accepted) { 1068 assertTrue(sc.finishConnect()); 1069 TestThread t = TestThread.of("t11a", () -> { 1070 try (Selector selector = openSelector()) { 1071 ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0xBA).flip(); 1072 sc.configureBlocking(false); 1073 SelectionKey k = sc.register(selector, OP_WRITE); 1074 selector.select(); 1075 int c; 1076 while ((c = sc.write(bb)) < 1) ; 1077 assertEquals(c, 1); 1078 out.printf("wrote: 0x%x%n", bb.get(0)); 1079 k.interestOps(OP_READ); 1080 selector.select(); 1081 bb.clear(); 1082 while ((c = sc.read(bb)) == 0) ; 1083 assertEquals(c, -1); 1084 } 1085 }); 1086 t.start(); 1087 1088 ByteBuffer bb = ByteBuffer.allocate(10); 1089 sc2.configureBlocking(false); 1090 try (Selector selector = openSelector()) { 1091 sc2.register(selector, OP_READ); 1092 selector.select(); 1093 int c; 1094 while ((c = sc2.read(bb)) == 0) ; 1095 assertEquals(c, 1); 1096 out.printf("read: 0x%x%n", bb.get(0)); 1097 assertEquals(bb.get(0), (byte)0xBA); 1098 sc2.shutdownOutput(); 1099 } 1100 t.awaitCompletion(); 1101 } 1102 } 1103 } 1104 } 1105 1106 @Test 1107 public void SPINBAccep_NBConn_NBIO_RW_12a() throws Throwable { 1108 try (ServerSocketChannel ssc = openServerSocketChannel(family)) { 1109 ssc.bind(new InetSocketAddress(addr, 0)); 1110 final int port = ssc.socket().getLocalPort(); 1111 1112 try (SocketChannel sc = openSocketChannel(family)) { 1113 sc.configureBlocking(false); 1114 sc.connect(new InetSocketAddress(addr, port)); 1115 1116 SocketChannel accepted; 1117 for (;;) { 1118 accepted = ssc.accept(); 1119 if (accepted != null) { 1120 out.println("accepted new connection"); 1121 break; 1122 } 1123 Thread.onSpinWait(); 1124 } 1125 1126 try (SocketChannel sc2 = accepted) { 1127 assertTrue(sc.finishConnect()); 1128 TestThread t = TestThread.of("t10a", () -> { 1129 try (Selector selector = openSelector()) { 1130 ByteBuffer bb = ByteBuffer.allocate(10); 1131 sc.register(selector, OP_READ); 1132 selector.select(); 1133 int c; 1134 while ((c = sc.read(bb)) == 0); 1135 assertEquals(c, 1); 1136 out.printf("read: 0x%x%n", bb.get(0)); 1137 assertEquals(bb.get(0), (byte)0xCA); 1138 sc.shutdownOutput(); 1139 } 1140 }); 1141 t.start(); 1142 1143 ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0xCA).flip(); 1144 sc2.configureBlocking(false); 1145 try (Selector selector = openSelector()) { 1146 SelectionKey k = sc2.register(selector, OP_WRITE); 1147 selector.select(); 1148 int c; 1149 while ((c = sc2.write(bb)) < 1) ; 1150 assertEquals(c, 1); 1151 out.printf("wrote: 0x%x%n", bb.get(0)); 1152 k.interestOps(OP_READ); 1153 selector.select(); 1154 bb.clear(); 1155 while ((c = sc2.read(bb)) == 0) ; 1156 assertEquals(c, -1); 1157 } 1158 t.awaitCompletion(); 1159 } 1160 } 1161 } 1162 } 1163 1164 // -- 1165 1166 static class TestThread extends Thread { 1167 private final UncheckedRunnable runnable; 1168 private volatile Throwable throwable; 1169 1170 TestThread(UncheckedRunnable runnable, String name) { 1171 super(name); 1172 this.runnable = runnable; 1173 } 1174 1175 @Override 1176 public void run() { 1177 try { 1178 runnable.run(); 1179 } catch (Throwable t) { 1180 out.printf("[%s] caught unexpected: %s%n", getName(), t); 1181 throwable = t; 1182 } 1183 } 1184 1185 interface UncheckedRunnable { 1186 void run() throws Throwable; 1187 } 1188 1189 static TestThread of(String name, UncheckedRunnable runnable) { 1190 return new TestThread(runnable, name); 1191 } 1192 1193 void awaitCompletion() throws Throwable { 1194 this.join(); 1195 if (throwable != null) 1196 throw throwable; 1197 } 1198 } 1199 }