1 /*
   2  * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /* @test
  25  * @bug 8199433
  26  * @run testng SelectWithConsumer
  27  * @summary Unit test for Selector select(Consumer), select(Consumer,long) and
  28  *          selectNow(Consumer)
  29  */
  30 
  31 import java.io.Closeable;
  32 import java.io.IOException;
  33 import java.net.InetSocketAddress;
  34 import java.nio.ByteBuffer;
  35 import java.nio.channels.ClosedSelectorException;
  36 import java.nio.channels.Pipe;
  37 import java.nio.channels.SelectionKey;
  38 import java.nio.channels.Selector;
  39 import java.nio.channels.ServerSocketChannel;
  40 import java.nio.channels.SocketChannel;
  41 import java.nio.channels.WritableByteChannel;
  42 import java.util.concurrent.Executors;
  43 import java.util.concurrent.ScheduledExecutorService;
  44 import java.util.concurrent.TimeUnit;
  45 import java.util.concurrent.atomic.AtomicInteger;
  46 import static java.util.concurrent.TimeUnit.*;
  47 
  48 import org.testng.annotations.AfterTest;
  49 import org.testng.annotations.Test;
  50 import static org.testng.Assert.*;
  51 
  52 @Test
  53 public class SelectWithConsumer {
  54 
  55     /**
  56      * Invoke the select methods that take an action and check that the
  57      * accumulated ready ops notified to the action matches the expected ops.
  58      */
  59     void testActionInvoked(SelectionKey key, int expectedOps) throws Exception {
  60         var callerThread = Thread.currentThread();
  61         var sel = key.selector();
  62         var interestOps = key.interestOps();
  63         var notifiedOps = new AtomicInteger();
  64 
  65         // select(Consumer)
  66         if (expectedOps == 0)
  67             sel.wakeup(); // ensure select does not block
  68         notifiedOps.set(0);
  69         int n = sel.select(k -> {
  70             assertTrue(Thread.currentThread() == callerThread);
  71             assertTrue(k == key);
  72             int readyOps = key.readyOps();
  73             assertTrue((readyOps & interestOps) != 0);
  74             assertTrue((readyOps & notifiedOps.get()) == 0);
  75             notifiedOps.set(notifiedOps.get() | readyOps);
  76         });
  77         assertTrue((n == 1) ^ (expectedOps == 0));
  78         assertTrue(notifiedOps.get() == expectedOps);
  79 
  80         // select(Consumer, timeout)
  81         notifiedOps.set(0);
  82         n = sel.select(k -> {
  83             assertTrue(Thread.currentThread() == callerThread);
  84             assertTrue(k == key);
  85             int readyOps = key.readyOps();
  86             assertTrue((readyOps & interestOps) != 0);
  87             assertTrue((readyOps & notifiedOps.get()) == 0);
  88             notifiedOps.set(notifiedOps.get() | readyOps);
  89         }, 1000);
  90         assertTrue((n == 1) ^ (expectedOps == 0));
  91         assertTrue(notifiedOps.get() == expectedOps);
  92 
  93         // selectNow(Consumer)
  94         notifiedOps.set(0);
  95         n = sel.selectNow(k -> {
  96             assertTrue(Thread.currentThread() == callerThread);
  97             assertTrue(k == key);
  98             int readyOps = key.readyOps();
  99             assertTrue((readyOps & interestOps) != 0);
 100             assertTrue((readyOps & notifiedOps.get()) == 0);
 101             notifiedOps.set(notifiedOps.get() | readyOps);
 102         });
 103         assertTrue((n == 1) ^ (expectedOps == 0));
 104         assertTrue(notifiedOps.get() == expectedOps);
 105     }
 106 
 107     /**
 108      * Test that an action is performed when a channel is ready for reading.
 109      */
 110     public void testReadable() throws Exception {
 111         Pipe p = Pipe.open();
 112         try (Selector sel = Selector.open()) {
 113             Pipe.SinkChannel sink = p.sink();
 114             Pipe.SourceChannel source = p.source();
 115             source.configureBlocking(false);
 116             SelectionKey key = source.register(sel, SelectionKey.OP_READ);
 117 
 118             // write to sink to ensure source is readable
 119             scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS);
 120 
 121             // test that action is invoked
 122             testActionInvoked(key, SelectionKey.OP_READ);
 123         } finally {
 124             closePipe(p);
 125         }
 126     }
 127 
 128     /**
 129      * Test that an action is performed when a channel is ready for writing.
 130      */
 131     public void testWritable() throws Exception {
 132         Pipe p = Pipe.open();
 133         try (Selector sel = Selector.open()) {
 134             Pipe.SourceChannel source = p.source();
 135             Pipe.SinkChannel sink = p.sink();
 136             sink.configureBlocking(false);
 137             SelectionKey key = sink.register(sel, SelectionKey.OP_WRITE);
 138 
 139             // test that action is invoked
 140             testActionInvoked(key, SelectionKey.OP_WRITE);
 141         } finally {
 142             closePipe(p);
 143         }
 144     }
 145 
 146     /**
 147      * Test that an action is performed when a channel is ready for both
 148      * reading and writing.
 149      */
 150     public void testReadableAndWriteable() throws Exception {
 151         ServerSocketChannel ssc = null;
 152         SocketChannel sc = null;
 153         SocketChannel peer = null;
 154         try (Selector sel = Selector.open()) {
 155             ssc = ServerSocketChannel.open().bind(new InetSocketAddress(0));
 156             sc = SocketChannel.open(ssc.getLocalAddress());
 157             sc.configureBlocking(false);
 158             SelectionKey key = sc.register(sel, (SelectionKey.OP_READ |
 159                                                  SelectionKey.OP_WRITE));
 160 
 161             // accept connection and write data so the source is readable
 162             peer = ssc.accept();
 163             peer.write(messageBuffer());
 164 
 165             // test that action is invoked
 166             testActionInvoked(key, (SelectionKey.OP_READ | SelectionKey.OP_WRITE));
 167         } finally {
 168             if (ssc != null) ssc.close();
 169             if (sc != null) sc.close();
 170             if (peer != null) peer.close();
 171         }
 172     }
 173 
 174     /**
 175      * Test that the action is called for two selected channels
 176      */
 177     public void testTwoChannels() throws Exception {
 178         Pipe p = Pipe.open();
 179         try (Selector sel = Selector.open()) {
 180             Pipe.SourceChannel source = p.source();
 181             Pipe.SinkChannel sink = p.sink();
 182             source.configureBlocking(false);
 183             sink.configureBlocking(false);
 184             SelectionKey key1 = source.register(sel, SelectionKey.OP_READ);
 185             SelectionKey key2 = sink.register(sel, SelectionKey.OP_WRITE);
 186 
 187             // write to sink to ensure that the source is readable
 188             sink.write(messageBuffer());
 189 
 190             var counter = new AtomicInteger();
 191 
 192             // select(Consumer)
 193             counter.set(0);
 194             int n = sel.select(k -> {
 195                 counter.incrementAndGet();
 196                 if (k == key1) {
 197                     assertTrue(k.isReadable());
 198                 } else if (k == key2) {
 199                     assertTrue(k.isWritable());
 200                 } else {
 201                     assertTrue(false);
 202                 }
 203             });
 204             assertTrue(n == 2);
 205             assertTrue(counter.get() == 2);
 206 
 207             // select(Consumer, timeout)
 208             counter.set(0);
 209             n = sel.select(k -> {
 210                 counter.incrementAndGet();
 211                 if (k == key1) {
 212                     assertTrue(k.isReadable());
 213                 } else if (k == key2) {
 214                     assertTrue(k.isWritable());
 215                 } else {
 216                     assertTrue(false);
 217                 }
 218             }, 1000);
 219             assertTrue(n == 2);
 220             assertTrue(counter.get() == 2);
 221 
 222             // selectNow(Consumer)
 223             counter.set(0);
 224             n = sel.selectNow(k -> {
 225                 counter.incrementAndGet();
 226                 if (k == key1) {
 227                     assertTrue(k.isReadable());
 228                 } else if (k == key2) {
 229                     assertTrue(k.isWritable());
 230                 } else {
 231                     assertTrue(false);
 232                 }
 233             });
 234             assertTrue(n == 2);
 235             assertTrue(counter.get() == 2);
 236         } finally {
 237             closePipe(p);
 238         }
 239     }
 240 
 241     /**
 242      * Test calling select twice, the action should be invoked each time
 243      */
 244     public void testRepeatedSelect1() throws Exception {
 245         Pipe p = Pipe.open();
 246         try (Selector sel = Selector.open()) {
 247             Pipe.SourceChannel source = p.source();
 248             Pipe.SinkChannel sink = p.sink();
 249             source.configureBlocking(false);
 250             SelectionKey key = source.register(sel, SelectionKey.OP_READ);
 251 
 252             // write to sink to ensure that the source is readable
 253             sink.write(messageBuffer());
 254 
 255             // test that action is invoked
 256             testActionInvoked(key, SelectionKey.OP_READ);
 257             testActionInvoked(key, SelectionKey.OP_READ);
 258 
 259         } finally {
 260             closePipe(p);
 261         }
 262     }
 263 
 264     /**
 265      * Test calling select twice. An I/O operation is performed after the
 266      * first select so the channel will not be selected by the second select.
 267      */
 268     public void testRepeatedSelect2() throws Exception {
 269         Pipe p = Pipe.open();
 270         try (Selector sel = Selector.open()) {
 271             Pipe.SourceChannel source = p.source();
 272             Pipe.SinkChannel sink = p.sink();
 273             source.configureBlocking(false);
 274             SelectionKey key = source.register(sel, SelectionKey.OP_READ);
 275 
 276             // write to sink to ensure that the source is readable
 277             sink.write(messageBuffer());
 278 
 279             // test that action is invoked
 280             testActionInvoked(key, SelectionKey.OP_READ);
 281 
 282             // read all bytes
 283             int n;
 284             ByteBuffer bb = ByteBuffer.allocate(100);
 285             do {
 286                 n = source.read(bb);
 287                 bb.clear();
 288             } while (n > 0);
 289 
 290             // test that action is not invoked
 291             testActionInvoked(key, 0);
 292         } finally {
 293             closePipe(p);
 294         }
 295     }
 296 
 297     /**
 298      * Test timeout
 299      */
 300     public void testTimeout() throws Exception {
 301         Pipe p = Pipe.open();
 302         try (Selector sel = Selector.open()) {
 303             Pipe.SourceChannel source = p.source();
 304             Pipe.SinkChannel sink = p.sink();
 305             source.configureBlocking(false);
 306             source.register(sel, SelectionKey.OP_READ);
 307             long start = System.currentTimeMillis();
 308             int n = sel.select(k -> assertTrue(false), 1000L);
 309             long duration = System.currentTimeMillis() - start;
 310             assertTrue(n == 0);
 311             assertTrue(duration > 500, "select took " + duration + " ms");
 312         } finally {
 313             closePipe(p);
 314         }
 315     }
 316 
 317     /**
 318      * Test wakeup prior to select
 319      */
 320     public void testWakeupBeforeSelect() throws Exception {
 321         // select(Consumer)
 322         try (Selector sel = Selector.open()) {
 323             sel.wakeup();
 324             int n = sel.select(k -> assertTrue(false));
 325             assertTrue(n == 0);
 326         }
 327 
 328         // select(Consumer, timeout)
 329         try (Selector sel = Selector.open()) {
 330             sel.wakeup();
 331             long start = System.currentTimeMillis();
 332             int n = sel.select(k -> assertTrue(false), 60*1000);
 333             long duration = System.currentTimeMillis() - start;
 334             assertTrue(n == 0);
 335             assertTrue(duration < 5000, "select took " + duration + " ms");
 336         }
 337     }
 338 
 339     /**
 340      * Test wakeup during select
 341      */
 342     public void testWakeupDuringSelect() throws Exception {
 343         // select(Consumer)
 344         try (Selector sel = Selector.open()) {
 345             scheduleWakeup(sel, 1, SECONDS);
 346             int n = sel.select(k -> assertTrue(false));
 347             assertTrue(n == 0);
 348         }
 349 
 350         // select(Consumer, timeout)
 351         try (Selector sel = Selector.open()) {
 352             scheduleWakeup(sel, 1, SECONDS);
 353             long start = System.currentTimeMillis();
 354             int n = sel.select(k -> assertTrue(false), 60*1000);
 355             long duration = System.currentTimeMillis() - start;
 356             assertTrue(n == 0);
 357             assertTrue(duration > 500 && duration < 10*1000,
 358                     "select took " + duration + " ms");
 359         }
 360     }
 361 
 362     /**
 363      * Test invoking select with interrupt status set
 364      */
 365     public void testInterruptBeforeSelect() throws Exception {
 366         // select(Consumer)
 367         try (Selector sel = Selector.open()) {
 368             Thread.currentThread().interrupt();
 369             int n = sel.select(k -> assertTrue(false));
 370             assertTrue(n == 0);
 371             assertTrue(Thread.currentThread().isInterrupted());
 372             assertTrue(sel.isOpen());
 373         } finally {
 374             Thread.currentThread().interrupted();  // clear interrupt status
 375         }
 376 
 377         // select(Consumer, timeout)
 378         try (Selector sel = Selector.open()) {
 379             Thread.currentThread().interrupt();
 380             long start = System.currentTimeMillis();
 381             int n = sel.select(k -> assertTrue(false), 60*1000);
 382             long duration = System.currentTimeMillis() - start;
 383             assertTrue(n == 0);
 384             assertTrue(duration < 5000, "select took " + duration + " ms");
 385             assertTrue(Thread.currentThread().isInterrupted());
 386             assertTrue(sel.isOpen());
 387         } finally {
 388             Thread.currentThread().interrupted();  // clear interrupt status
 389         }
 390     }
 391 
 392     /**
 393      * Test interrupt thread during select
 394      */
 395     public void testInterruptDuringSelect() throws Exception {
 396         // select(Consumer)
 397         try (Selector sel = Selector.open()) {
 398             scheduleInterrupt(Thread.currentThread(), 1, SECONDS);
 399             int n = sel.select(k -> assertTrue(false));
 400             assertTrue(n == 0);
 401             assertTrue(Thread.currentThread().isInterrupted());
 402             assertTrue(sel.isOpen());
 403         } finally {
 404             Thread.currentThread().interrupted();  // clear interrupt status
 405         }
 406 
 407         // select(Consumer, timeout)
 408         try (Selector sel = Selector.open()) {
 409             scheduleInterrupt(Thread.currentThread(), 1, SECONDS);
 410             long start = System.currentTimeMillis();
 411             int n = sel.select(k -> assertTrue(false), 60*1000);
 412             long duration = System.currentTimeMillis() - start;
 413             assertTrue(n == 0);
 414             assertTrue(duration > 500 && duration < 5000,
 415                     "select took " + duration + " ms");
 416             assertTrue(Thread.currentThread().isInterrupted());
 417             assertTrue(sel.isOpen());
 418         } finally {
 419             Thread.currentThread().interrupted();  // clear interrupt status
 420         }
 421     }
 422 
 423     /**
 424      * Test invoking select on a closed selector
 425      */
 426     @Test(expectedExceptions = ClosedSelectorException.class)
 427     public void testClosedSelector1() throws Exception {
 428         Selector sel = Selector.open();
 429         sel.close();
 430         sel.select(k -> assertTrue(false));
 431     }
 432     @Test(expectedExceptions = ClosedSelectorException.class)
 433     public void testClosedSelector2() throws Exception {
 434         Selector sel = Selector.open();
 435         sel.close();
 436         sel.select(k -> assertTrue(false), 1000);
 437     }
 438     @Test(expectedExceptions = ClosedSelectorException.class)
 439     public void testClosedSelector3() throws Exception {
 440         Selector sel = Selector.open();
 441         sel.close();
 442         sel.selectNow(k -> assertTrue(false));
 443     }
 444 
 445     /**
 446      * Test closing selector while in a selection operation
 447      */
 448     public void testCloseDuringSelect() throws Exception {
 449         // select(Consumer)
 450         try (Selector sel = Selector.open()) {
 451             scheduleClose(sel, 3, SECONDS);
 452             int n = sel.select(k -> assertTrue(false));
 453             assertTrue(n == 0);
 454             assertFalse(sel.isOpen());
 455         }
 456 
 457         // select(Consumer, timeout)
 458         try (Selector sel = Selector.open()) {
 459             scheduleClose(sel, 3, SECONDS);
 460             long start = System.currentTimeMillis();
 461             int n = sel.select(k -> assertTrue(false), 60*1000);
 462             long duration = System.currentTimeMillis() - start;
 463             assertTrue(n == 0);
 464             assertTrue(duration > 2000 && duration < 10*1000,
 465                     "select took " + duration + " ms");
 466             assertFalse(sel.isOpen());
 467         }
 468     }
 469 
 470     /**
 471      * Test action closing selector
 472      */
 473     @Test(expectedExceptions = ClosedSelectorException.class)
 474     public void testActionClosingSelector() throws Exception {
 475         Pipe p = Pipe.open();
 476         try (Selector sel = Selector.open()) {
 477             Pipe.SourceChannel source = p.source();
 478             Pipe.SinkChannel sink = p.sink();
 479             source.configureBlocking(false);
 480             SelectionKey key = source.register(sel, SelectionKey.OP_READ);
 481 
 482             // write to sink to ensure that the source is readable
 483             sink.write(messageBuffer());
 484 
 485             // should relay ClosedSelectorException
 486             sel.select(k -> {
 487                 assertTrue(k == key);
 488                 try {
 489                     sel.close();
 490                 } catch (IOException ioe) { }
 491             });
 492         } finally {
 493             closePipe(p);
 494         }
 495     }
 496 
 497     /**
 498      * Test that the action is invoked while synchronized on the selector and
 499      * its selected-key set.
 500      */
 501     public void testLocks() throws Exception {
 502         Pipe p = Pipe.open();
 503         try (Selector sel = Selector.open()) {
 504             Pipe.SourceChannel source = p.source();
 505             Pipe.SinkChannel sink = p.sink();
 506             source.configureBlocking(false);
 507             SelectionKey key = source.register(sel, SelectionKey.OP_READ);
 508 
 509             // write to sink to ensure that the source is readable
 510             sink.write(messageBuffer());
 511 
 512             // select(Consumer)
 513             sel.select(k -> {
 514                 assertTrue(k == key);
 515                 assertTrue(Thread.holdsLock(sel));
 516                 assertFalse(Thread.holdsLock(sel.keys()));
 517                 assertTrue(Thread.holdsLock(sel.selectedKeys()));
 518             });
 519 
 520             // select(Consumer, timeout)
 521             sel.select(k -> {
 522                 assertTrue(k == key);
 523                 assertTrue(Thread.holdsLock(sel));
 524                 assertFalse(Thread.holdsLock(sel.keys()));
 525                 assertTrue(Thread.holdsLock(sel.selectedKeys()));
 526             }, 1000L);
 527 
 528             // selectNow(Consumer)
 529             sel.selectNow(k -> {
 530                 assertTrue(k == key);
 531                 assertTrue(Thread.holdsLock(sel));
 532                 assertFalse(Thread.holdsLock(sel.keys()));
 533                 assertTrue(Thread.holdsLock(sel.selectedKeys()));
 534             });
 535         } finally {
 536             closePipe(p);
 537         }
 538     }
 539 
 540     /**
 541      * Test that selection operations remove cancelled keys from the selector's
 542      * key and selected-key sets.
 543      */
 544     public void testCancel() throws Exception {
 545         Pipe p = Pipe.open();
 546         try (Selector sel = Selector.open()) {
 547             Pipe.SinkChannel sink = p.sink();
 548             Pipe.SourceChannel source = p.source();
 549 
 550             // write to sink to ensure that the source is readable
 551             sink.write(messageBuffer());
 552 
 553             sink.configureBlocking(false);
 554             source.configureBlocking(false);
 555             SelectionKey key1 = sink.register(sel, SelectionKey.OP_WRITE);
 556             SelectionKey key2 = source.register(sel, SelectionKey.OP_READ);
 557 
 558             sel.selectNow();
 559             assertTrue(sel.keys().contains(key1));
 560             assertTrue(sel.keys().contains(key2));
 561             assertTrue(sel.selectedKeys().contains(key1));
 562             assertTrue(sel.selectedKeys().contains(key2));
 563 
 564             // cancel key1
 565             key1.cancel();
 566             int n = sel.selectNow(k -> assertTrue(k == key2));
 567             assertTrue(n == 1);
 568             assertFalse(sel.keys().contains(key1));
 569             assertTrue(sel.keys().contains(key2));
 570             assertFalse(sel.selectedKeys().contains(key1));
 571             assertTrue(sel.selectedKeys().contains(key2));
 572 
 573             // cancel key2
 574             key2.cancel();
 575             n = sel.selectNow(k -> assertTrue(false));
 576             assertTrue(n == 0);
 577             assertFalse(sel.keys().contains(key1));
 578             assertFalse(sel.keys().contains(key2));
 579             assertFalse(sel.selectedKeys().contains(key1));
 580             assertFalse(sel.selectedKeys().contains(key2));
 581         } finally {
 582             closePipe(p);
 583         }
 584     }
 585 
 586     /**
 587      * Test an action invoking select()
 588      */
 589     public void testReentrantSelect1() throws Exception {
 590         Pipe p = Pipe.open();
 591         try (Selector sel = Selector.open()) {
 592             Pipe.SinkChannel sink = p.sink();
 593             Pipe.SourceChannel source = p.source();
 594             source.configureBlocking(false);
 595             source.register(sel, SelectionKey.OP_READ);
 596 
 597             // write to sink to ensure that the source is readable
 598             scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS);
 599 
 600             int n = sel.select(k -> {
 601                 try {
 602                     sel.select();
 603                     assertTrue(false);
 604                 } catch (IOException ioe) {
 605                     throw new RuntimeException(ioe);
 606                 } catch (IllegalStateException expected) {
 607                 }
 608             });
 609             assertTrue(n == 1);
 610         } finally {
 611             closePipe(p);
 612         }
 613     }
 614 
 615     /**
 616      * Test an action invoking selectNow()
 617      */
 618     public void testReentrantSelect2() throws Exception {
 619         Pipe p = Pipe.open();
 620         try (Selector sel = Selector.open()) {
 621             Pipe.SinkChannel sink = p.sink();
 622             Pipe.SourceChannel source = p.source();
 623 
 624             // write to sink to ensure that the source is readable
 625             scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS);
 626 
 627             source.configureBlocking(false);
 628             source.register(sel, SelectionKey.OP_READ);
 629             int n = sel.select(k -> {
 630                 try {
 631                     sel.selectNow();
 632                     assertTrue(false);
 633                 } catch (IOException ioe) {
 634                     throw new RuntimeException(ioe);
 635                 } catch (IllegalStateException expected) {
 636                 }
 637             });
 638             assertTrue(n == 1);
 639         } finally {
 640             closePipe(p);
 641         }
 642     }
 643 
 644     /**
 645      * Test an action invoking select(Consumer)
 646      */
 647     public void testReentrantSelect3() throws Exception {
 648         Pipe p = Pipe.open();
 649         try (Selector sel = Selector.open()) {
 650             Pipe.SinkChannel sink = p.sink();
 651             Pipe.SourceChannel source = p.source();
 652 
 653             // write to sink to ensure that the source is readable
 654             scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS);
 655 
 656             source.configureBlocking(false);
 657             source.register(sel, SelectionKey.OP_READ);
 658             int n = sel.select(k -> {
 659                 try {
 660                     sel.select(x -> assertTrue(false));
 661                     assertTrue(false);
 662                 } catch (IOException ioe) {
 663                     throw new RuntimeException(ioe);
 664                 } catch (IllegalStateException expected) {
 665                 }
 666             });
 667             assertTrue(n == 1);
 668         } finally {
 669             closePipe(p);
 670         }
 671     }
 672 
 673     /**
 674      * Negative timeout
 675      */
 676     @Test(expectedExceptions = IllegalArgumentException.class)
 677     public void testNegativeTimeout() throws Exception {
 678         try (Selector sel = Selector.open()) {
 679             sel.select(k -> { }, -1L);
 680         }
 681     }
 682 
 683     /**
 684      * Null action
 685      */
 686     @Test(expectedExceptions = NullPointerException.class)
 687     public void testNull1() throws Exception {
 688         try (Selector sel = Selector.open()) {
 689             sel.select(null);
 690         }
 691     }
 692     @Test(expectedExceptions = NullPointerException.class)
 693     public void testNull2() throws Exception {
 694         try (Selector sel = Selector.open()) {
 695             sel.select(null, 1000);
 696         }
 697     }
 698     @Test(expectedExceptions = NullPointerException.class)
 699     public void testNull3() throws Exception {
 700         try (Selector sel = Selector.open()) {
 701             sel.selectNow(null);
 702         }
 703     }
 704 
 705 
 706     // -- support methods ---
 707 
 708     private final ScheduledExecutorService POOL = Executors.newScheduledThreadPool(1);
 709 
 710     @AfterTest
 711     void shutdownThreadPool() {
 712         POOL.shutdown();
 713     }
 714 
 715     void scheduleWakeup(Selector sel, long delay, TimeUnit unit) {
 716         POOL.schedule(() -> sel.wakeup(), delay, unit);
 717     }
 718 
 719     void scheduleInterrupt(Thread t, long delay, TimeUnit unit) {
 720         POOL.schedule(() -> t.interrupt(), delay, unit);
 721     }
 722 
 723     void scheduleClose(Closeable c, long delay, TimeUnit unit) {
 724         POOL.schedule(() -> {
 725             try {
 726                 c.close();
 727             } catch (IOException ioe) {
 728                 ioe.printStackTrace();
 729             }
 730         }, delay, unit);
 731     }
 732 
 733     void scheduleWrite(WritableByteChannel sink, ByteBuffer buf, long delay, TimeUnit unit) {
 734         POOL.schedule(() -> {
 735             try {
 736                 sink.write(buf);
 737             } catch (IOException ioe) {
 738                 ioe.printStackTrace();
 739             }
 740         }, delay, unit);
 741     }
 742 
 743     static void closePipe(Pipe p) {
 744         try { p.sink().close(); } catch (IOException ignore) { }
 745         try { p.source().close(); } catch (IOException ignore) { }
 746     }
 747 
 748     static ByteBuffer messageBuffer() {
 749         try {
 750             return ByteBuffer.wrap("message".getBytes("UTF-8"));
 751         } catch (Exception e) {
 752             throw new RuntimeException(e);
 753         }
 754     }
 755 }