1 /* 2 * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* @test 25 * @bug 8199433 26 * @run testng SelectWithConsumer 27 * @summary Unit test for Selector select(Consumer), select(Consumer,long) and 28 * selectNow(Consumer) 29 */ 30 31 import java.io.Closeable; 32 import java.io.IOException; 33 import java.net.InetSocketAddress; 34 import java.nio.ByteBuffer; 35 import java.nio.channels.ClosedSelectorException; 36 import java.nio.channels.Pipe; 37 import java.nio.channels.SelectionKey; 38 import java.nio.channels.Selector; 39 import java.nio.channels.ServerSocketChannel; 40 import java.nio.channels.SocketChannel; 41 import java.nio.channels.WritableByteChannel; 42 import java.util.concurrent.Executors; 43 import java.util.concurrent.ScheduledExecutorService; 44 import java.util.concurrent.TimeUnit; 45 import java.util.concurrent.atomic.AtomicInteger; 46 import static java.util.concurrent.TimeUnit.*; 47 48 import org.testng.annotations.AfterTest; 49 import org.testng.annotations.Test; 50 import static org.testng.Assert.*; 51 52 @Test 53 public class SelectWithConsumer { 54 55 /** 56 * Invoke the select methods that take an action and checks that the 57 * accumulated ready ops notified to the action matches the expected ops. 58 */ 59 void testActionInvoked(SelectionKey key, int expectedOps) throws Exception { 60 var callerThread = Thread.currentThread(); 61 var sel = key.selector(); 62 var interestOps = key.interestOps(); 63 var notifiedOps = new AtomicInteger(); 64 65 // select(Consumer) 66 if (expectedOps == 0) 67 sel.wakeup(); // ensure select does not block 68 notifiedOps.set(0); 69 int n = sel.select(k -> { 70 assertTrue(Thread.currentThread() == callerThread); 71 assertTrue(k == key); 72 int readyOps = key.readyOps(); 73 assertTrue((readyOps & interestOps) != 0); 74 assertTrue((readyOps & notifiedOps.get()) == 0); 75 notifiedOps.set(notifiedOps.get() | readyOps); 76 }); 77 assertTrue((n == 1) ^ (expectedOps == 0)); 78 assertTrue(notifiedOps.get() == expectedOps); 79 80 // select(Consumer, timeout) 81 notifiedOps.set(0); 82 n = sel.select(k -> { 83 assertTrue(Thread.currentThread() == callerThread); 84 assertTrue(k == key); 85 int readyOps = key.readyOps(); 86 assertTrue((readyOps & interestOps) != 0); 87 assertTrue((readyOps & notifiedOps.get()) == 0); 88 notifiedOps.set(notifiedOps.get() | readyOps); 89 }, 1000); 90 assertTrue((n == 1) ^ (expectedOps == 0)); 91 assertTrue(notifiedOps.get() == expectedOps); 92 93 // selectNow(Consumer) 94 notifiedOps.set(0); 95 n = sel.selectNow(k -> { 96 assertTrue(Thread.currentThread() == callerThread); 97 assertTrue(k == key); 98 int readyOps = key.readyOps(); 99 assertTrue((readyOps & interestOps) != 0); 100 assertTrue((readyOps & notifiedOps.get()) == 0); 101 notifiedOps.set(notifiedOps.get() | readyOps); 102 }); 103 assertTrue((n == 1) ^ (expectedOps == 0)); 104 assertTrue(notifiedOps.get() == expectedOps); 105 } 106 107 /** 108 * Test that an action is performed when a channel is ready for reading. 109 */ 110 public void testReadable() throws Exception { 111 Pipe p = Pipe.open(); 112 try (Selector sel = Selector.open()) { 113 Pipe.SinkChannel sink = p.sink(); 114 Pipe.SourceChannel source = p.source(); 115 source.configureBlocking(false); 116 SelectionKey key = source.register(sel, SelectionKey.OP_READ); 117 118 // write to sink to ensure source is readable 119 scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS); 120 121 // test that action is invoked 122 testActionInvoked(key, SelectionKey.OP_READ); 123 } finally { 124 closePipe(p); 125 } 126 } 127 128 /** 129 * Test that an action is performed when a channel is ready for writing. 130 */ 131 public void testWritable() throws Exception { 132 Pipe p = Pipe.open(); 133 try (Selector sel = Selector.open()) { 134 Pipe.SourceChannel source = p.source(); 135 Pipe.SinkChannel sink = p.sink(); 136 sink.configureBlocking(false); 137 SelectionKey key = sink.register(sel, SelectionKey.OP_WRITE); 138 139 // test that action is invoked 140 testActionInvoked(key, SelectionKey.OP_WRITE); 141 } finally { 142 closePipe(p); 143 } 144 } 145 146 /** 147 * Test that an action is performed when a channel is ready for both 148 * reading and writing. 149 */ 150 public void testReadableAndWriteable() throws Exception { 151 ServerSocketChannel ssc = null; 152 SocketChannel sc = null; 153 SocketChannel peer = null; 154 try (Selector sel = Selector.open()) { 155 ssc = ServerSocketChannel.open().bind(new InetSocketAddress(0)); 156 sc = SocketChannel.open(ssc.getLocalAddress()); 157 sc.configureBlocking(false); 158 SelectionKey key = sc.register(sel, (SelectionKey.OP_READ | 159 SelectionKey.OP_WRITE)); 160 161 // accept connection and write data so the source is readable 162 peer = ssc.accept(); 163 peer.write(messageBuffer()); 164 165 // test that action is invoked 166 testActionInvoked(key, (SelectionKey.OP_READ | SelectionKey.OP_WRITE)); 167 } finally { 168 if (ssc != null) ssc.close(); 169 if (sc != null) sc.close(); 170 if (peer != null) peer.close(); 171 } 172 } 173 174 /** 175 * Test that the action is called for two selected channels 176 */ 177 public void testTwoChannels() throws Exception { 178 Pipe p = Pipe.open(); 179 try (Selector sel = Selector.open()) { 180 Pipe.SourceChannel source = p.source(); 181 Pipe.SinkChannel sink = p.sink(); 182 source.configureBlocking(false); 183 sink.configureBlocking(false); 184 SelectionKey key1 = source.register(sel, SelectionKey.OP_READ); 185 SelectionKey key2 = sink.register(sel, SelectionKey.OP_WRITE); 186 187 // write to sink to ensure that the source is readable 188 sink.write(messageBuffer()); 189 190 var counter = new AtomicInteger(); 191 192 // select(Consumer) 193 counter.set(0); 194 int n = sel.select(k -> { 195 counter.incrementAndGet(); 196 if (k == key1) { 197 assertTrue(k.isReadable()); 198 } else if (k == key2) { 199 assertTrue(k.isWritable()); 200 } else { 201 assertTrue(false); 202 } 203 }); 204 assertTrue(n == 2); 205 assertTrue(counter.get() == 2); 206 207 // select(Consumer, timeout) 208 counter.set(0); 209 n = sel.select(k -> { 210 counter.incrementAndGet(); 211 if (k == key1) { 212 assertTrue(k.isReadable()); 213 } else if (k == key2) { 214 assertTrue(k.isWritable()); 215 } else { 216 assertTrue(false); 217 } 218 }, 1000); 219 assertTrue(n == 2); 220 assertTrue(counter.get() == 2); 221 222 // selectNow(Consumer) 223 counter.set(0); 224 n = sel.selectNow(k -> { 225 counter.incrementAndGet(); 226 if (k == key1) { 227 assertTrue(k.isReadable()); 228 } else if (k == key2) { 229 assertTrue(k.isWritable()); 230 } else { 231 assertTrue(false); 232 } 233 }); 234 assertTrue(n == 2); 235 assertTrue(counter.get() == 2); 236 } finally { 237 closePipe(p); 238 } 239 } 240 241 /** 242 * Test calling select twice, the action should be invoked each time 243 */ 244 public void testRepeatedSelect1() throws Exception { 245 Pipe p = Pipe.open(); 246 try (Selector sel = Selector.open()) { 247 Pipe.SourceChannel source = p.source(); 248 Pipe.SinkChannel sink = p.sink(); 249 source.configureBlocking(false); 250 SelectionKey key = source.register(sel, SelectionKey.OP_READ); 251 252 // write to sink to ensure that the source is readable 253 sink.write(messageBuffer()); 254 255 // test that action is invoked 256 testActionInvoked(key, SelectionKey.OP_READ); 257 testActionInvoked(key, SelectionKey.OP_READ); 258 259 } finally { 260 closePipe(p); 261 } 262 } 263 264 /** 265 * Test calling select twice. An I/O operation is performed after the 266 * first select so the channel will not be selected by the second select. 267 */ 268 public void testRepeatedSelect2() throws Exception { 269 Pipe p = Pipe.open(); 270 try (Selector sel = Selector.open()) { 271 Pipe.SourceChannel source = p.source(); 272 Pipe.SinkChannel sink = p.sink(); 273 source.configureBlocking(false); 274 SelectionKey key = source.register(sel, SelectionKey.OP_READ); 275 276 // write to sink to ensure that the source is readable 277 sink.write(messageBuffer()); 278 279 // test that action is invoked 280 testActionInvoked(key, SelectionKey.OP_READ); 281 282 // read all bytes 283 int n; 284 ByteBuffer bb = ByteBuffer.allocate(100); 285 do { 286 n = source.read(bb); 287 bb.clear(); 288 } while (n > 0); 289 290 // test that action is not invoked 291 testActionInvoked(key, 0); 292 } finally { 293 closePipe(p); 294 } 295 } 296 297 /** 298 * Test timeout 299 */ 300 public void testTimeout() throws Exception { 301 Pipe p = Pipe.open(); 302 try (Selector sel = Selector.open()) { 303 Pipe.SourceChannel source = p.source(); 304 Pipe.SinkChannel sink = p.sink(); 305 source.configureBlocking(false); 306 source.register(sel, SelectionKey.OP_READ); 307 long start = System.currentTimeMillis(); 308 int n = sel.select(k -> assertTrue(false), 1000L); 309 long duration = System.currentTimeMillis() - start; 310 assertTrue(n == 0); 311 assertTrue(duration > 500L); 312 } finally { 313 closePipe(p); 314 } 315 } 316 317 /** 318 * Test wakeup prior to select 319 */ 320 public void testWakeupBeforeSelect() throws Exception { 321 // select(Consumer) 322 try (Selector sel = Selector.open()) { 323 sel.wakeup(); 324 int n = sel.select(k -> assertTrue(false)); 325 assertTrue(n == 0); 326 } 327 328 // select(Consumer, timeout) 329 try (Selector sel = Selector.open()) { 330 sel.wakeup(); 331 long start = System.currentTimeMillis(); 332 int n = sel.select(k -> assertTrue(false), 60*1000); 333 long duration = System.currentTimeMillis() - start; 334 assertTrue(n == 0); 335 assertTrue(duration < 5000); 336 } 337 } 338 339 /** 340 * Test wakeup during select 341 */ 342 public void testWakeupDuringSelect() throws Exception { 343 // select(Consumer) 344 try (Selector sel = Selector.open()) { 345 scheduleWakeup(sel, 1, SECONDS); 346 int n = sel.select(k -> assertTrue(false)); 347 assertTrue(n == 0); 348 } 349 350 // select(Consumer, timeout) 351 try (Selector sel = Selector.open()) { 352 scheduleWakeup(sel, 1, SECONDS); 353 long start = System.currentTimeMillis(); 354 int n = sel.select(k -> assertTrue(false), 60*1000); 355 long duration = System.currentTimeMillis() - start; 356 assertTrue(n == 0); 357 assertTrue(duration > 500 && duration < 10*1000); 358 } 359 } 360 361 /** 362 * Test invoking select with interrupt status set 363 */ 364 public void testInterruptBeforeSelect() throws Exception { 365 // select(Consumer) 366 try (Selector sel = Selector.open()) { 367 Thread.currentThread().interrupt(); 368 int n = sel.select(k -> assertTrue(false)); 369 assertTrue(n == 0); 370 assertTrue(Thread.currentThread().isInterrupted()); 371 assertTrue(sel.isOpen()); 372 } finally { 373 Thread.currentThread().interrupted(); // clear interrupt status 374 } 375 376 // select(Consumer, timeout) 377 try (Selector sel = Selector.open()) { 378 Thread.currentThread().interrupt(); 379 long start = System.currentTimeMillis(); 380 int n = sel.select(k -> assertTrue(false), 60*1000); 381 long duration = System.currentTimeMillis() - start; 382 assertTrue(n == 0); 383 assertTrue(duration < 5000); 384 assertTrue(Thread.currentThread().isInterrupted()); 385 assertTrue(sel.isOpen()); 386 } finally { 387 Thread.currentThread().interrupted(); // clear interrupt status 388 } 389 } 390 391 /** 392 * Test interrupt thread during select 393 */ 394 public void testInterruptDuringSelect() throws Exception { 395 // select(Consumer) 396 try (Selector sel = Selector.open()) { 397 scheduleInterrupt(Thread.currentThread(), 1, SECONDS); 398 int n = sel.select(k -> assertTrue(false)); 399 assertTrue(n == 0); 400 assertTrue(Thread.currentThread().isInterrupted()); 401 assertTrue(sel.isOpen()); 402 } finally { 403 Thread.currentThread().interrupted(); // clear interrupt status 404 } 405 406 // select(Consumer, timeout) 407 try (Selector sel = Selector.open()) { 408 scheduleInterrupt(Thread.currentThread(), 1, SECONDS); 409 long start = System.currentTimeMillis(); 410 int n = sel.select(k -> assertTrue(false), 60*1000); 411 long duration = System.currentTimeMillis() - start; 412 assertTrue(n == 0); 413 assertTrue(duration > 500 && duration < 5000); 414 assertTrue(Thread.currentThread().isInterrupted()); 415 assertTrue(sel.isOpen()); 416 } finally { 417 Thread.currentThread().interrupted(); // clear interrupt status 418 } 419 } 420 421 /** 422 * Test invoking select on a closed selector 423 */ 424 @Test(expectedExceptions = ClosedSelectorException.class) 425 public void testClosedSelector1() throws Exception { 426 Selector sel = Selector.open(); 427 sel.close(); 428 sel.select(k -> assertTrue(false)); 429 } 430 @Test(expectedExceptions = ClosedSelectorException.class) 431 public void testClosedSelector2() throws Exception { 432 Selector sel = Selector.open(); 433 sel.close(); 434 sel.select(k -> assertTrue(false), 1000); 435 } 436 @Test(expectedExceptions = ClosedSelectorException.class) 437 public void testClosedSelector3() throws Exception { 438 Selector sel = Selector.open(); 439 sel.close(); 440 sel.selectNow(k -> assertTrue(false)); 441 } 442 443 /** 444 * Test closing selector while in a selection operation 445 */ 446 public void testCloseDuringSelect() throws Exception { 447 // select(Consumer) 448 try (Selector sel = Selector.open()) { 449 scheduleClose(sel, 3, SECONDS); 450 int n = sel.select(k -> assertTrue(false)); 451 assertTrue(n == 0); 452 assertFalse(sel.isOpen()); 453 } 454 455 // select(Consumer, timeout) 456 try (Selector sel = Selector.open()) { 457 scheduleClose(sel, 3, SECONDS); 458 long start = System.currentTimeMillis(); 459 int n = sel.select(k -> assertTrue(false), 60*1000); 460 long duration = System.currentTimeMillis() - start; 461 assertTrue(n == 0); 462 assertTrue(duration > 2000 && duration < 10*1000); 463 assertFalse(sel.isOpen()); 464 } 465 } 466 467 /** 468 * Test action closing selector 469 */ 470 @Test(expectedExceptions = ClosedSelectorException.class) 471 public void testActionClosingSelector() throws Exception { 472 Pipe p = Pipe.open(); 473 try (Selector sel = Selector.open()) { 474 Pipe.SourceChannel source = p.source(); 475 Pipe.SinkChannel sink = p.sink(); 476 source.configureBlocking(false); 477 SelectionKey key = source.register(sel, SelectionKey.OP_READ); 478 479 // write to sink to ensure that the source is readable 480 sink.write(messageBuffer()); 481 482 // should relay ClosedSelectorException 483 sel.select(k -> { 484 assertTrue(k == key); 485 try { 486 sel.close(); 487 } catch (IOException ioe) { } 488 }); 489 } finally { 490 closePipe(p); 491 } 492 } 493 494 /** 495 * Test that the action is invoked while synchronized on the selector and 496 * its selected-key set. 497 */ 498 public void testLocks() throws Exception { 499 Pipe p = Pipe.open(); 500 try (Selector sel = Selector.open()) { 501 Pipe.SourceChannel source = p.source(); 502 Pipe.SinkChannel sink = p.sink(); 503 source.configureBlocking(false); 504 SelectionKey key = source.register(sel, SelectionKey.OP_READ); 505 506 // write to sink to ensure that the source is readable 507 sink.write(messageBuffer()); 508 509 // select(Consumer) 510 sel.select(k -> { 511 assertTrue(k == key); 512 assertTrue(Thread.holdsLock(sel)); 513 assertFalse(Thread.holdsLock(sel.keys())); 514 assertTrue(Thread.holdsLock(sel.selectedKeys())); 515 }); 516 517 // select(Consumer, timeout) 518 sel.select(k -> { 519 assertTrue(k == key); 520 assertTrue(Thread.holdsLock(sel)); 521 assertFalse(Thread.holdsLock(sel.keys())); 522 assertTrue(Thread.holdsLock(sel.selectedKeys())); 523 }, 1000L); 524 525 // selectNow(Consumer) 526 sel.selectNow(k -> { 527 assertTrue(k == key); 528 assertTrue(Thread.holdsLock(sel)); 529 assertFalse(Thread.holdsLock(sel.keys())); 530 assertTrue(Thread.holdsLock(sel.selectedKeys())); 531 }); 532 } finally { 533 closePipe(p); 534 } 535 } 536 537 /** 538 * Test that selection operations removes cancelled keys from the selector's 539 * key and selected-key sets. 540 */ 541 public void testCancel() throws Exception { 542 Pipe p = Pipe.open(); 543 try (Selector sel = Selector.open()) { 544 Pipe.SinkChannel sink = p.sink(); 545 Pipe.SourceChannel source = p.source(); 546 547 // write to sink to ensure that the source is readable 548 sink.write(messageBuffer()); 549 550 sink.configureBlocking(false); 551 source.configureBlocking(false); 552 SelectionKey key1 = sink.register(sel, SelectionKey.OP_WRITE); 553 SelectionKey key2 = source.register(sel, SelectionKey.OP_READ); 554 555 sel.selectNow(); 556 assertTrue(sel.keys().contains(key1)); 557 assertTrue(sel.keys().contains(key2)); 558 assertTrue(sel.selectedKeys().contains(key1)); 559 assertTrue(sel.selectedKeys().contains(key2)); 560 561 // cancel key1 562 key1.cancel(); 563 int n = sel.selectNow(k -> assertTrue(k == key2)); 564 assertTrue(n == 1); 565 assertFalse(sel.keys().contains(key1)); 566 assertTrue(sel.keys().contains(key2)); 567 assertFalse(sel.selectedKeys().contains(key1)); 568 assertTrue(sel.selectedKeys().contains(key2)); 569 570 // cancel key2 571 key2.cancel(); 572 n = sel.selectNow(k -> assertTrue(false)); 573 assertTrue(n == 0); 574 assertFalse(sel.keys().contains(key1)); 575 assertFalse(sel.keys().contains(key2)); 576 assertFalse(sel.selectedKeys().contains(key1)); 577 assertFalse(sel.selectedKeys().contains(key2)); 578 } finally { 579 closePipe(p); 580 } 581 } 582 583 /** 584 * Test an action invoking select() 585 */ 586 public void testReentrantSelect1() throws Exception { 587 Pipe p = Pipe.open(); 588 try (Selector sel = Selector.open()) { 589 Pipe.SinkChannel sink = p.sink(); 590 Pipe.SourceChannel source = p.source(); 591 source.configureBlocking(false); 592 source.register(sel, SelectionKey.OP_READ); 593 594 // write to sink to ensure that the source is readable 595 scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS); 596 597 int n = sel.select(k -> { 598 try { 599 sel.select(); 600 assertTrue(false); 601 } catch (IOException ioe) { 602 throw new RuntimeException(ioe); 603 } catch (IllegalStateException expected) { 604 } 605 }); 606 assertTrue(n == 1); 607 } finally { 608 closePipe(p); 609 } 610 } 611 612 /** 613 * Test an action invoking selectNow() 614 */ 615 public void testReentrantSelect2() throws Exception { 616 Pipe p = Pipe.open(); 617 try (Selector sel = Selector.open()) { 618 Pipe.SinkChannel sink = p.sink(); 619 Pipe.SourceChannel source = p.source(); 620 621 // write to sink to ensure that the source is readable 622 scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS); 623 624 source.configureBlocking(false); 625 source.register(sel, SelectionKey.OP_READ); 626 int n = sel.select(k -> { 627 try { 628 sel.selectNow(); 629 assertTrue(false); 630 } catch (IOException ioe) { 631 throw new RuntimeException(ioe); 632 } catch (IllegalStateException expected) { 633 } 634 }); 635 assertTrue(n == 1); 636 } finally { 637 closePipe(p); 638 } 639 } 640 641 /** 642 * Test an action invoking select(Consumer) 643 */ 644 public void testReentrantSelect3() throws Exception { 645 Pipe p = Pipe.open(); 646 try (Selector sel = Selector.open()) { 647 Pipe.SinkChannel sink = p.sink(); 648 Pipe.SourceChannel source = p.source(); 649 650 // write to sink to ensure that the source is readable 651 scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS); 652 653 source.configureBlocking(false); 654 source.register(sel, SelectionKey.OP_READ); 655 int n = sel.select(k -> { 656 try { 657 sel.select(x -> assertTrue(false)); 658 assertTrue(false); 659 } catch (IOException ioe) { 660 throw new RuntimeException(ioe); 661 } catch (IllegalStateException expected) { 662 } 663 }); 664 assertTrue(n == 1); 665 } finally { 666 closePipe(p); 667 } 668 } 669 670 /** 671 * Negative timeout 672 */ 673 @Test(expectedExceptions = IllegalArgumentException.class) 674 public void testNegativeTimeout() throws Exception { 675 try (Selector sel = Selector.open()) { 676 sel.select(k -> { }, -1L); 677 } 678 } 679 680 /** 681 * Null action 682 */ 683 @Test(expectedExceptions = NullPointerException.class) 684 public void testNull1() throws Exception { 685 try (Selector sel = Selector.open()) { 686 sel.select(null); 687 } 688 } 689 @Test(expectedExceptions = NullPointerException.class) 690 public void testNull2() throws Exception { 691 try (Selector sel = Selector.open()) { 692 sel.select(null, 1000); 693 } 694 } 695 @Test(expectedExceptions = NullPointerException.class) 696 public void testNull3() throws Exception { 697 try (Selector sel = Selector.open()) { 698 sel.selectNow(null); 699 } 700 } 701 702 703 // -- support methods --- 704 705 private final ScheduledExecutorService POOL = Executors.newScheduledThreadPool(1); 706 707 @AfterTest 708 void shutdownThreadPool() { 709 POOL.shutdown(); 710 } 711 712 void scheduleWakeup(Selector sel, long delay, TimeUnit unit) { 713 POOL.schedule(() -> sel.wakeup(), delay, unit); 714 } 715 716 void scheduleInterrupt(Thread t, long delay, TimeUnit unit) { 717 POOL.schedule(() -> t.interrupt(), delay, unit); 718 } 719 720 void scheduleClose(Closeable c, long delay, TimeUnit unit) { 721 POOL.schedule(() -> { 722 try { 723 c.close(); 724 } catch (IOException ioe) { 725 ioe.printStackTrace(); 726 } 727 }, delay, unit); 728 } 729 730 void scheduleWrite(WritableByteChannel sink, ByteBuffer buf, long delay, TimeUnit unit) { 731 POOL.schedule(() -> { 732 try { 733 sink.write(buf); 734 } catch (IOException ioe) { 735 ioe.printStackTrace(); 736 } 737 }, delay, unit); 738 } 739 740 static void closePipe(Pipe p) { 741 try { p.sink().close(); } catch (IOException ignore) { } 742 try { p.source().close(); } catch (IOException ignore) { } 743 } 744 745 static ByteBuffer messageBuffer() { 746 try { 747 return ByteBuffer.wrap("message".getBytes("UTF-8")); 748 } catch (Exception e) { 749 throw new RuntimeException(e); 750 } 751 } 752 }