1 /* 2 * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* @test 25 * @bug 8199433 26 * @run testng SelectWithConsumer 27 * @summary Unit test for Selector select(Consumer), select(Consumer,long) and 28 * selectNow(Consumer) 29 */ 30 31 import java.io.Closeable; 32 import java.io.IOException; 33 import java.net.InetSocketAddress; 34 import java.nio.ByteBuffer; 35 import java.nio.channels.ClosedSelectorException; 36 import java.nio.channels.Pipe; 37 import java.nio.channels.SelectionKey; 38 import java.nio.channels.Selector; 39 import java.nio.channels.ServerSocketChannel; 40 import java.nio.channels.SocketChannel; 41 import java.nio.channels.WritableByteChannel; 42 import java.util.concurrent.Executors; 43 import java.util.concurrent.ScheduledExecutorService; 44 import java.util.concurrent.TimeUnit; 45 import java.util.concurrent.atomic.AtomicInteger; 46 import static java.util.concurrent.TimeUnit.*; 47 48 import org.testng.annotations.AfterTest; 49 import org.testng.annotations.Test; 50 import static org.testng.Assert.*; 51 52 @Test 53 public class SelectWithConsumer { 54 55 /** 56 * Invoke the select methods that take an action and check that the 57 * accumulated ready ops notified to the action matches the expected ops. 58 */ 59 void testActionInvoked(SelectionKey key, int expectedOps) throws Exception { 60 var callerThread = Thread.currentThread(); 61 var sel = key.selector(); 62 var interestOps = key.interestOps(); 63 var notifiedOps = new AtomicInteger(); 64 65 // select(Consumer) 66 if (expectedOps == 0) 67 sel.wakeup(); // ensure select does not block 68 notifiedOps.set(0); 69 int n = sel.select(k -> { 70 assertTrue(Thread.currentThread() == callerThread); 71 assertTrue(k == key); 72 int readyOps = key.readyOps(); 73 assertTrue((readyOps & interestOps) != 0); 74 assertTrue((readyOps & notifiedOps.get()) == 0); 75 notifiedOps.set(notifiedOps.get() | readyOps); 76 }); 77 assertTrue((n == 1) ^ (expectedOps == 0)); 78 assertTrue(notifiedOps.get() == expectedOps); 79 80 // select(Consumer, timeout) 81 notifiedOps.set(0); 82 n = sel.select(k -> { 83 assertTrue(Thread.currentThread() == callerThread); 84 assertTrue(k == key); 85 int readyOps = key.readyOps(); 86 assertTrue((readyOps & interestOps) != 0); 87 assertTrue((readyOps & notifiedOps.get()) == 0); 88 notifiedOps.set(notifiedOps.get() | readyOps); 89 }, 1000); 90 assertTrue((n == 1) ^ (expectedOps == 0)); 91 assertTrue(notifiedOps.get() == expectedOps); 92 93 // selectNow(Consumer) 94 notifiedOps.set(0); 95 n = sel.selectNow(k -> { 96 assertTrue(Thread.currentThread() == callerThread); 97 assertTrue(k == key); 98 int readyOps = key.readyOps(); 99 assertTrue((readyOps & interestOps) != 0); 100 assertTrue((readyOps & notifiedOps.get()) == 0); 101 notifiedOps.set(notifiedOps.get() | readyOps); 102 }); 103 assertTrue((n == 1) ^ (expectedOps == 0)); 104 assertTrue(notifiedOps.get() == expectedOps); 105 } 106 107 /** 108 * Test that an action is performed when a channel is ready for reading. 109 */ 110 public void testReadable() throws Exception { 111 Pipe p = Pipe.open(); 112 try (Selector sel = Selector.open()) { 113 Pipe.SinkChannel sink = p.sink(); 114 Pipe.SourceChannel source = p.source(); 115 source.configureBlocking(false); 116 SelectionKey key = source.register(sel, SelectionKey.OP_READ); 117 118 // write to sink to ensure source is readable 119 scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS); 120 121 // test that action is invoked 122 testActionInvoked(key, SelectionKey.OP_READ); 123 } finally { 124 closePipe(p); 125 } 126 } 127 128 /** 129 * Test that an action is performed when a channel is ready for writing. 130 */ 131 public void testWritable() throws Exception { 132 Pipe p = Pipe.open(); 133 try (Selector sel = Selector.open()) { 134 Pipe.SourceChannel source = p.source(); 135 Pipe.SinkChannel sink = p.sink(); 136 sink.configureBlocking(false); 137 SelectionKey key = sink.register(sel, SelectionKey.OP_WRITE); 138 139 // test that action is invoked 140 testActionInvoked(key, SelectionKey.OP_WRITE); 141 } finally { 142 closePipe(p); 143 } 144 } 145 146 /** 147 * Test that an action is performed when a channel is ready for both 148 * reading and writing. 149 */ 150 public void testReadableAndWriteable() throws Exception { 151 ServerSocketChannel ssc = null; 152 SocketChannel sc = null; 153 SocketChannel peer = null; 154 try (Selector sel = Selector.open()) { 155 ssc = ServerSocketChannel.open().bind(new InetSocketAddress(0)); 156 sc = SocketChannel.open(ssc.getLocalAddress()); 157 sc.configureBlocking(false); 158 SelectionKey key = sc.register(sel, (SelectionKey.OP_READ | 159 SelectionKey.OP_WRITE)); 160 161 // accept connection and write data so the source is readable 162 peer = ssc.accept(); 163 peer.write(messageBuffer()); 164 165 // test that action is invoked 166 testActionInvoked(key, (SelectionKey.OP_READ | SelectionKey.OP_WRITE)); 167 } finally { 168 if (ssc != null) ssc.close(); 169 if (sc != null) sc.close(); 170 if (peer != null) peer.close(); 171 } 172 } 173 174 /** 175 * Test that the action is called for two selected channels 176 */ 177 public void testTwoChannels() throws Exception { 178 Pipe p = Pipe.open(); 179 try (Selector sel = Selector.open()) { 180 Pipe.SourceChannel source = p.source(); 181 Pipe.SinkChannel sink = p.sink(); 182 source.configureBlocking(false); 183 sink.configureBlocking(false); 184 SelectionKey key1 = source.register(sel, SelectionKey.OP_READ); 185 SelectionKey key2 = sink.register(sel, SelectionKey.OP_WRITE); 186 187 // write to sink to ensure that the source is readable 188 sink.write(messageBuffer()); 189 190 var counter = new AtomicInteger(); 191 192 // select(Consumer) 193 counter.set(0); 194 int n = sel.select(k -> { 195 counter.incrementAndGet(); 196 if (k == key1) { 197 assertTrue(k.isReadable()); 198 } else if (k == key2) { 199 assertTrue(k.isWritable()); 200 } else { 201 assertTrue(false); 202 } 203 }); 204 assertTrue(n == 2); 205 assertTrue(counter.get() == 2); 206 207 // select(Consumer, timeout) 208 counter.set(0); 209 n = sel.select(k -> { 210 counter.incrementAndGet(); 211 if (k == key1) { 212 assertTrue(k.isReadable()); 213 } else if (k == key2) { 214 assertTrue(k.isWritable()); 215 } else { 216 assertTrue(false); 217 } 218 }, 1000); 219 assertTrue(n == 2); 220 assertTrue(counter.get() == 2); 221 222 // selectNow(Consumer) 223 counter.set(0); 224 n = sel.selectNow(k -> { 225 counter.incrementAndGet(); 226 if (k == key1) { 227 assertTrue(k.isReadable()); 228 } else if (k == key2) { 229 assertTrue(k.isWritable()); 230 } else { 231 assertTrue(false); 232 } 233 }); 234 assertTrue(n == 2); 235 assertTrue(counter.get() == 2); 236 } finally { 237 closePipe(p); 238 } 239 } 240 241 /** 242 * Test calling select twice, the action should be invoked each time 243 */ 244 public void testRepeatedSelect1() throws Exception { 245 Pipe p = Pipe.open(); 246 try (Selector sel = Selector.open()) { 247 Pipe.SourceChannel source = p.source(); 248 Pipe.SinkChannel sink = p.sink(); 249 source.configureBlocking(false); 250 SelectionKey key = source.register(sel, SelectionKey.OP_READ); 251 252 // write to sink to ensure that the source is readable 253 sink.write(messageBuffer()); 254 255 // test that action is invoked 256 testActionInvoked(key, SelectionKey.OP_READ); 257 testActionInvoked(key, SelectionKey.OP_READ); 258 259 } finally { 260 closePipe(p); 261 } 262 } 263 264 /** 265 * Test calling select twice. An I/O operation is performed after the 266 * first select so the channel will not be selected by the second select. 267 */ 268 public void testRepeatedSelect2() throws Exception { 269 Pipe p = Pipe.open(); 270 try (Selector sel = Selector.open()) { 271 Pipe.SourceChannel source = p.source(); 272 Pipe.SinkChannel sink = p.sink(); 273 source.configureBlocking(false); 274 SelectionKey key = source.register(sel, SelectionKey.OP_READ); 275 276 // write to sink to ensure that the source is readable 277 sink.write(messageBuffer()); 278 279 // test that action is invoked 280 testActionInvoked(key, SelectionKey.OP_READ); 281 282 // read all bytes 283 int n; 284 ByteBuffer bb = ByteBuffer.allocate(100); 285 do { 286 n = source.read(bb); 287 bb.clear(); 288 } while (n > 0); 289 290 // test that action is not invoked 291 testActionInvoked(key, 0); 292 } finally { 293 closePipe(p); 294 } 295 } 296 297 /** 298 * Test timeout 299 */ 300 public void testTimeout() throws Exception { 301 Pipe p = Pipe.open(); 302 try (Selector sel = Selector.open()) { 303 Pipe.SourceChannel source = p.source(); 304 Pipe.SinkChannel sink = p.sink(); 305 source.configureBlocking(false); 306 source.register(sel, SelectionKey.OP_READ); 307 long start = System.currentTimeMillis(); 308 int n = sel.select(k -> assertTrue(false), 1000L); 309 long duration = System.currentTimeMillis() - start; 310 assertTrue(n == 0); 311 assertTrue(duration > 500, "select took " + duration + " ms"); 312 } finally { 313 closePipe(p); 314 } 315 } 316 317 /** 318 * Test wakeup prior to select 319 */ 320 public void testWakeupBeforeSelect() throws Exception { 321 // select(Consumer) 322 try (Selector sel = Selector.open()) { 323 sel.wakeup(); 324 int n = sel.select(k -> assertTrue(false)); 325 assertTrue(n == 0); 326 } 327 328 // select(Consumer, timeout) 329 try (Selector sel = Selector.open()) { 330 sel.wakeup(); 331 long start = System.currentTimeMillis(); 332 int n = sel.select(k -> assertTrue(false), 60*1000); 333 long duration = System.currentTimeMillis() - start; 334 assertTrue(n == 0); 335 assertTrue(duration < 5000, "select took " + duration + " ms"); 336 } 337 } 338 339 /** 340 * Test wakeup during select 341 */ 342 public void testWakeupDuringSelect() throws Exception { 343 // select(Consumer) 344 try (Selector sel = Selector.open()) { 345 scheduleWakeup(sel, 1, SECONDS); 346 int n = sel.select(k -> assertTrue(false)); 347 assertTrue(n == 0); 348 } 349 350 // select(Consumer, timeout) 351 try (Selector sel = Selector.open()) { 352 scheduleWakeup(sel, 1, SECONDS); 353 long start = System.currentTimeMillis(); 354 int n = sel.select(k -> assertTrue(false), 60*1000); 355 long duration = System.currentTimeMillis() - start; 356 assertTrue(n == 0); 357 assertTrue(duration > 500 && duration < 10*1000, 358 "select took " + duration + " ms"); 359 } 360 } 361 362 /** 363 * Test invoking select with interrupt status set 364 */ 365 public void testInterruptBeforeSelect() throws Exception { 366 // select(Consumer) 367 try (Selector sel = Selector.open()) { 368 Thread.currentThread().interrupt(); 369 int n = sel.select(k -> assertTrue(false)); 370 assertTrue(n == 0); 371 assertTrue(Thread.currentThread().isInterrupted()); 372 assertTrue(sel.isOpen()); 373 } finally { 374 Thread.currentThread().interrupted(); // clear interrupt status 375 } 376 377 // select(Consumer, timeout) 378 try (Selector sel = Selector.open()) { 379 Thread.currentThread().interrupt(); 380 long start = System.currentTimeMillis(); 381 int n = sel.select(k -> assertTrue(false), 60*1000); 382 long duration = System.currentTimeMillis() - start; 383 assertTrue(n == 0); 384 assertTrue(duration < 5000, "select took " + duration + " ms"); 385 assertTrue(Thread.currentThread().isInterrupted()); 386 assertTrue(sel.isOpen()); 387 } finally { 388 Thread.currentThread().interrupted(); // clear interrupt status 389 } 390 } 391 392 /** 393 * Test interrupt thread during select 394 */ 395 public void testInterruptDuringSelect() throws Exception { 396 // select(Consumer) 397 try (Selector sel = Selector.open()) { 398 scheduleInterrupt(Thread.currentThread(), 1, SECONDS); 399 int n = sel.select(k -> assertTrue(false)); 400 assertTrue(n == 0); 401 assertTrue(Thread.currentThread().isInterrupted()); 402 assertTrue(sel.isOpen()); 403 } finally { 404 Thread.currentThread().interrupted(); // clear interrupt status 405 } 406 407 // select(Consumer, timeout) 408 try (Selector sel = Selector.open()) { 409 scheduleInterrupt(Thread.currentThread(), 1, SECONDS); 410 long start = System.currentTimeMillis(); 411 int n = sel.select(k -> assertTrue(false), 60*1000); 412 long duration = System.currentTimeMillis() - start; 413 assertTrue(n == 0); 414 assertTrue(Thread.currentThread().isInterrupted()); 415 assertTrue(sel.isOpen()); 416 } finally { 417 Thread.currentThread().interrupted(); // clear interrupt status 418 } 419 } 420 421 /** 422 * Test invoking select on a closed selector 423 */ 424 @Test(expectedExceptions = ClosedSelectorException.class) 425 public void testClosedSelector1() throws Exception { 426 Selector sel = Selector.open(); 427 sel.close(); 428 sel.select(k -> assertTrue(false)); 429 } 430 @Test(expectedExceptions = ClosedSelectorException.class) 431 public void testClosedSelector2() throws Exception { 432 Selector sel = Selector.open(); 433 sel.close(); 434 sel.select(k -> assertTrue(false), 1000); 435 } 436 @Test(expectedExceptions = ClosedSelectorException.class) 437 public void testClosedSelector3() throws Exception { 438 Selector sel = Selector.open(); 439 sel.close(); 440 sel.selectNow(k -> assertTrue(false)); 441 } 442 443 /** 444 * Test closing selector while in a selection operation 445 */ 446 public void testCloseDuringSelect() throws Exception { 447 // select(Consumer) 448 try (Selector sel = Selector.open()) { 449 scheduleClose(sel, 3, SECONDS); 450 int n = sel.select(k -> assertTrue(false)); 451 assertTrue(n == 0); 452 assertFalse(sel.isOpen()); 453 } 454 455 // select(Consumer, timeout) 456 try (Selector sel = Selector.open()) { 457 scheduleClose(sel, 3, SECONDS); 458 long start = System.currentTimeMillis(); 459 int n = sel.select(k -> assertTrue(false), 60*1000); 460 long duration = System.currentTimeMillis() - start; 461 assertTrue(n == 0); 462 assertTrue(duration > 2000 && duration < 10*1000, 463 "select took " + duration + " ms"); 464 assertFalse(sel.isOpen()); 465 } 466 } 467 468 /** 469 * Test action closing selector 470 */ 471 @Test(expectedExceptions = ClosedSelectorException.class) 472 public void testActionClosingSelector() throws Exception { 473 Pipe p = Pipe.open(); 474 try (Selector sel = Selector.open()) { 475 Pipe.SourceChannel source = p.source(); 476 Pipe.SinkChannel sink = p.sink(); 477 source.configureBlocking(false); 478 SelectionKey key = source.register(sel, SelectionKey.OP_READ); 479 480 // write to sink to ensure that the source is readable 481 sink.write(messageBuffer()); 482 483 // should relay ClosedSelectorException 484 sel.select(k -> { 485 assertTrue(k == key); 486 try { 487 sel.close(); 488 } catch (IOException ioe) { } 489 }); 490 } finally { 491 closePipe(p); 492 } 493 } 494 495 /** 496 * Test that the action is invoked while synchronized on the selector and 497 * its selected-key set. 498 */ 499 public void testLocks() throws Exception { 500 Pipe p = Pipe.open(); 501 try (Selector sel = Selector.open()) { 502 Pipe.SourceChannel source = p.source(); 503 Pipe.SinkChannel sink = p.sink(); 504 source.configureBlocking(false); 505 SelectionKey key = source.register(sel, SelectionKey.OP_READ); 506 507 // write to sink to ensure that the source is readable 508 sink.write(messageBuffer()); 509 510 // select(Consumer) 511 sel.select(k -> { 512 assertTrue(k == key); 513 assertTrue(Thread.holdsLock(sel)); 514 assertFalse(Thread.holdsLock(sel.keys())); 515 assertTrue(Thread.holdsLock(sel.selectedKeys())); 516 }); 517 518 // select(Consumer, timeout) 519 sel.select(k -> { 520 assertTrue(k == key); 521 assertTrue(Thread.holdsLock(sel)); 522 assertFalse(Thread.holdsLock(sel.keys())); 523 assertTrue(Thread.holdsLock(sel.selectedKeys())); 524 }, 1000L); 525 526 // selectNow(Consumer) 527 sel.selectNow(k -> { 528 assertTrue(k == key); 529 assertTrue(Thread.holdsLock(sel)); 530 assertFalse(Thread.holdsLock(sel.keys())); 531 assertTrue(Thread.holdsLock(sel.selectedKeys())); 532 }); 533 } finally { 534 closePipe(p); 535 } 536 } 537 538 /** 539 * Test that selection operations remove cancelled keys from the selector's 540 * key and selected-key sets. 541 */ 542 public void testCancel() throws Exception { 543 Pipe p = Pipe.open(); 544 try (Selector sel = Selector.open()) { 545 Pipe.SinkChannel sink = p.sink(); 546 Pipe.SourceChannel source = p.source(); 547 548 // write to sink to ensure that the source is readable 549 sink.write(messageBuffer()); 550 551 source.configureBlocking(false); 552 SelectionKey key1 = source.register(sel, SelectionKey.OP_READ); 553 // make sure pipe source is readable before we do following checks. 554 // this is sometime necessary on windows where pipe is implemented 555 // as a pair of connected socket, so there is no guarantee that written 556 // bytes on sink side is immediately available on source side. 557 sel.select(); 558 559 sink.configureBlocking(false); 560 SelectionKey key2 = sink.register(sel, SelectionKey.OP_WRITE); 561 sel.selectNow(); 562 563 assertTrue(sel.keys().contains(key1)); 564 assertTrue(sel.keys().contains(key2)); 565 assertTrue(sel.selectedKeys().contains(key1)); 566 assertTrue(sel.selectedKeys().contains(key2)); 567 568 // cancel key1 569 key1.cancel(); 570 int n = sel.selectNow(k -> assertTrue(k == key2)); 571 assertTrue(n == 1); 572 assertFalse(sel.keys().contains(key1)); 573 assertTrue(sel.keys().contains(key2)); 574 assertFalse(sel.selectedKeys().contains(key1)); 575 assertTrue(sel.selectedKeys().contains(key2)); 576 577 // cancel key2 578 key2.cancel(); 579 n = sel.selectNow(k -> assertTrue(false)); 580 assertTrue(n == 0); 581 assertFalse(sel.keys().contains(key1)); 582 assertFalse(sel.keys().contains(key2)); 583 assertFalse(sel.selectedKeys().contains(key1)); 584 assertFalse(sel.selectedKeys().contains(key2)); 585 } finally { 586 closePipe(p); 587 } 588 } 589 590 /** 591 * Test an action invoking select() 592 */ 593 public void testReentrantSelect1() throws Exception { 594 Pipe p = Pipe.open(); 595 try (Selector sel = Selector.open()) { 596 Pipe.SinkChannel sink = p.sink(); 597 Pipe.SourceChannel source = p.source(); 598 source.configureBlocking(false); 599 source.register(sel, SelectionKey.OP_READ); 600 601 // write to sink to ensure that the source is readable 602 scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS); 603 604 int n = sel.select(k -> { 605 try { 606 sel.select(); 607 assertTrue(false); 608 } catch (IOException ioe) { 609 throw new RuntimeException(ioe); 610 } catch (IllegalStateException expected) { 611 } 612 }); 613 assertTrue(n == 1); 614 } finally { 615 closePipe(p); 616 } 617 } 618 619 /** 620 * Test an action invoking selectNow() 621 */ 622 public void testReentrantSelect2() throws Exception { 623 Pipe p = Pipe.open(); 624 try (Selector sel = Selector.open()) { 625 Pipe.SinkChannel sink = p.sink(); 626 Pipe.SourceChannel source = p.source(); 627 628 // write to sink to ensure that the source is readable 629 scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS); 630 631 source.configureBlocking(false); 632 source.register(sel, SelectionKey.OP_READ); 633 int n = sel.select(k -> { 634 try { 635 sel.selectNow(); 636 assertTrue(false); 637 } catch (IOException ioe) { 638 throw new RuntimeException(ioe); 639 } catch (IllegalStateException expected) { 640 } 641 }); 642 assertTrue(n == 1); 643 } finally { 644 closePipe(p); 645 } 646 } 647 648 /** 649 * Test an action invoking select(Consumer) 650 */ 651 public void testReentrantSelect3() throws Exception { 652 Pipe p = Pipe.open(); 653 try (Selector sel = Selector.open()) { 654 Pipe.SinkChannel sink = p.sink(); 655 Pipe.SourceChannel source = p.source(); 656 657 // write to sink to ensure that the source is readable 658 scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS); 659 660 source.configureBlocking(false); 661 source.register(sel, SelectionKey.OP_READ); 662 int n = sel.select(k -> { 663 try { 664 sel.select(x -> assertTrue(false)); 665 assertTrue(false); 666 } catch (IOException ioe) { 667 throw new RuntimeException(ioe); 668 } catch (IllegalStateException expected) { 669 } 670 }); 671 assertTrue(n == 1); 672 } finally { 673 closePipe(p); 674 } 675 } 676 677 /** 678 * Negative timeout 679 */ 680 @Test(expectedExceptions = IllegalArgumentException.class) 681 public void testNegativeTimeout() throws Exception { 682 try (Selector sel = Selector.open()) { 683 sel.select(k -> { }, -1L); 684 } 685 } 686 687 /** 688 * Null action 689 */ 690 @Test(expectedExceptions = NullPointerException.class) 691 public void testNull1() throws Exception { 692 try (Selector sel = Selector.open()) { 693 sel.select(null); 694 } 695 } 696 @Test(expectedExceptions = NullPointerException.class) 697 public void testNull2() throws Exception { 698 try (Selector sel = Selector.open()) { 699 sel.select(null, 1000); 700 } 701 } 702 @Test(expectedExceptions = NullPointerException.class) 703 public void testNull3() throws Exception { 704 try (Selector sel = Selector.open()) { 705 sel.selectNow(null); 706 } 707 } 708 709 710 // -- support methods --- 711 712 private final ScheduledExecutorService POOL = Executors.newScheduledThreadPool(1); 713 714 @AfterTest 715 void shutdownThreadPool() { 716 POOL.shutdown(); 717 } 718 719 void scheduleWakeup(Selector sel, long delay, TimeUnit unit) { 720 POOL.schedule(() -> sel.wakeup(), delay, unit); 721 } 722 723 void scheduleInterrupt(Thread t, long delay, TimeUnit unit) { 724 POOL.schedule(() -> t.interrupt(), delay, unit); 725 } 726 727 void scheduleClose(Closeable c, long delay, TimeUnit unit) { 728 POOL.schedule(() -> { 729 try { 730 c.close(); 731 } catch (IOException ioe) { 732 ioe.printStackTrace(); 733 } 734 }, delay, unit); 735 } 736 737 void scheduleWrite(WritableByteChannel sink, ByteBuffer buf, long delay, TimeUnit unit) { 738 POOL.schedule(() -> { 739 try { 740 sink.write(buf); 741 } catch (IOException ioe) { 742 ioe.printStackTrace(); 743 } 744 }, delay, unit); 745 } 746 747 static void closePipe(Pipe p) { 748 try { p.sink().close(); } catch (IOException ignore) { } 749 try { p.source().close(); } catch (IOException ignore) { } 750 } 751 752 static ByteBuffer messageBuffer() { 753 try { 754 return ByteBuffer.wrap("message".getBytes("UTF-8")); 755 } catch (Exception e) { 756 throw new RuntimeException(e); 757 } 758 } 759 }