1 /*
   2  * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /* @test
  25  * @bug 8199433
  26  * @run testng SelectWithConsumer
  27  * @summary Unit test for Selector select(Consumer), select(Consumer,long) and
  28  *          selectNow(Consumer)
  29  */
  30 
  31 import java.io.Closeable;
  32 import java.io.IOException;
  33 import java.net.InetSocketAddress;
  34 import java.nio.ByteBuffer;
  35 import java.nio.channels.ClosedSelectorException;
  36 import java.nio.channels.Pipe;
  37 import java.nio.channels.SelectionKey;
  38 import java.nio.channels.Selector;
  39 import java.nio.channels.ServerSocketChannel;
  40 import java.nio.channels.SocketChannel;
  41 import java.nio.channels.WritableByteChannel;
  42 import java.util.concurrent.Executors;
  43 import java.util.concurrent.ScheduledExecutorService;
  44 import java.util.concurrent.TimeUnit;
  45 import java.util.concurrent.atomic.AtomicInteger;
  46 import static java.util.concurrent.TimeUnit.*;
  47 
  48 import org.testng.annotations.AfterTest;
  49 import org.testng.annotations.Test;
  50 import static org.testng.Assert.*;
  51 
  52 @Test
  53 public class SelectWithConsumer {
  54 
  55     /**
  56      * Invoke the select methods that take an action and checks that the
  57      * accumulated ready ops notified to the action matches the expected ops.
  58      */
  59     void testActionInvoked(SelectionKey key, int expectedOps) throws Exception {
  60         var callerThread = Thread.currentThread();
  61         var sel = key.selector();
  62         var interestOps = key.interestOps();
  63         var notifiedOps = new AtomicInteger();
  64 
  65         // select(Consumer)
  66         if (expectedOps == 0)
  67             sel.wakeup(); // ensure select does not block
  68         notifiedOps.set(0);
  69         int n = sel.select(k -> {
  70             assertTrue(Thread.currentThread() == callerThread);
  71             assertTrue(k == key);
  72             int readyOps = key.readyOps();
  73             assertTrue((readyOps & interestOps) != 0);
  74             assertTrue((readyOps & notifiedOps.get()) == 0);
  75             notifiedOps.set(notifiedOps.get() | readyOps);
  76         });
  77         assertTrue((n == 1) ^ (expectedOps == 0));
  78         assertTrue(notifiedOps.get() == expectedOps);
  79 
  80         // select(Consumer, timeout)
  81         notifiedOps.set(0);
  82         n = sel.select(k -> {
  83             assertTrue(Thread.currentThread() == callerThread);
  84             assertTrue(k == key);
  85             int readyOps = key.readyOps();
  86             assertTrue((readyOps & interestOps) != 0);
  87             assertTrue((readyOps & notifiedOps.get()) == 0);
  88             notifiedOps.set(notifiedOps.get() | readyOps);
  89         }, 1000);
  90         assertTrue((n == 1) ^ (expectedOps == 0));
  91         assertTrue(notifiedOps.get() == expectedOps);
  92 
  93         // selectNow(Consumer)
  94         notifiedOps.set(0);
  95         n = sel.selectNow(k -> {
  96             assertTrue(Thread.currentThread() == callerThread);
  97             assertTrue(k == key);
  98             int readyOps = key.readyOps();
  99             assertTrue((readyOps & interestOps) != 0);
 100             assertTrue((readyOps & notifiedOps.get()) == 0);
 101             notifiedOps.set(notifiedOps.get() | readyOps);
 102         });
 103         assertTrue((n == 1) ^ (expectedOps == 0));
 104         assertTrue(notifiedOps.get() == expectedOps);
 105     }
 106 
 107     /**
 108      * Test that an action is performed when a channel is ready for reading.
 109      */
 110     public void testReadable() throws Exception {
 111         Pipe p = Pipe.open();
 112         try (Selector sel = Selector.open()) {
 113             Pipe.SinkChannel sink = p.sink();
 114             Pipe.SourceChannel source = p.source();
 115             source.configureBlocking(false);
 116             SelectionKey key = source.register(sel, SelectionKey.OP_READ);
 117 
 118             // write to sink to ensure source is readable
 119             scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS);
 120 
 121             // test that action is invoked
 122             testActionInvoked(key, SelectionKey.OP_READ);
 123         } finally {
 124             closePipe(p);
 125         }
 126     }
 127 
 128     /**
 129      * Test that an action is performed when a channel is ready for writing.
 130      */
 131     public void testWritable() throws Exception {
 132         Pipe p = Pipe.open();
 133         try (Selector sel = Selector.open()) {
 134             Pipe.SourceChannel source = p.source();
 135             Pipe.SinkChannel sink = p.sink();
 136             sink.configureBlocking(false);
 137             SelectionKey key = sink.register(sel, SelectionKey.OP_WRITE);
 138 
 139             // test that action is invoked
 140             testActionInvoked(key, SelectionKey.OP_WRITE);
 141         } finally {
 142             closePipe(p);
 143         }
 144     }
 145 
 146     /**
 147      * Test that an action is performed when a channel is ready for both
 148      * reading and writing.
 149      */
 150     public void testReadableAndWriteable() throws Exception {
 151         ServerSocketChannel ssc = null;
 152         SocketChannel sc = null;
 153         SocketChannel peer = null;
 154         try (Selector sel = Selector.open()) {
 155             ssc = ServerSocketChannel.open().bind(new InetSocketAddress(0));
 156             sc = SocketChannel.open(ssc.getLocalAddress());
 157             sc.configureBlocking(false);
 158             SelectionKey key = sc.register(sel, (SelectionKey.OP_READ |
 159                                                  SelectionKey.OP_WRITE));
 160 
 161             // accept connection and write data so the source is readable
 162             peer = ssc.accept();
 163             peer.write(messageBuffer());
 164 
 165             // test that action is invoked
 166             testActionInvoked(key, (SelectionKey.OP_READ | SelectionKey.OP_WRITE));
 167         } finally {
 168             if (ssc != null) ssc.close();
 169             if (sc != null) sc.close();
 170             if (peer != null) peer.close();
 171         }
 172     }
 173 
 174     /**
 175      * Test that the action is called for two selected channels
 176      */
 177     public void testTwoChannels() throws Exception {
 178         Pipe p = Pipe.open();
 179         try (Selector sel = Selector.open()) {
 180             Pipe.SourceChannel source = p.source();
 181             Pipe.SinkChannel sink = p.sink();
 182             source.configureBlocking(false);
 183             sink.configureBlocking(false);
 184             SelectionKey key1 = source.register(sel, SelectionKey.OP_READ);
 185             SelectionKey key2 = sink.register(sel, SelectionKey.OP_WRITE);
 186 
 187             // write to sink to ensure that the source is readable
 188             sink.write(messageBuffer());
 189 
 190             var counter = new AtomicInteger();
 191 
 192             // select(Consumer)
 193             counter.set(0);
 194             int n = sel.select(k -> {
 195                 counter.incrementAndGet();
 196                 if (k == key1) {
 197                     assertTrue(k.isReadable());
 198                 } else if (k == key2) {
 199                     assertTrue(k.isWritable());
 200                 } else {
 201                     assertTrue(false);
 202                 }
 203             });
 204             assertTrue(n == 2);
 205             assertTrue(counter.get() == 2);
 206 
 207             // select(Consumer, timeout)
 208             counter.set(0);
 209             n = sel.select(k -> {
 210                 counter.incrementAndGet();
 211                 if (k == key1) {
 212                     assertTrue(k.isReadable());
 213                 } else if (k == key2) {
 214                     assertTrue(k.isWritable());
 215                 } else {
 216                     assertTrue(false);
 217                 }
 218             }, 1000);
 219             assertTrue(n == 2);
 220             assertTrue(counter.get() == 2);
 221 
 222             // selectNow(Consumer)
 223             counter.set(0);
 224             n = sel.selectNow(k -> {
 225                 counter.incrementAndGet();
 226                 if (k == key1) {
 227                     assertTrue(k.isReadable());
 228                 } else if (k == key2) {
 229                     assertTrue(k.isWritable());
 230                 } else {
 231                     assertTrue(false);
 232                 }
 233             });
 234             assertTrue(n == 2);
 235             assertTrue(counter.get() == 2);
 236         } finally {
 237             closePipe(p);
 238         }
 239     }
 240 
 241     /**
 242      * Test calling select twice, the action should be invoked each time
 243      */
 244     public void testRepeatedSelect1() throws Exception {
 245         Pipe p = Pipe.open();
 246         try (Selector sel = Selector.open()) {
 247             Pipe.SourceChannel source = p.source();
 248             Pipe.SinkChannel sink = p.sink();
 249             source.configureBlocking(false);
 250             SelectionKey key = source.register(sel, SelectionKey.OP_READ);
 251 
 252             // write to sink to ensure that the source is readable
 253             sink.write(messageBuffer());
 254 
 255             // test that action is invoked
 256             testActionInvoked(key, SelectionKey.OP_READ);
 257             testActionInvoked(key, SelectionKey.OP_READ);
 258 
 259         } finally {
 260             closePipe(p);
 261         }
 262     }
 263 
 264     /**
 265      * Test calling select twice. An I/O operation is performed after the
 266      * first select so the channel will not be selected by the second select.
 267      */
 268     public void testRepeatedSelect2() throws Exception {
 269         Pipe p = Pipe.open();
 270         try (Selector sel = Selector.open()) {
 271             Pipe.SourceChannel source = p.source();
 272             Pipe.SinkChannel sink = p.sink();
 273             source.configureBlocking(false);
 274             SelectionKey key = source.register(sel, SelectionKey.OP_READ);
 275 
 276             // write to sink to ensure that the source is readable
 277             sink.write(messageBuffer());
 278 
 279             // test that action is invoked
 280             testActionInvoked(key, SelectionKey.OP_READ);
 281 
 282             // read all bytes
 283             int n;
 284             ByteBuffer bb = ByteBuffer.allocate(100);
 285             do {
 286                 n = source.read(bb);
 287                 bb.clear();
 288             } while (n > 0);
 289 
 290             // test that action is not invoked
 291             testActionInvoked(key, 0);
 292         } finally {
 293             closePipe(p);
 294         }
 295     }
 296 
 297     /**
 298      * Test timeout
 299      */
 300     public void testTimeout() throws Exception {
 301         Pipe p = Pipe.open();
 302         try (Selector sel = Selector.open()) {
 303             Pipe.SourceChannel source = p.source();
 304             Pipe.SinkChannel sink = p.sink();
 305             source.configureBlocking(false);
 306             source.register(sel, SelectionKey.OP_READ);
 307             long start = System.currentTimeMillis();
 308             int n = sel.select(k -> assertTrue(false), 1000L);
 309             long duration = System.currentTimeMillis() - start;
 310             assertTrue(n == 0);
 311             assertTrue(duration > 500L);
 312         } finally {
 313             closePipe(p);
 314         }
 315     }
 316 
 317     /**
 318      * Test wakeup prior to select
 319      */
 320     public void testWakeupBeforeSelect() throws Exception {
 321         // select(Consumer)
 322         try (Selector sel = Selector.open()) {
 323             sel.wakeup();
 324             int n = sel.select(k -> assertTrue(false));
 325             assertTrue(n == 0);
 326         }
 327 
 328         // select(Consumer, timeout)
 329         try (Selector sel = Selector.open()) {
 330             sel.wakeup();
 331             long start = System.currentTimeMillis();
 332             int n = sel.select(k -> assertTrue(false), 60*1000);
 333             long duration = System.currentTimeMillis() - start;
 334             assertTrue(n == 0);
 335             assertTrue(duration < 5000);
 336         }
 337     }
 338 
 339     /**
 340      * Test wakeup during select
 341      */
 342     public void testWakeupDuringSelect() throws Exception {
 343         // select(Consumer)
 344         try (Selector sel = Selector.open()) {
 345             scheduleWakeup(sel, 1, SECONDS);
 346             int n = sel.select(k -> assertTrue(false));
 347             assertTrue(n == 0);
 348         }
 349 
 350         // select(Consumer, timeout)
 351         try (Selector sel = Selector.open()) {
 352             scheduleWakeup(sel, 1, SECONDS);
 353             long start = System.currentTimeMillis();
 354             int n = sel.select(k -> assertTrue(false), 60*1000);
 355             long duration = System.currentTimeMillis() - start;
 356             assertTrue(n == 0);
 357             assertTrue(duration > 500 && duration < 10*1000);
 358         }
 359     }
 360 
 361     /**
 362      * Test invoking select with interrupt status set
 363      */
 364     public void testInterruptBeforeSelect() throws Exception {
 365         // select(Consumer)
 366         try (Selector sel = Selector.open()) {
 367             Thread.currentThread().interrupt();
 368             int n = sel.select(k -> assertTrue(false));
 369             assertTrue(n == 0);
 370             assertTrue(Thread.currentThread().isInterrupted());
 371             assertTrue(sel.isOpen());
 372         } finally {
 373             Thread.currentThread().interrupted();  // clear interrupt status
 374         }
 375 
 376         // select(Consumer, timeout)
 377         try (Selector sel = Selector.open()) {
 378             Thread.currentThread().interrupt();
 379             long start = System.currentTimeMillis();
 380             int n = sel.select(k -> assertTrue(false), 60*1000);
 381             long duration = System.currentTimeMillis() - start;
 382             assertTrue(n == 0);
 383             assertTrue(duration < 5000);
 384             assertTrue(Thread.currentThread().isInterrupted());
 385             assertTrue(sel.isOpen());
 386         } finally {
 387             Thread.currentThread().interrupted();  // clear interrupt status
 388         }
 389     }
 390 
 391     /**
 392      * Test interrupt thread during select
 393      */
 394     public void testInterruptDuringSelect() throws Exception {
 395         // select(Consumer)
 396         try (Selector sel = Selector.open()) {
 397             scheduleInterrupt(Thread.currentThread(), 1, SECONDS);
 398             int n = sel.select(k -> assertTrue(false));
 399             assertTrue(n == 0);
 400             assertTrue(Thread.currentThread().isInterrupted());
 401             assertTrue(sel.isOpen());
 402         } finally {
 403             Thread.currentThread().interrupted();  // clear interrupt status
 404         }
 405 
 406         // select(Consumer, timeout)
 407         try (Selector sel = Selector.open()) {
 408             scheduleInterrupt(Thread.currentThread(), 1, SECONDS);
 409             long start = System.currentTimeMillis();
 410             int n = sel.select(k -> assertTrue(false), 60*1000);
 411             long duration = System.currentTimeMillis() - start;
 412             assertTrue(n == 0);
 413             assertTrue(duration > 500 && duration < 5000);
 414             assertTrue(Thread.currentThread().isInterrupted());
 415             assertTrue(sel.isOpen());
 416         } finally {
 417             Thread.currentThread().interrupted();  // clear interrupt status
 418         }
 419     }
 420 
 421     /**
 422      * Test invoking select on a closed selector
 423      */
 424     @Test(expectedExceptions = ClosedSelectorException.class)
 425     public void testClosedSelector1() throws Exception {
 426         Selector sel = Selector.open();
 427         sel.close();
 428         sel.select(k -> assertTrue(false));
 429     }
 430     @Test(expectedExceptions = ClosedSelectorException.class)
 431     public void testClosedSelector2() throws Exception {
 432         Selector sel = Selector.open();
 433         sel.close();
 434         sel.select(k -> assertTrue(false), 1000);
 435     }
 436     @Test(expectedExceptions = ClosedSelectorException.class)
 437     public void testClosedSelector3() throws Exception {
 438         Selector sel = Selector.open();
 439         sel.close();
 440         sel.selectNow(k -> assertTrue(false));
 441     }
 442 
 443     /**
 444      * Test closing selector while in a selection operation
 445      */
 446     public void testCloseDuringSelect() throws Exception {
 447         // select(Consumer)
 448         try (Selector sel = Selector.open()) {
 449             scheduleClose(sel, 3, SECONDS);
 450             int n = sel.select(k -> assertTrue(false));
 451             assertTrue(n == 0);
 452             assertFalse(sel.isOpen());
 453         }
 454 
 455         // select(Consumer, timeout)
 456         try (Selector sel = Selector.open()) {
 457             scheduleClose(sel, 3, SECONDS);
 458             long start = System.currentTimeMillis();
 459             int n = sel.select(k -> assertTrue(false), 60*1000);
 460             long duration = System.currentTimeMillis() - start;
 461             assertTrue(n == 0);
 462             assertTrue(duration > 2000 && duration < 10*1000);
 463             assertFalse(sel.isOpen());
 464         }
 465     }
 466 
 467     /**
 468      * Test action closing selector
 469      */
 470     @Test(expectedExceptions = ClosedSelectorException.class)
 471     public void testActionClosingSelector() throws Exception {
 472         Pipe p = Pipe.open();
 473         try (Selector sel = Selector.open()) {
 474             Pipe.SourceChannel source = p.source();
 475             Pipe.SinkChannel sink = p.sink();
 476             source.configureBlocking(false);
 477             SelectionKey key = source.register(sel, SelectionKey.OP_READ);
 478 
 479             // write to sink to ensure that the source is readable
 480             sink.write(messageBuffer());
 481 
 482             // should relay ClosedSelectorException
 483             sel.select(k -> {
 484                 assertTrue(k == key);
 485                 try {
 486                     sel.close();
 487                 } catch (IOException ioe) { }
 488             });
 489         } finally {
 490             closePipe(p);
 491         }
 492     }
 493 
 494     /**
 495      * Test that the action is invoked while synchronized on the selector and
 496      * its selected-key set.
 497      */
 498     public void testLocks() throws Exception {
 499         Pipe p = Pipe.open();
 500         try (Selector sel = Selector.open()) {
 501             Pipe.SourceChannel source = p.source();
 502             Pipe.SinkChannel sink = p.sink();
 503             source.configureBlocking(false);
 504             SelectionKey key = source.register(sel, SelectionKey.OP_READ);
 505 
 506             // write to sink to ensure that the source is readable
 507             sink.write(messageBuffer());
 508 
 509             // select(Consumer)
 510             sel.select(k -> {
 511                 assertTrue(k == key);
 512                 assertTrue(Thread.holdsLock(sel));
 513                 assertFalse(Thread.holdsLock(sel.keys()));
 514                 assertTrue(Thread.holdsLock(sel.selectedKeys()));
 515             });
 516 
 517             // select(Consumer, timeout)
 518             sel.select(k -> {
 519                 assertTrue(k == key);
 520                 assertTrue(Thread.holdsLock(sel));
 521                 assertFalse(Thread.holdsLock(sel.keys()));
 522                 assertTrue(Thread.holdsLock(sel.selectedKeys()));
 523             }, 1000L);
 524 
 525             // selectNow(Consumer)
 526             sel.selectNow(k -> {
 527                 assertTrue(k == key);
 528                 assertTrue(Thread.holdsLock(sel));
 529                 assertFalse(Thread.holdsLock(sel.keys()));
 530                 assertTrue(Thread.holdsLock(sel.selectedKeys()));
 531             });
 532         } finally {
 533             closePipe(p);
 534         }
 535     }
 536 
 537     /**
 538      * Test that selection operations removes cancelled keys from the selector's
 539      * key and selected-key sets.
 540      */
 541     public void testCancel() throws Exception {
 542         Pipe p = Pipe.open();
 543         try (Selector sel = Selector.open()) {
 544             Pipe.SinkChannel sink = p.sink();
 545             Pipe.SourceChannel source = p.source();
 546 
 547             // write to sink to ensure that the source is readable
 548             sink.write(messageBuffer());
 549 
 550             sink.configureBlocking(false);
 551             source.configureBlocking(false);
 552             SelectionKey key1 = sink.register(sel, SelectionKey.OP_WRITE);
 553             SelectionKey key2 = source.register(sel, SelectionKey.OP_READ);
 554 
 555             sel.selectNow();
 556             assertTrue(sel.keys().contains(key1));
 557             assertTrue(sel.keys().contains(key2));
 558             assertTrue(sel.selectedKeys().contains(key1));
 559             assertTrue(sel.selectedKeys().contains(key2));
 560 
 561             // cancel key1
 562             key1.cancel();
 563             int n = sel.selectNow(k -> assertTrue(k == key2));
 564             assertTrue(n == 1);
 565             assertFalse(sel.keys().contains(key1));
 566             assertTrue(sel.keys().contains(key2));
 567             assertFalse(sel.selectedKeys().contains(key1));
 568             assertTrue(sel.selectedKeys().contains(key2));
 569 
 570             // cancel key2
 571             key2.cancel();
 572             n = sel.selectNow(k -> assertTrue(false));
 573             assertTrue(n == 0);
 574             assertFalse(sel.keys().contains(key1));
 575             assertFalse(sel.keys().contains(key2));
 576             assertFalse(sel.selectedKeys().contains(key1));
 577             assertFalse(sel.selectedKeys().contains(key2));
 578         } finally {
 579             closePipe(p);
 580         }
 581     }
 582 
 583     /**
 584      * Test an action invoking select()
 585      */
 586     public void testReentrantSelect1() throws Exception {
 587         Pipe p = Pipe.open();
 588         try (Selector sel = Selector.open()) {
 589             Pipe.SinkChannel sink = p.sink();
 590             Pipe.SourceChannel source = p.source();
 591             source.configureBlocking(false);
 592             source.register(sel, SelectionKey.OP_READ);
 593 
 594             // write to sink to ensure that the source is readable
 595             scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS);
 596 
 597             int n = sel.select(k -> {
 598                 try {
 599                     sel.select();
 600                     assertTrue(false);
 601                 } catch (IOException ioe) {
 602                     throw new RuntimeException(ioe);
 603                 } catch (IllegalStateException expected) {
 604                 }
 605             });
 606             assertTrue(n == 1);
 607         } finally {
 608             closePipe(p);
 609         }
 610     }
 611 
 612     /**
 613      * Test an action invoking selectNow()
 614      */
 615     public void testReentrantSelect2() throws Exception {
 616         Pipe p = Pipe.open();
 617         try (Selector sel = Selector.open()) {
 618             Pipe.SinkChannel sink = p.sink();
 619             Pipe.SourceChannel source = p.source();
 620 
 621             // write to sink to ensure that the source is readable
 622             scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS);
 623 
 624             source.configureBlocking(false);
 625             source.register(sel, SelectionKey.OP_READ);
 626             int n = sel.select(k -> {
 627                 try {
 628                     sel.selectNow();
 629                     assertTrue(false);
 630                 } catch (IOException ioe) {
 631                     throw new RuntimeException(ioe);
 632                 } catch (IllegalStateException expected) {
 633                 }
 634             });
 635             assertTrue(n == 1);
 636         } finally {
 637             closePipe(p);
 638         }
 639     }
 640 
 641     /**
 642      * Test an action invoking select(Consumer)
 643      */
 644     public void testReentrantSelect3() throws Exception {
 645         Pipe p = Pipe.open();
 646         try (Selector sel = Selector.open()) {
 647             Pipe.SinkChannel sink = p.sink();
 648             Pipe.SourceChannel source = p.source();
 649 
 650             // write to sink to ensure that the source is readable
 651             scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS);
 652 
 653             source.configureBlocking(false);
 654             source.register(sel, SelectionKey.OP_READ);
 655             int n = sel.select(k -> {
 656                 try {
 657                     sel.select(x -> assertTrue(false));
 658                     assertTrue(false);
 659                 } catch (IOException ioe) {
 660                     throw new RuntimeException(ioe);
 661                 } catch (IllegalStateException expected) {
 662                 }
 663             });
 664             assertTrue(n == 1);
 665         } finally {
 666             closePipe(p);
 667         }
 668     }
 669 
 670     /**
 671      * Negative timeout
 672      */
 673     @Test(expectedExceptions = IllegalArgumentException.class)
 674     public void testNegativeTimeout() throws Exception {
 675         try (Selector sel = Selector.open()) {
 676             sel.select(k -> { }, -1L);
 677         }
 678     }
 679 
 680     /**
 681      * Null action
 682      */
 683     @Test(expectedExceptions = NullPointerException.class)
 684     public void testNull1() throws Exception {
 685         try (Selector sel = Selector.open()) {
 686             sel.select(null);
 687         }
 688     }
 689     @Test(expectedExceptions = NullPointerException.class)
 690     public void testNull2() throws Exception {
 691         try (Selector sel = Selector.open()) {
 692             sel.select(null, 1000);
 693         }
 694     }
 695     @Test(expectedExceptions = NullPointerException.class)
 696     public void testNull3() throws Exception {
 697         try (Selector sel = Selector.open()) {
 698             sel.selectNow(null);
 699         }
 700     }
 701 
 702 
 703     // -- support methods ---
 704 
 705     private final ScheduledExecutorService POOL = Executors.newScheduledThreadPool(1);
 706 
 707     @AfterTest
 708     void shutdownThreadPool() {
 709         POOL.shutdown();
 710     }
 711 
 712     void scheduleWakeup(Selector sel, long delay, TimeUnit unit) {
 713         POOL.schedule(() -> sel.wakeup(), delay, unit);
 714     }
 715 
 716     void scheduleInterrupt(Thread t, long delay, TimeUnit unit) {
 717         POOL.schedule(() -> t.interrupt(), delay, unit);
 718     }
 719 
 720     void scheduleClose(Closeable c, long delay, TimeUnit unit) {
 721         POOL.schedule(() -> {
 722             try {
 723                 c.close();
 724             } catch (IOException ioe) {
 725                 ioe.printStackTrace();
 726             }
 727         }, delay, unit);
 728     }
 729 
 730     void scheduleWrite(WritableByteChannel sink, ByteBuffer buf, long delay, TimeUnit unit) {
 731         POOL.schedule(() -> {
 732             try {
 733                 sink.write(buf);
 734             } catch (IOException ioe) {
 735                 ioe.printStackTrace();
 736             }
 737         }, delay, unit);
 738     }
 739 
 740     static void closePipe(Pipe p) {
 741         try { p.sink().close(); } catch (IOException ignore) { }
 742         try { p.source().close(); } catch (IOException ignore) { }
 743     }
 744 
 745     static ByteBuffer messageBuffer() {
 746         try {
 747             return ByteBuffer.wrap("message".getBytes("UTF-8"));
 748         } catch (Exception e) {
 749             throw new RuntimeException(e);
 750         }
 751     }
 752 }