1 /*
   2  * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /* @test
  25  * @bug 8199433
  26  * @run testng SelectWithConsumer
  27  * @summary Unit test for Selector select(Consumer), select(Consumer,long) and
  28  *          selectNow(Consumer)
  29  */
  30 
  31 import java.io.Closeable;
  32 import java.io.IOException;
  33 import java.net.InetSocketAddress;
  34 import java.nio.ByteBuffer;
  35 import java.nio.channels.ClosedSelectorException;
  36 import java.nio.channels.Pipe;
  37 import java.nio.channels.SelectionKey;
  38 import java.nio.channels.Selector;
  39 import java.nio.channels.ServerSocketChannel;
  40 import java.nio.channels.SocketChannel;
  41 import java.nio.channels.WritableByteChannel;
  42 import java.util.concurrent.Executors;
  43 import java.util.concurrent.ScheduledExecutorService;
  44 import java.util.concurrent.TimeUnit;
  45 import java.util.concurrent.atomic.AtomicInteger;
  46 import static java.util.concurrent.TimeUnit.*;
  47 
  48 import org.testng.annotations.AfterTest;
  49 import org.testng.annotations.Test;
  50 import static org.testng.Assert.*;
  51 
  52 @Test
  53 public class SelectWithConsumer {
  54 
  55     /**
  56      * Invoke the select methods that take an action and check that the
  57      * accumulated ready ops notified to the action matches the expected ops.
  58      */
  59     void testActionInvoked(SelectionKey key, int expectedOps) throws Exception {
  60         var callerThread = Thread.currentThread();
  61         var sel = key.selector();
  62         var interestOps = key.interestOps();
  63         var notifiedOps = new AtomicInteger();
  64 
  65         // select(Consumer)
  66         if (expectedOps == 0)
  67             sel.wakeup(); // ensure select does not block
  68         notifiedOps.set(0);
  69         int n = sel.select(k -> {
  70             assertTrue(Thread.currentThread() == callerThread);
  71             assertTrue(k == key);
  72             int readyOps = key.readyOps();
  73             assertTrue((readyOps & interestOps) != 0);
  74             assertTrue((readyOps & notifiedOps.get()) == 0);
  75             notifiedOps.set(notifiedOps.get() | readyOps);
  76         });
  77         assertTrue((n == 1) ^ (expectedOps == 0));
  78         assertTrue(notifiedOps.get() == expectedOps);
  79 
  80         // select(Consumer, timeout)
  81         notifiedOps.set(0);
  82         n = sel.select(k -> {
  83             assertTrue(Thread.currentThread() == callerThread);
  84             assertTrue(k == key);
  85             int readyOps = key.readyOps();
  86             assertTrue((readyOps & interestOps) != 0);
  87             assertTrue((readyOps & notifiedOps.get()) == 0);
  88             notifiedOps.set(notifiedOps.get() | readyOps);
  89         }, 1000);
  90         assertTrue((n == 1) ^ (expectedOps == 0));
  91         assertTrue(notifiedOps.get() == expectedOps);
  92 
  93         // selectNow(Consumer)
  94         notifiedOps.set(0);
  95         n = sel.selectNow(k -> {
  96             assertTrue(Thread.currentThread() == callerThread);
  97             assertTrue(k == key);
  98             int readyOps = key.readyOps();
  99             assertTrue((readyOps & interestOps) != 0);
 100             assertTrue((readyOps & notifiedOps.get()) == 0);
 101             notifiedOps.set(notifiedOps.get() | readyOps);
 102         });
 103         assertTrue((n == 1) ^ (expectedOps == 0));
 104         assertTrue(notifiedOps.get() == expectedOps);
 105     }
 106 
 107     /**
 108      * Test that an action is performed when a channel is ready for reading.
 109      */
 110     public void testReadable() throws Exception {
 111         Pipe p = Pipe.open();
 112         try (Selector sel = Selector.open()) {
 113             Pipe.SinkChannel sink = p.sink();
 114             Pipe.SourceChannel source = p.source();
 115             source.configureBlocking(false);
 116             SelectionKey key = source.register(sel, SelectionKey.OP_READ);
 117 
 118             // write to sink to ensure source is readable
 119             scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS);
 120 
 121             // test that action is invoked
 122             testActionInvoked(key, SelectionKey.OP_READ);
 123         } finally {
 124             closePipe(p);
 125         }
 126     }
 127 
 128     /**
 129      * Test that an action is performed when a channel is ready for writing.
 130      */
 131     public void testWritable() throws Exception {
 132         Pipe p = Pipe.open();
 133         try (Selector sel = Selector.open()) {
 134             Pipe.SourceChannel source = p.source();
 135             Pipe.SinkChannel sink = p.sink();
 136             sink.configureBlocking(false);
 137             SelectionKey key = sink.register(sel, SelectionKey.OP_WRITE);
 138 
 139             // test that action is invoked
 140             testActionInvoked(key, SelectionKey.OP_WRITE);
 141         } finally {
 142             closePipe(p);
 143         }
 144     }
 145 
 146     /**
 147      * Test that an action is performed when a channel is ready for both
 148      * reading and writing.
 149      */
 150     public void testReadableAndWriteable() throws Exception {
 151         ServerSocketChannel ssc = null;
 152         SocketChannel sc = null;
 153         SocketChannel peer = null;
 154         try (Selector sel = Selector.open()) {
 155             ssc = ServerSocketChannel.open().bind(new InetSocketAddress(0));
 156             sc = SocketChannel.open(ssc.getLocalAddress());
 157             sc.configureBlocking(false);
 158             SelectionKey key = sc.register(sel, (SelectionKey.OP_READ |
 159                                                  SelectionKey.OP_WRITE));
 160 
 161             // accept connection and write data so the source is readable
 162             peer = ssc.accept();
 163             peer.write(messageBuffer());
 164 
 165             // test that action is invoked
 166             testActionInvoked(key, (SelectionKey.OP_READ | SelectionKey.OP_WRITE));
 167         } finally {
 168             if (ssc != null) ssc.close();
 169             if (sc != null) sc.close();
 170             if (peer != null) peer.close();
 171         }
 172     }
 173 
 174     /**
 175      * Test that the action is called for two selected channels
 176      */
 177     public void testTwoChannels() throws Exception {
 178         Pipe p = Pipe.open();
 179         try (Selector sel = Selector.open()) {
 180             Pipe.SourceChannel source = p.source();
 181             Pipe.SinkChannel sink = p.sink();
 182             source.configureBlocking(false);
 183             sink.configureBlocking(false);
 184             SelectionKey key1 = source.register(sel, SelectionKey.OP_READ);
 185             SelectionKey key2 = sink.register(sel, SelectionKey.OP_WRITE);
 186 
 187             // write to sink to ensure that the source is readable
 188             sink.write(messageBuffer());
 189 
 190             var counter = new AtomicInteger();
 191 
 192             // select(Consumer)
 193             counter.set(0);
 194             int n = sel.select(k -> {
 195                 counter.incrementAndGet();
 196                 if (k == key1) {
 197                     assertTrue(k.isReadable());
 198                 } else if (k == key2) {
 199                     assertTrue(k.isWritable());
 200                 } else {
 201                     assertTrue(false);
 202                 }
 203             });
 204             assertTrue(n == 2);
 205             assertTrue(counter.get() == 2);
 206 
 207             // select(Consumer, timeout)
 208             counter.set(0);
 209             n = sel.select(k -> {
 210                 counter.incrementAndGet();
 211                 if (k == key1) {
 212                     assertTrue(k.isReadable());
 213                 } else if (k == key2) {
 214                     assertTrue(k.isWritable());
 215                 } else {
 216                     assertTrue(false);
 217                 }
 218             }, 1000);
 219             assertTrue(n == 2);
 220             assertTrue(counter.get() == 2);
 221 
 222             // selectNow(Consumer)
 223             counter.set(0);
 224             n = sel.selectNow(k -> {
 225                 counter.incrementAndGet();
 226                 if (k == key1) {
 227                     assertTrue(k.isReadable());
 228                 } else if (k == key2) {
 229                     assertTrue(k.isWritable());
 230                 } else {
 231                     assertTrue(false);
 232                 }
 233             });
 234             assertTrue(n == 2);
 235             assertTrue(counter.get() == 2);
 236         } finally {
 237             closePipe(p);
 238         }
 239     }
 240 
 241     /**
 242      * Test calling select twice, the action should be invoked each time
 243      */
 244     public void testRepeatedSelect1() throws Exception {
 245         Pipe p = Pipe.open();
 246         try (Selector sel = Selector.open()) {
 247             Pipe.SourceChannel source = p.source();
 248             Pipe.SinkChannel sink = p.sink();
 249             source.configureBlocking(false);
 250             SelectionKey key = source.register(sel, SelectionKey.OP_READ);
 251 
 252             // write to sink to ensure that the source is readable
 253             sink.write(messageBuffer());
 254 
 255             // test that action is invoked
 256             testActionInvoked(key, SelectionKey.OP_READ);
 257             testActionInvoked(key, SelectionKey.OP_READ);
 258 
 259         } finally {
 260             closePipe(p);
 261         }
 262     }
 263 
 264     /**
 265      * Test calling select twice. An I/O operation is performed after the
 266      * first select so the channel will not be selected by the second select.
 267      */
 268     public void testRepeatedSelect2() throws Exception {
 269         Pipe p = Pipe.open();
 270         try (Selector sel = Selector.open()) {
 271             Pipe.SourceChannel source = p.source();
 272             Pipe.SinkChannel sink = p.sink();
 273             source.configureBlocking(false);
 274             SelectionKey key = source.register(sel, SelectionKey.OP_READ);
 275 
 276             // write to sink to ensure that the source is readable
 277             sink.write(messageBuffer());
 278 
 279             // test that action is invoked
 280             testActionInvoked(key, SelectionKey.OP_READ);
 281 
 282             // read all bytes
 283             int n;
 284             ByteBuffer bb = ByteBuffer.allocate(100);
 285             do {
 286                 n = source.read(bb);
 287                 bb.clear();
 288             } while (n > 0);
 289 
 290             // test that action is not invoked
 291             testActionInvoked(key, 0);
 292         } finally {
 293             closePipe(p);
 294         }
 295     }
 296 
 297     /**
 298      * Test timeout
 299      */
 300     public void testTimeout() throws Exception {
 301         Pipe p = Pipe.open();
 302         try (Selector sel = Selector.open()) {
 303             Pipe.SourceChannel source = p.source();
 304             Pipe.SinkChannel sink = p.sink();
 305             source.configureBlocking(false);
 306             source.register(sel, SelectionKey.OP_READ);
 307             long start = System.currentTimeMillis();
 308             int n = sel.select(k -> assertTrue(false), 1000L);
 309             long duration = System.currentTimeMillis() - start;
 310             assertTrue(n == 0);
 311             assertTrue(duration > 500, "select took " + duration + " ms");
 312         } finally {
 313             closePipe(p);
 314         }
 315     }
 316 
 317     /**
 318      * Test wakeup prior to select
 319      */
 320     public void testWakeupBeforeSelect() throws Exception {
 321         // select(Consumer)
 322         try (Selector sel = Selector.open()) {
 323             sel.wakeup();
 324             int n = sel.select(k -> assertTrue(false));
 325             assertTrue(n == 0);
 326         }
 327 
 328         // select(Consumer, timeout)
 329         try (Selector sel = Selector.open()) {
 330             sel.wakeup();
 331             long start = System.currentTimeMillis();
 332             int n = sel.select(k -> assertTrue(false), 60*1000);
 333             long duration = System.currentTimeMillis() - start;
 334             assertTrue(n == 0);
 335             assertTrue(duration < 5000, "select took " + duration + " ms");
 336         }
 337     }
 338 
 339     /**
 340      * Test wakeup during select
 341      */
 342     public void testWakeupDuringSelect() throws Exception {
 343         // select(Consumer)
 344         try (Selector sel = Selector.open()) {
 345             scheduleWakeup(sel, 1, SECONDS);
 346             int n = sel.select(k -> assertTrue(false));
 347             assertTrue(n == 0);
 348         }
 349 
 350         // select(Consumer, timeout)
 351         try (Selector sel = Selector.open()) {
 352             scheduleWakeup(sel, 1, SECONDS);
 353             long start = System.currentTimeMillis();
 354             int n = sel.select(k -> assertTrue(false), 60*1000);
 355             long duration = System.currentTimeMillis() - start;
 356             assertTrue(n == 0);
 357             assertTrue(duration > 500 && duration < 10*1000,
 358                     "select took " + duration + " ms");
 359         }
 360     }
 361 
 362     /**
 363      * Test invoking select with interrupt status set
 364      */
 365     public void testInterruptBeforeSelect() throws Exception {
 366         // select(Consumer)
 367         try (Selector sel = Selector.open()) {
 368             Thread.currentThread().interrupt();
 369             int n = sel.select(k -> assertTrue(false));
 370             assertTrue(n == 0);
 371             assertTrue(Thread.currentThread().isInterrupted());
 372             assertTrue(sel.isOpen());
 373         } finally {
 374             Thread.currentThread().interrupted();  // clear interrupt status
 375         }
 376 
 377         // select(Consumer, timeout)
 378         try (Selector sel = Selector.open()) {
 379             Thread.currentThread().interrupt();
 380             long start = System.currentTimeMillis();
 381             int n = sel.select(k -> assertTrue(false), 60*1000);
 382             long duration = System.currentTimeMillis() - start;
 383             assertTrue(n == 0);
 384             assertTrue(duration < 5000, "select took " + duration + " ms");
 385             assertTrue(Thread.currentThread().isInterrupted());
 386             assertTrue(sel.isOpen());
 387         } finally {
 388             Thread.currentThread().interrupted();  // clear interrupt status
 389         }
 390     }
 391 
 392     /**
 393      * Test interrupt thread during select
 394      */
 395     public void testInterruptDuringSelect() throws Exception {
 396         // select(Consumer)
 397         try (Selector sel = Selector.open()) {
 398             scheduleInterrupt(Thread.currentThread(), 1, SECONDS);
 399             int n = sel.select(k -> assertTrue(false));
 400             assertTrue(n == 0);
 401             assertTrue(Thread.currentThread().isInterrupted());
 402             assertTrue(sel.isOpen());
 403         } finally {
 404             Thread.currentThread().interrupted();  // clear interrupt status
 405         }
 406 
 407         // select(Consumer, timeout)
 408         try (Selector sel = Selector.open()) {
 409             scheduleInterrupt(Thread.currentThread(), 1, SECONDS);
 410             long start = System.currentTimeMillis();
 411             int n = sel.select(k -> assertTrue(false), 60*1000);
 412             long duration = System.currentTimeMillis() - start;
 413             assertTrue(n == 0);
 414             assertTrue(Thread.currentThread().isInterrupted());
 415             assertTrue(sel.isOpen());
 416         } finally {
 417             Thread.currentThread().interrupted();  // clear interrupt status
 418         }
 419     }
 420 
 421     /**
 422      * Test invoking select on a closed selector
 423      */
 424     @Test(expectedExceptions = ClosedSelectorException.class)
 425     public void testClosedSelector1() throws Exception {
 426         Selector sel = Selector.open();
 427         sel.close();
 428         sel.select(k -> assertTrue(false));
 429     }
 430     @Test(expectedExceptions = ClosedSelectorException.class)
 431     public void testClosedSelector2() throws Exception {
 432         Selector sel = Selector.open();
 433         sel.close();
 434         sel.select(k -> assertTrue(false), 1000);
 435     }
 436     @Test(expectedExceptions = ClosedSelectorException.class)
 437     public void testClosedSelector3() throws Exception {
 438         Selector sel = Selector.open();
 439         sel.close();
 440         sel.selectNow(k -> assertTrue(false));
 441     }
 442 
 443     /**
 444      * Test closing selector while in a selection operation
 445      */
 446     public void testCloseDuringSelect() throws Exception {
 447         // select(Consumer)
 448         try (Selector sel = Selector.open()) {
 449             scheduleClose(sel, 3, SECONDS);
 450             int n = sel.select(k -> assertTrue(false));
 451             assertTrue(n == 0);
 452             assertFalse(sel.isOpen());
 453         }
 454 
 455         // select(Consumer, timeout)
 456         try (Selector sel = Selector.open()) {
 457             scheduleClose(sel, 3, SECONDS);
 458             long start = System.currentTimeMillis();
 459             int n = sel.select(k -> assertTrue(false), 60*1000);
 460             long duration = System.currentTimeMillis() - start;
 461             assertTrue(n == 0);
 462             assertTrue(duration > 2000 && duration < 10*1000,
 463                     "select took " + duration + " ms");
 464             assertFalse(sel.isOpen());
 465         }
 466     }
 467 
 468     /**
 469      * Test action closing selector
 470      */
 471     @Test(expectedExceptions = ClosedSelectorException.class)
 472     public void testActionClosingSelector() throws Exception {
 473         Pipe p = Pipe.open();
 474         try (Selector sel = Selector.open()) {
 475             Pipe.SourceChannel source = p.source();
 476             Pipe.SinkChannel sink = p.sink();
 477             source.configureBlocking(false);
 478             SelectionKey key = source.register(sel, SelectionKey.OP_READ);
 479 
 480             // write to sink to ensure that the source is readable
 481             sink.write(messageBuffer());
 482 
 483             // should relay ClosedSelectorException
 484             sel.select(k -> {
 485                 assertTrue(k == key);
 486                 try {
 487                     sel.close();
 488                 } catch (IOException ioe) { }
 489             });
 490         } finally {
 491             closePipe(p);
 492         }
 493     }
 494 
 495     /**
 496      * Test that the action is invoked while synchronized on the selector and
 497      * its selected-key set.
 498      */
 499     public void testLocks() throws Exception {
 500         Pipe p = Pipe.open();
 501         try (Selector sel = Selector.open()) {
 502             Pipe.SourceChannel source = p.source();
 503             Pipe.SinkChannel sink = p.sink();
 504             source.configureBlocking(false);
 505             SelectionKey key = source.register(sel, SelectionKey.OP_READ);
 506 
 507             // write to sink to ensure that the source is readable
 508             sink.write(messageBuffer());
 509 
 510             // select(Consumer)
 511             sel.select(k -> {
 512                 assertTrue(k == key);
 513                 assertTrue(Thread.holdsLock(sel));
 514                 assertFalse(Thread.holdsLock(sel.keys()));
 515                 assertTrue(Thread.holdsLock(sel.selectedKeys()));
 516             });
 517 
 518             // select(Consumer, timeout)
 519             sel.select(k -> {
 520                 assertTrue(k == key);
 521                 assertTrue(Thread.holdsLock(sel));
 522                 assertFalse(Thread.holdsLock(sel.keys()));
 523                 assertTrue(Thread.holdsLock(sel.selectedKeys()));
 524             }, 1000L);
 525 
 526             // selectNow(Consumer)
 527             sel.selectNow(k -> {
 528                 assertTrue(k == key);
 529                 assertTrue(Thread.holdsLock(sel));
 530                 assertFalse(Thread.holdsLock(sel.keys()));
 531                 assertTrue(Thread.holdsLock(sel.selectedKeys()));
 532             });
 533         } finally {
 534             closePipe(p);
 535         }
 536     }
 537 
 538     /**
 539      * Test that selection operations remove cancelled keys from the selector's
 540      * key and selected-key sets.
 541      */
 542     public void testCancel() throws Exception {
 543         Pipe p = Pipe.open();
 544         try (Selector sel = Selector.open()) {
 545             Pipe.SinkChannel sink = p.sink();
 546             Pipe.SourceChannel source = p.source();
 547 
 548             // write to sink to ensure that the source is readable
 549             sink.write(messageBuffer());
 550 
 551             source.configureBlocking(false);
 552             SelectionKey key1 = source.register(sel, SelectionKey.OP_READ);
 553             // make sure pipe source is readable before we do following checks.
 554             // this is sometime necessary on windows where pipe is implemented
 555             // as a pair of connected socket, so there is no guarantee that written
 556             // bytes on sink side is immediately available on source side.
 557             sel.select();
 558 
 559             sink.configureBlocking(false);
 560             SelectionKey key2 = sink.register(sel, SelectionKey.OP_WRITE);
 561             sel.selectNow();
 562 
 563             assertTrue(sel.keys().contains(key1));
 564             assertTrue(sel.keys().contains(key2));
 565             assertTrue(sel.selectedKeys().contains(key1));
 566             assertTrue(sel.selectedKeys().contains(key2));
 567 
 568             // cancel key1
 569             key1.cancel();
 570             int n = sel.selectNow(k -> assertTrue(k == key2));
 571             assertTrue(n == 1);
 572             assertFalse(sel.keys().contains(key1));
 573             assertTrue(sel.keys().contains(key2));
 574             assertFalse(sel.selectedKeys().contains(key1));
 575             assertTrue(sel.selectedKeys().contains(key2));
 576 
 577             // cancel key2
 578             key2.cancel();
 579             n = sel.selectNow(k -> assertTrue(false));
 580             assertTrue(n == 0);
 581             assertFalse(sel.keys().contains(key1));
 582             assertFalse(sel.keys().contains(key2));
 583             assertFalse(sel.selectedKeys().contains(key1));
 584             assertFalse(sel.selectedKeys().contains(key2));
 585         } finally {
 586             closePipe(p);
 587         }
 588     }
 589 
 590     /**
 591      * Test an action invoking select()
 592      */
 593     public void testReentrantSelect1() throws Exception {
 594         Pipe p = Pipe.open();
 595         try (Selector sel = Selector.open()) {
 596             Pipe.SinkChannel sink = p.sink();
 597             Pipe.SourceChannel source = p.source();
 598             source.configureBlocking(false);
 599             source.register(sel, SelectionKey.OP_READ);
 600 
 601             // write to sink to ensure that the source is readable
 602             scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS);
 603 
 604             int n = sel.select(k -> {
 605                 try {
 606                     sel.select();
 607                     assertTrue(false);
 608                 } catch (IOException ioe) {
 609                     throw new RuntimeException(ioe);
 610                 } catch (IllegalStateException expected) {
 611                 }
 612             });
 613             assertTrue(n == 1);
 614         } finally {
 615             closePipe(p);
 616         }
 617     }
 618 
 619     /**
 620      * Test an action invoking selectNow()
 621      */
 622     public void testReentrantSelect2() throws Exception {
 623         Pipe p = Pipe.open();
 624         try (Selector sel = Selector.open()) {
 625             Pipe.SinkChannel sink = p.sink();
 626             Pipe.SourceChannel source = p.source();
 627 
 628             // write to sink to ensure that the source is readable
 629             scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS);
 630 
 631             source.configureBlocking(false);
 632             source.register(sel, SelectionKey.OP_READ);
 633             int n = sel.select(k -> {
 634                 try {
 635                     sel.selectNow();
 636                     assertTrue(false);
 637                 } catch (IOException ioe) {
 638                     throw new RuntimeException(ioe);
 639                 } catch (IllegalStateException expected) {
 640                 }
 641             });
 642             assertTrue(n == 1);
 643         } finally {
 644             closePipe(p);
 645         }
 646     }
 647 
 648     /**
 649      * Test an action invoking select(Consumer)
 650      */
 651     public void testReentrantSelect3() throws Exception {
 652         Pipe p = Pipe.open();
 653         try (Selector sel = Selector.open()) {
 654             Pipe.SinkChannel sink = p.sink();
 655             Pipe.SourceChannel source = p.source();
 656 
 657             // write to sink to ensure that the source is readable
 658             scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS);
 659 
 660             source.configureBlocking(false);
 661             source.register(sel, SelectionKey.OP_READ);
 662             int n = sel.select(k -> {
 663                 try {
 664                     sel.select(x -> assertTrue(false));
 665                     assertTrue(false);
 666                 } catch (IOException ioe) {
 667                     throw new RuntimeException(ioe);
 668                 } catch (IllegalStateException expected) {
 669                 }
 670             });
 671             assertTrue(n == 1);
 672         } finally {
 673             closePipe(p);
 674         }
 675     }
 676 
 677     /**
 678      * Negative timeout
 679      */
 680     @Test(expectedExceptions = IllegalArgumentException.class)
 681     public void testNegativeTimeout() throws Exception {
 682         try (Selector sel = Selector.open()) {
 683             sel.select(k -> { }, -1L);
 684         }
 685     }
 686 
 687     /**
 688      * Null action
 689      */
 690     @Test(expectedExceptions = NullPointerException.class)
 691     public void testNull1() throws Exception {
 692         try (Selector sel = Selector.open()) {
 693             sel.select(null);
 694         }
 695     }
 696     @Test(expectedExceptions = NullPointerException.class)
 697     public void testNull2() throws Exception {
 698         try (Selector sel = Selector.open()) {
 699             sel.select(null, 1000);
 700         }
 701     }
 702     @Test(expectedExceptions = NullPointerException.class)
 703     public void testNull3() throws Exception {
 704         try (Selector sel = Selector.open()) {
 705             sel.selectNow(null);
 706         }
 707     }
 708 
 709 
 710     // -- support methods ---
 711 
 712     private final ScheduledExecutorService POOL = Executors.newScheduledThreadPool(1);
 713 
 714     @AfterTest
 715     void shutdownThreadPool() {
 716         POOL.shutdown();
 717     }
 718 
 719     void scheduleWakeup(Selector sel, long delay, TimeUnit unit) {
 720         POOL.schedule(() -> sel.wakeup(), delay, unit);
 721     }
 722 
 723     void scheduleInterrupt(Thread t, long delay, TimeUnit unit) {
 724         POOL.schedule(() -> t.interrupt(), delay, unit);
 725     }
 726 
 727     void scheduleClose(Closeable c, long delay, TimeUnit unit) {
 728         POOL.schedule(() -> {
 729             try {
 730                 c.close();
 731             } catch (IOException ioe) {
 732                 ioe.printStackTrace();
 733             }
 734         }, delay, unit);
 735     }
 736 
 737     void scheduleWrite(WritableByteChannel sink, ByteBuffer buf, long delay, TimeUnit unit) {
 738         POOL.schedule(() -> {
 739             try {
 740                 sink.write(buf);
 741             } catch (IOException ioe) {
 742                 ioe.printStackTrace();
 743             }
 744         }, delay, unit);
 745     }
 746 
 747     static void closePipe(Pipe p) {
 748         try { p.sink().close(); } catch (IOException ignore) { }
 749         try { p.source().close(); } catch (IOException ignore) { }
 750     }
 751 
 752     static ByteBuffer messageBuffer() {
 753         try {
 754             return ByteBuffer.wrap("message".getBytes("UTF-8"));
 755         } catch (Exception e) {
 756             throw new RuntimeException(e);
 757         }
 758     }
 759 }