1 /* 2 * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* @test 25 * @bug 8199433 26 * @run testng SelectWithConsumer 27 * @summary Unit test for Selector select(Consumer), select(Consumer,long) and 28 * selectNow(Consumer) 29 */ 30 31 import java.io.Closeable; 32 import java.io.IOException; 33 import java.net.InetSocketAddress; 34 import java.nio.ByteBuffer; 35 import java.nio.channels.ClosedSelectorException; 36 import java.nio.channels.Pipe; 37 import java.nio.channels.SelectionKey; 38 import java.nio.channels.Selector; 39 import java.nio.channels.ServerSocketChannel; 40 import java.nio.channels.SocketChannel; 41 import java.nio.channels.WritableByteChannel; 42 import java.util.concurrent.Executors; 43 import java.util.concurrent.ScheduledExecutorService; 44 import java.util.concurrent.TimeUnit; 45 import java.util.concurrent.atomic.AtomicInteger; 46 import static java.util.concurrent.TimeUnit.*; 47 48 import org.testng.annotations.AfterTest; 49 import org.testng.annotations.Test; 50 import static org.testng.Assert.*; 51 52 @Test 53 public class SelectWithConsumer { 54 55 /** 56 * Invoke the select methods that take an action and check that the 57 * accumulated ready ops notified to the action matches the expected ops. 58 */ 59 void testActionInvoked(SelectionKey key, int expectedOps) throws Exception { 60 var callerThread = Thread.currentThread(); 61 var sel = key.selector(); 62 var interestOps = key.interestOps(); 63 var notifiedOps = new AtomicInteger(); 64 65 // select(Consumer) 66 if (expectedOps == 0) 67 sel.wakeup(); // ensure select does not block 68 notifiedOps.set(0); 69 int n = sel.select(k -> { 70 assertTrue(Thread.currentThread() == callerThread); 71 assertTrue(k == key); 72 int readyOps = key.readyOps(); 73 assertTrue((readyOps & interestOps) != 0); 74 assertTrue((readyOps & notifiedOps.get()) == 0); 75 notifiedOps.set(notifiedOps.get() | readyOps); 76 }); 77 assertTrue((n == 1) ^ (expectedOps == 0)); 78 assertTrue(notifiedOps.get() == expectedOps); 79 80 // select(Consumer, timeout) 81 notifiedOps.set(0); 82 n = sel.select(k -> { 83 assertTrue(Thread.currentThread() == callerThread); 84 assertTrue(k == key); 85 int readyOps = key.readyOps(); 86 assertTrue((readyOps & interestOps) != 0); 87 assertTrue((readyOps & notifiedOps.get()) == 0); 88 notifiedOps.set(notifiedOps.get() | readyOps); 89 }, 1000); 90 assertTrue((n == 1) ^ (expectedOps == 0)); 91 assertTrue(notifiedOps.get() == expectedOps); 92 93 // selectNow(Consumer) 94 notifiedOps.set(0); 95 n = sel.selectNow(k -> { 96 assertTrue(Thread.currentThread() == callerThread); 97 assertTrue(k == key); 98 int readyOps = key.readyOps(); 99 assertTrue((readyOps & interestOps) != 0); 100 assertTrue((readyOps & notifiedOps.get()) == 0); 101 notifiedOps.set(notifiedOps.get() | readyOps); 102 }); 103 assertTrue((n == 1) ^ (expectedOps == 0)); 104 assertTrue(notifiedOps.get() == expectedOps); 105 } 106 107 /** 108 * Test that an action is performed when a channel is ready for reading. 109 */ 110 public void testReadable() throws Exception { 111 Pipe p = Pipe.open(); 112 try (Selector sel = Selector.open()) { 113 Pipe.SinkChannel sink = p.sink(); 114 Pipe.SourceChannel source = p.source(); 115 source.configureBlocking(false); 116 SelectionKey key = source.register(sel, SelectionKey.OP_READ); 117 118 // write to sink to ensure source is readable 119 scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS); 120 121 // test that action is invoked 122 testActionInvoked(key, SelectionKey.OP_READ); 123 } finally { 124 closePipe(p); 125 } 126 } 127 128 /** 129 * Test that an action is performed when a channel is ready for writing. 130 */ 131 public void testWritable() throws Exception { 132 Pipe p = Pipe.open(); 133 try (Selector sel = Selector.open()) { 134 Pipe.SourceChannel source = p.source(); 135 Pipe.SinkChannel sink = p.sink(); 136 sink.configureBlocking(false); 137 SelectionKey key = sink.register(sel, SelectionKey.OP_WRITE); 138 139 // test that action is invoked 140 testActionInvoked(key, SelectionKey.OP_WRITE); 141 } finally { 142 closePipe(p); 143 } 144 } 145 146 /** 147 * Test that an action is performed when a channel is ready for both 148 * reading and writing. 149 */ 150 public void testReadableAndWriteable() throws Exception { 151 ServerSocketChannel ssc = null; 152 SocketChannel sc = null; 153 SocketChannel peer = null; 154 try (Selector sel = Selector.open()) { 155 ssc = ServerSocketChannel.open().bind(new InetSocketAddress(0)); 156 sc = SocketChannel.open(ssc.getLocalAddress()); 157 sc.configureBlocking(false); 158 SelectionKey key = sc.register(sel, (SelectionKey.OP_READ | 159 SelectionKey.OP_WRITE)); 160 161 // accept connection and write data so the source is readable 162 peer = ssc.accept(); 163 peer.write(messageBuffer()); 164 165 // test that action is invoked 166 testActionInvoked(key, (SelectionKey.OP_READ | SelectionKey.OP_WRITE)); 167 } finally { 168 if (ssc != null) ssc.close(); 169 if (sc != null) sc.close(); 170 if (peer != null) peer.close(); 171 } 172 } 173 174 /** 175 * Test that the action is called for two selected channels 176 */ 177 public void testTwoChannels() throws Exception { 178 Pipe p = Pipe.open(); 179 try (Selector sel = Selector.open()) { 180 Pipe.SourceChannel source = p.source(); 181 Pipe.SinkChannel sink = p.sink(); 182 source.configureBlocking(false); 183 sink.configureBlocking(false); 184 SelectionKey key1 = source.register(sel, SelectionKey.OP_READ); 185 SelectionKey key2 = sink.register(sel, SelectionKey.OP_WRITE); 186 187 // write to sink to ensure that the source is readable 188 sink.write(messageBuffer()); 189 190 var counter = new AtomicInteger(); 191 192 // select(Consumer) 193 counter.set(0); 194 int n = sel.select(k -> { 195 counter.incrementAndGet(); 196 if (k == key1) { 197 assertTrue(k.isReadable()); 198 } else if (k == key2) { 199 assertTrue(k.isWritable()); 200 } else { 201 assertTrue(false); 202 } 203 }); 204 assertTrue(n == 2); 205 assertTrue(counter.get() == 2); 206 207 // select(Consumer, timeout) 208 counter.set(0); 209 n = sel.select(k -> { 210 counter.incrementAndGet(); 211 if (k == key1) { 212 assertTrue(k.isReadable()); 213 } else if (k == key2) { 214 assertTrue(k.isWritable()); 215 } else { 216 assertTrue(false); 217 } 218 }, 1000); 219 assertTrue(n == 2); 220 assertTrue(counter.get() == 2); 221 222 // selectNow(Consumer) 223 counter.set(0); 224 n = sel.selectNow(k -> { 225 counter.incrementAndGet(); 226 if (k == key1) { 227 assertTrue(k.isReadable()); 228 } else if (k == key2) { 229 assertTrue(k.isWritable()); 230 } else { 231 assertTrue(false); 232 } 233 }); 234 assertTrue(n == 2); 235 assertTrue(counter.get() == 2); 236 } finally { 237 closePipe(p); 238 } 239 } 240 241 /** 242 * Test calling select twice, the action should be invoked each time 243 */ 244 public void testRepeatedSelect1() throws Exception { 245 Pipe p = Pipe.open(); 246 try (Selector sel = Selector.open()) { 247 Pipe.SourceChannel source = p.source(); 248 Pipe.SinkChannel sink = p.sink(); 249 source.configureBlocking(false); 250 SelectionKey key = source.register(sel, SelectionKey.OP_READ); 251 252 // write to sink to ensure that the source is readable 253 sink.write(messageBuffer()); 254 255 // test that action is invoked 256 testActionInvoked(key, SelectionKey.OP_READ); 257 testActionInvoked(key, SelectionKey.OP_READ); 258 259 } finally { 260 closePipe(p); 261 } 262 } 263 264 /** 265 * Test calling select twice. An I/O operation is performed after the 266 * first select so the channel will not be selected by the second select. 267 */ 268 public void testRepeatedSelect2() throws Exception { 269 Pipe p = Pipe.open(); 270 try (Selector sel = Selector.open()) { 271 Pipe.SourceChannel source = p.source(); 272 Pipe.SinkChannel sink = p.sink(); 273 source.configureBlocking(false); 274 SelectionKey key = source.register(sel, SelectionKey.OP_READ); 275 276 // write to sink to ensure that the source is readable 277 sink.write(messageBuffer()); 278 279 // test that action is invoked 280 testActionInvoked(key, SelectionKey.OP_READ); 281 282 // read all bytes 283 int n; 284 ByteBuffer bb = ByteBuffer.allocate(100); 285 do { 286 n = source.read(bb); 287 bb.clear(); 288 } while (n > 0); 289 290 // test that action is not invoked 291 testActionInvoked(key, 0); 292 } finally { 293 closePipe(p); 294 } 295 } 296 297 /** 298 * Test timeout 299 */ 300 public void testTimeout() throws Exception { 301 Pipe p = Pipe.open(); 302 try (Selector sel = Selector.open()) { 303 Pipe.SourceChannel source = p.source(); 304 Pipe.SinkChannel sink = p.sink(); 305 source.configureBlocking(false); 306 source.register(sel, SelectionKey.OP_READ); 307 long start = System.currentTimeMillis(); 308 int n = sel.select(k -> assertTrue(false), 1000L); 309 long duration = System.currentTimeMillis() - start; 310 assertTrue(n == 0); 311 assertTrue(duration > 500, "select took " + duration + " ms"); 312 } finally { 313 closePipe(p); 314 } 315 } 316 317 /** 318 * Test wakeup prior to select 319 */ 320 public void testWakeupBeforeSelect() throws Exception { 321 // select(Consumer) 322 try (Selector sel = Selector.open()) { 323 sel.wakeup(); 324 int n = sel.select(k -> assertTrue(false)); 325 assertTrue(n == 0); 326 } 327 328 // select(Consumer, timeout) 329 try (Selector sel = Selector.open()) { 330 sel.wakeup(); 331 long start = System.currentTimeMillis(); 332 int n = sel.select(k -> assertTrue(false), 60*1000); 333 long duration = System.currentTimeMillis() - start; 334 assertTrue(n == 0); 335 assertTrue(duration < 5000, "select took " + duration + " ms"); 336 } 337 } 338 339 /** 340 * Test wakeup during select 341 */ 342 public void testWakeupDuringSelect() throws Exception { 343 // select(Consumer) 344 try (Selector sel = Selector.open()) { 345 scheduleWakeup(sel, 1, SECONDS); 346 int n = sel.select(k -> assertTrue(false)); 347 assertTrue(n == 0); 348 } 349 350 // select(Consumer, timeout) 351 try (Selector sel = Selector.open()) { 352 scheduleWakeup(sel, 1, SECONDS); 353 long start = System.currentTimeMillis(); 354 int n = sel.select(k -> assertTrue(false), 60*1000); 355 long duration = System.currentTimeMillis() - start; 356 assertTrue(n == 0); 357 assertTrue(duration > 500 && duration < 10*1000, 358 "select took " + duration + " ms"); 359 } 360 } 361 362 /** 363 * Test invoking select with interrupt status set 364 */ 365 public void testInterruptBeforeSelect() throws Exception { 366 // select(Consumer) 367 try (Selector sel = Selector.open()) { 368 Thread.currentThread().interrupt(); 369 int n = sel.select(k -> assertTrue(false)); 370 assertTrue(n == 0); 371 assertTrue(Thread.currentThread().isInterrupted()); 372 assertTrue(sel.isOpen()); 373 } finally { 374 Thread.currentThread().interrupted(); // clear interrupt status 375 } 376 377 // select(Consumer, timeout) 378 try (Selector sel = Selector.open()) { 379 Thread.currentThread().interrupt(); 380 long start = System.currentTimeMillis(); 381 int n = sel.select(k -> assertTrue(false), 60*1000); 382 long duration = System.currentTimeMillis() - start; 383 assertTrue(n == 0); 384 assertTrue(duration < 5000, "select took " + duration + " ms"); 385 assertTrue(Thread.currentThread().isInterrupted()); 386 assertTrue(sel.isOpen()); 387 } finally { 388 Thread.currentThread().interrupted(); // clear interrupt status 389 } 390 } 391 392 /** 393 * Test interrupt thread during select 394 */ 395 public void testInterruptDuringSelect() throws Exception { 396 // select(Consumer) 397 try (Selector sel = Selector.open()) { 398 scheduleInterrupt(Thread.currentThread(), 1, SECONDS); 399 int n = sel.select(k -> assertTrue(false)); 400 assertTrue(n == 0); 401 assertTrue(Thread.currentThread().isInterrupted()); 402 assertTrue(sel.isOpen()); 403 } finally { 404 Thread.currentThread().interrupted(); // clear interrupt status 405 } 406 407 // select(Consumer, timeout) 408 try (Selector sel = Selector.open()) { 409 scheduleInterrupt(Thread.currentThread(), 1, SECONDS); 410 long start = System.currentTimeMillis(); 411 int n = sel.select(k -> assertTrue(false), 60*1000); 412 long duration = System.currentTimeMillis() - start; 413 assertTrue(n == 0); 414 assertTrue(duration > 500 && duration < 5000, 415 "select took " + duration + " ms"); 416 assertTrue(Thread.currentThread().isInterrupted()); 417 assertTrue(sel.isOpen()); 418 } finally { 419 Thread.currentThread().interrupted(); // clear interrupt status 420 } 421 } 422 423 /** 424 * Test invoking select on a closed selector 425 */ 426 @Test(expectedExceptions = ClosedSelectorException.class) 427 public void testClosedSelector1() throws Exception { 428 Selector sel = Selector.open(); 429 sel.close(); 430 sel.select(k -> assertTrue(false)); 431 } 432 @Test(expectedExceptions = ClosedSelectorException.class) 433 public void testClosedSelector2() throws Exception { 434 Selector sel = Selector.open(); 435 sel.close(); 436 sel.select(k -> assertTrue(false), 1000); 437 } 438 @Test(expectedExceptions = ClosedSelectorException.class) 439 public void testClosedSelector3() throws Exception { 440 Selector sel = Selector.open(); 441 sel.close(); 442 sel.selectNow(k -> assertTrue(false)); 443 } 444 445 /** 446 * Test closing selector while in a selection operation 447 */ 448 public void testCloseDuringSelect() throws Exception { 449 // select(Consumer) 450 try (Selector sel = Selector.open()) { 451 scheduleClose(sel, 3, SECONDS); 452 int n = sel.select(k -> assertTrue(false)); 453 assertTrue(n == 0); 454 assertFalse(sel.isOpen()); 455 } 456 457 // select(Consumer, timeout) 458 try (Selector sel = Selector.open()) { 459 scheduleClose(sel, 3, SECONDS); 460 long start = System.currentTimeMillis(); 461 int n = sel.select(k -> assertTrue(false), 60*1000); 462 long duration = System.currentTimeMillis() - start; 463 assertTrue(n == 0); 464 assertTrue(duration > 2000 && duration < 10*1000, 465 "select took " + duration + " ms"); 466 assertFalse(sel.isOpen()); 467 } 468 } 469 470 /** 471 * Test action closing selector 472 */ 473 @Test(expectedExceptions = ClosedSelectorException.class) 474 public void testActionClosingSelector() throws Exception { 475 Pipe p = Pipe.open(); 476 try (Selector sel = Selector.open()) { 477 Pipe.SourceChannel source = p.source(); 478 Pipe.SinkChannel sink = p.sink(); 479 source.configureBlocking(false); 480 SelectionKey key = source.register(sel, SelectionKey.OP_READ); 481 482 // write to sink to ensure that the source is readable 483 sink.write(messageBuffer()); 484 485 // should relay ClosedSelectorException 486 sel.select(k -> { 487 assertTrue(k == key); 488 try { 489 sel.close(); 490 } catch (IOException ioe) { } 491 }); 492 } finally { 493 closePipe(p); 494 } 495 } 496 497 /** 498 * Test that the action is invoked while synchronized on the selector and 499 * its selected-key set. 500 */ 501 public void testLocks() throws Exception { 502 Pipe p = Pipe.open(); 503 try (Selector sel = Selector.open()) { 504 Pipe.SourceChannel source = p.source(); 505 Pipe.SinkChannel sink = p.sink(); 506 source.configureBlocking(false); 507 SelectionKey key = source.register(sel, SelectionKey.OP_READ); 508 509 // write to sink to ensure that the source is readable 510 sink.write(messageBuffer()); 511 512 // select(Consumer) 513 sel.select(k -> { 514 assertTrue(k == key); 515 assertTrue(Thread.holdsLock(sel)); 516 assertFalse(Thread.holdsLock(sel.keys())); 517 assertTrue(Thread.holdsLock(sel.selectedKeys())); 518 }); 519 520 // select(Consumer, timeout) 521 sel.select(k -> { 522 assertTrue(k == key); 523 assertTrue(Thread.holdsLock(sel)); 524 assertFalse(Thread.holdsLock(sel.keys())); 525 assertTrue(Thread.holdsLock(sel.selectedKeys())); 526 }, 1000L); 527 528 // selectNow(Consumer) 529 sel.selectNow(k -> { 530 assertTrue(k == key); 531 assertTrue(Thread.holdsLock(sel)); 532 assertFalse(Thread.holdsLock(sel.keys())); 533 assertTrue(Thread.holdsLock(sel.selectedKeys())); 534 }); 535 } finally { 536 closePipe(p); 537 } 538 } 539 540 /** 541 * Test that selection operations remove cancelled keys from the selector's 542 * key and selected-key sets. 543 */ 544 public void testCancel() throws Exception { 545 Pipe p = Pipe.open(); 546 try (Selector sel = Selector.open()) { 547 Pipe.SinkChannel sink = p.sink(); 548 Pipe.SourceChannel source = p.source(); 549 550 // write to sink to ensure that the source is readable 551 sink.write(messageBuffer()); 552 553 sink.configureBlocking(false); 554 source.configureBlocking(false); 555 SelectionKey key1 = sink.register(sel, SelectionKey.OP_WRITE); 556 SelectionKey key2 = source.register(sel, SelectionKey.OP_READ); 557 558 sel.selectNow(); 559 assertTrue(sel.keys().contains(key1)); 560 assertTrue(sel.keys().contains(key2)); 561 assertTrue(sel.selectedKeys().contains(key1)); 562 assertTrue(sel.selectedKeys().contains(key2)); 563 564 // cancel key1 565 key1.cancel(); 566 int n = sel.selectNow(k -> assertTrue(k == key2)); 567 assertTrue(n == 1); 568 assertFalse(sel.keys().contains(key1)); 569 assertTrue(sel.keys().contains(key2)); 570 assertFalse(sel.selectedKeys().contains(key1)); 571 assertTrue(sel.selectedKeys().contains(key2)); 572 573 // cancel key2 574 key2.cancel(); 575 n = sel.selectNow(k -> assertTrue(false)); 576 assertTrue(n == 0); 577 assertFalse(sel.keys().contains(key1)); 578 assertFalse(sel.keys().contains(key2)); 579 assertFalse(sel.selectedKeys().contains(key1)); 580 assertFalse(sel.selectedKeys().contains(key2)); 581 } finally { 582 closePipe(p); 583 } 584 } 585 586 /** 587 * Test an action invoking select() 588 */ 589 public void testReentrantSelect1() throws Exception { 590 Pipe p = Pipe.open(); 591 try (Selector sel = Selector.open()) { 592 Pipe.SinkChannel sink = p.sink(); 593 Pipe.SourceChannel source = p.source(); 594 source.configureBlocking(false); 595 source.register(sel, SelectionKey.OP_READ); 596 597 // write to sink to ensure that the source is readable 598 scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS); 599 600 int n = sel.select(k -> { 601 try { 602 sel.select(); 603 assertTrue(false); 604 } catch (IOException ioe) { 605 throw new RuntimeException(ioe); 606 } catch (IllegalStateException expected) { 607 } 608 }); 609 assertTrue(n == 1); 610 } finally { 611 closePipe(p); 612 } 613 } 614 615 /** 616 * Test an action invoking selectNow() 617 */ 618 public void testReentrantSelect2() throws Exception { 619 Pipe p = Pipe.open(); 620 try (Selector sel = Selector.open()) { 621 Pipe.SinkChannel sink = p.sink(); 622 Pipe.SourceChannel source = p.source(); 623 624 // write to sink to ensure that the source is readable 625 scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS); 626 627 source.configureBlocking(false); 628 source.register(sel, SelectionKey.OP_READ); 629 int n = sel.select(k -> { 630 try { 631 sel.selectNow(); 632 assertTrue(false); 633 } catch (IOException ioe) { 634 throw new RuntimeException(ioe); 635 } catch (IllegalStateException expected) { 636 } 637 }); 638 assertTrue(n == 1); 639 } finally { 640 closePipe(p); 641 } 642 } 643 644 /** 645 * Test an action invoking select(Consumer) 646 */ 647 public void testReentrantSelect3() throws Exception { 648 Pipe p = Pipe.open(); 649 try (Selector sel = Selector.open()) { 650 Pipe.SinkChannel sink = p.sink(); 651 Pipe.SourceChannel source = p.source(); 652 653 // write to sink to ensure that the source is readable 654 scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS); 655 656 source.configureBlocking(false); 657 source.register(sel, SelectionKey.OP_READ); 658 int n = sel.select(k -> { 659 try { 660 sel.select(x -> assertTrue(false)); 661 assertTrue(false); 662 } catch (IOException ioe) { 663 throw new RuntimeException(ioe); 664 } catch (IllegalStateException expected) { 665 } 666 }); 667 assertTrue(n == 1); 668 } finally { 669 closePipe(p); 670 } 671 } 672 673 /** 674 * Negative timeout 675 */ 676 @Test(expectedExceptions = IllegalArgumentException.class) 677 public void testNegativeTimeout() throws Exception { 678 try (Selector sel = Selector.open()) { 679 sel.select(k -> { }, -1L); 680 } 681 } 682 683 /** 684 * Null action 685 */ 686 @Test(expectedExceptions = NullPointerException.class) 687 public void testNull1() throws Exception { 688 try (Selector sel = Selector.open()) { 689 sel.select(null); 690 } 691 } 692 @Test(expectedExceptions = NullPointerException.class) 693 public void testNull2() throws Exception { 694 try (Selector sel = Selector.open()) { 695 sel.select(null, 1000); 696 } 697 } 698 @Test(expectedExceptions = NullPointerException.class) 699 public void testNull3() throws Exception { 700 try (Selector sel = Selector.open()) { 701 sel.selectNow(null); 702 } 703 } 704 705 706 // -- support methods --- 707 708 private final ScheduledExecutorService POOL = Executors.newScheduledThreadPool(1); 709 710 @AfterTest 711 void shutdownThreadPool() { 712 POOL.shutdown(); 713 } 714 715 void scheduleWakeup(Selector sel, long delay, TimeUnit unit) { 716 POOL.schedule(() -> sel.wakeup(), delay, unit); 717 } 718 719 void scheduleInterrupt(Thread t, long delay, TimeUnit unit) { 720 POOL.schedule(() -> t.interrupt(), delay, unit); 721 } 722 723 void scheduleClose(Closeable c, long delay, TimeUnit unit) { 724 POOL.schedule(() -> { 725 try { 726 c.close(); 727 } catch (IOException ioe) { 728 ioe.printStackTrace(); 729 } 730 }, delay, unit); 731 } 732 733 void scheduleWrite(WritableByteChannel sink, ByteBuffer buf, long delay, TimeUnit unit) { 734 POOL.schedule(() -> { 735 try { 736 sink.write(buf); 737 } catch (IOException ioe) { 738 ioe.printStackTrace(); 739 } 740 }, delay, unit); 741 } 742 743 static void closePipe(Pipe p) { 744 try { p.sink().close(); } catch (IOException ignore) { } 745 try { p.source().close(); } catch (IOException ignore) { } 746 } 747 748 static ByteBuffer messageBuffer() { 749 try { 750 return ByteBuffer.wrap("message".getBytes("UTF-8")); 751 } catch (Exception e) { 752 throw new RuntimeException(e); 753 } 754 } 755 }