1 /*
   2  * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /* @test
  25  * @bug 8199433
  26  * @run testng SelectWithConsumer
  27  * @summary Unit test for Selector select(Consumer), select(Consumer,long) and
  28  *          selectNow(Consumer)
  29  */
  30 
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import static java.util.concurrent.TimeUnit.*;

import org.testng.annotations.AfterTest;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
  51 
  52 @Test
  53 public class SelectWithConsumer {
  54 
  55     /**
  56      * Invoke the select methods that take an action and check that the
  57      * accumulated ready ops notified to the action matches the expected ops.
  58      */
  59     void testActionInvoked(SelectionKey key, int expectedOps) throws Exception {
  60         var callerThread = Thread.currentThread();
  61         var sel = key.selector();
  62         var interestOps = key.interestOps();
  63         var notifiedOps = new AtomicInteger();
  64 
  65         // select(Consumer)
  66         if (expectedOps == 0)
  67             sel.wakeup(); // ensure select does not block
  68         notifiedOps.set(0);
  69         int n = sel.select(k -> {
  70             assertTrue(Thread.currentThread() == callerThread);
  71             assertTrue(k == key);
  72             int readyOps = key.readyOps();
  73             assertTrue((readyOps & interestOps) != 0);
  74             assertTrue((readyOps & notifiedOps.get()) == 0);
  75             notifiedOps.set(notifiedOps.get() | readyOps);
  76         });
  77         assertTrue((n == 1) ^ (expectedOps == 0));
  78         assertTrue(notifiedOps.get() == expectedOps);
  79 
  80         // select(Consumer, timeout)
  81         notifiedOps.set(0);
  82         n = sel.select(k -> {
  83             assertTrue(Thread.currentThread() == callerThread);
  84             assertTrue(k == key);
  85             int readyOps = key.readyOps();
  86             assertTrue((readyOps & interestOps) != 0);
  87             assertTrue((readyOps & notifiedOps.get()) == 0);
  88             notifiedOps.set(notifiedOps.get() | readyOps);
  89         }, 1000);
  90         assertTrue((n == 1) ^ (expectedOps == 0));
  91         assertTrue(notifiedOps.get() == expectedOps);
  92 
  93         // selectNow(Consumer)
  94         notifiedOps.set(0);
  95         n = sel.selectNow(k -> {
  96             assertTrue(Thread.currentThread() == callerThread);
  97             assertTrue(k == key);
  98             int readyOps = key.readyOps();
  99             assertTrue((readyOps & interestOps) != 0);
 100             assertTrue((readyOps & notifiedOps.get()) == 0);
 101             notifiedOps.set(notifiedOps.get() | readyOps);
 102         });
 103         assertTrue((n == 1) ^ (expectedOps == 0));
 104         assertTrue(notifiedOps.get() == expectedOps);
 105     }
 106 
 107     /**
 108      * Test that an action is performed when a channel is ready for reading.
 109      */
 110     public void testReadable() throws Exception {
 111         Pipe p = Pipe.open();
 112         try (Selector sel = Selector.open()) {
 113             Pipe.SinkChannel sink = p.sink();
 114             Pipe.SourceChannel source = p.source();
 115             source.configureBlocking(false);
 116             SelectionKey key = source.register(sel, SelectionKey.OP_READ);
 117 
 118             // write to sink to ensure source is readable
 119             scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS);
 120 
 121             // test that action is invoked
 122             testActionInvoked(key, SelectionKey.OP_READ);
 123         } finally {
 124             closePipe(p);
 125         }
 126     }
 127 
 128     /**
 129      * Test that an action is performed when a channel is ready for writing.
 130      */
 131     public void testWritable() throws Exception {
 132         Pipe p = Pipe.open();
 133         try (Selector sel = Selector.open()) {
 134             Pipe.SourceChannel source = p.source();
 135             Pipe.SinkChannel sink = p.sink();
 136             sink.configureBlocking(false);
 137             SelectionKey key = sink.register(sel, SelectionKey.OP_WRITE);
 138 
 139             // test that action is invoked
 140             testActionInvoked(key, SelectionKey.OP_WRITE);
 141         } finally {
 142             closePipe(p);
 143         }
 144     }
 145 
 146     /**
 147      * Test that an action is performed when a channel is ready for both
 148      * reading and writing.
 149      */
 150     public void testReadableAndWriteable() throws Exception {
 151         ServerSocketChannel ssc = null;
 152         SocketChannel sc = null;
 153         SocketChannel peer = null;
 154         try (Selector sel = Selector.open()) {
 155             ssc = ServerSocketChannel.open().bind(new InetSocketAddress(0));
 156             sc = SocketChannel.open(ssc.getLocalAddress());
 157             sc.configureBlocking(false);
 158             SelectionKey key = sc.register(sel, (SelectionKey.OP_READ |
 159                                                  SelectionKey.OP_WRITE));
 160 
 161             // accept connection and write data so the source is readable
 162             peer = ssc.accept();
 163             peer.write(messageBuffer());
 164 
 165             // test that action is invoked
 166             testActionInvoked(key, (SelectionKey.OP_READ | SelectionKey.OP_WRITE));
 167         } finally {
 168             if (ssc != null) ssc.close();
 169             if (sc != null) sc.close();
 170             if (peer != null) peer.close();
 171         }
 172     }
 173 
 174     /**
 175      * Test that the action is called for two selected channels
 176      */
 177     public void testTwoChannels() throws Exception {
 178         Pipe p = Pipe.open();
 179         try (Selector sel = Selector.open()) {
 180             Pipe.SourceChannel source = p.source();
 181             Pipe.SinkChannel sink = p.sink();
 182             source.configureBlocking(false);
 183             sink.configureBlocking(false);
 184             SelectionKey key1 = source.register(sel, SelectionKey.OP_READ);
 185             SelectionKey key2 = sink.register(sel, SelectionKey.OP_WRITE);
 186 
 187             // write to sink to ensure that the source is readable
 188             sink.write(messageBuffer());
 189 
 190             var counter = new AtomicInteger();
 191 
 192             // select(Consumer)
 193             counter.set(0);
 194             int n = sel.select(k -> {
 195                 counter.incrementAndGet();
 196                 if (k == key1) {
 197                     assertTrue(k.isReadable());
 198                 } else if (k == key2) {
 199                     assertTrue(k.isWritable());
 200                 } else {
 201                     assertTrue(false);
 202                 }
 203             });
 204             assertTrue(n == 2);
 205             assertTrue(counter.get() == 2);
 206 
 207             // select(Consumer, timeout)
 208             counter.set(0);
 209             n = sel.select(k -> {
 210                 counter.incrementAndGet();
 211                 if (k == key1) {
 212                     assertTrue(k.isReadable());
 213                 } else if (k == key2) {
 214                     assertTrue(k.isWritable());
 215                 } else {
 216                     assertTrue(false);
 217                 }
 218             }, 1000);
 219             assertTrue(n == 2);
 220             assertTrue(counter.get() == 2);
 221 
 222             // selectNow(Consumer)
 223             counter.set(0);
 224             n = sel.selectNow(k -> {
 225                 counter.incrementAndGet();
 226                 if (k == key1) {
 227                     assertTrue(k.isReadable());
 228                 } else if (k == key2) {
 229                     assertTrue(k.isWritable());
 230                 } else {
 231                     assertTrue(false);
 232                 }
 233             });
 234             assertTrue(n == 2);
 235             assertTrue(counter.get() == 2);
 236         } finally {
 237             closePipe(p);
 238         }
 239     }
 240 
 241     /**
 242      * Test calling select twice, the action should be invoked each time
 243      */
 244     public void testRepeatedSelect1() throws Exception {
 245         Pipe p = Pipe.open();
 246         try (Selector sel = Selector.open()) {
 247             Pipe.SourceChannel source = p.source();
 248             Pipe.SinkChannel sink = p.sink();
 249             source.configureBlocking(false);
 250             SelectionKey key = source.register(sel, SelectionKey.OP_READ);
 251 
 252             // write to sink to ensure that the source is readable
 253             sink.write(messageBuffer());
 254 
 255             // test that action is invoked
 256             testActionInvoked(key, SelectionKey.OP_READ);
 257             testActionInvoked(key, SelectionKey.OP_READ);
 258 
 259         } finally {
 260             closePipe(p);
 261         }
 262     }
 263 
 264     /**
 265      * Test calling select twice. An I/O operation is performed after the
 266      * first select so the channel will not be selected by the second select.
 267      */
 268     public void testRepeatedSelect2() throws Exception {
 269         Pipe p = Pipe.open();
 270         try (Selector sel = Selector.open()) {
 271             Pipe.SourceChannel source = p.source();
 272             Pipe.SinkChannel sink = p.sink();
 273             source.configureBlocking(false);
 274             SelectionKey key = source.register(sel, SelectionKey.OP_READ);
 275 
 276             // write to sink to ensure that the source is readable
 277             sink.write(messageBuffer());
 278 
 279             // test that action is invoked
 280             testActionInvoked(key, SelectionKey.OP_READ);
 281 
 282             // read all bytes
 283             int n;
 284             ByteBuffer bb = ByteBuffer.allocate(100);
 285             do {
 286                 n = source.read(bb);
 287                 bb.clear();
 288             } while (n > 0);
 289 
 290             // test that action is not invoked
 291             testActionInvoked(key, 0);
 292         } finally {
 293             closePipe(p);
 294         }
 295     }
 296 
 297     /**
 298      * Test timeout
 299      */
 300     public void testTimeout() throws Exception {
 301         Pipe p = Pipe.open();
 302         try (Selector sel = Selector.open()) {
 303             Pipe.SourceChannel source = p.source();
 304             Pipe.SinkChannel sink = p.sink();
 305             source.configureBlocking(false);
 306             source.register(sel, SelectionKey.OP_READ);
 307             long start = System.currentTimeMillis();
 308             int n = sel.select(k -> assertTrue(false), 1000L);
 309             long duration = System.currentTimeMillis() - start;
 310             assertTrue(n == 0);
 311             assertTrue(duration > 500, "select took " + duration + " ms");
 312         } finally {
 313             closePipe(p);
 314         }
 315     }
 316 
 317     /**
 318      * Test wakeup prior to select
 319      */
 320     public void testWakeupBeforeSelect() throws Exception {
 321         // select(Consumer)
 322         try (Selector sel = Selector.open()) {
 323             sel.wakeup();
 324             int n = sel.select(k -> assertTrue(false));
 325             assertTrue(n == 0);
 326         }
 327 
 328         // select(Consumer, timeout)
 329         try (Selector sel = Selector.open()) {
 330             sel.wakeup();
 331             long start = System.currentTimeMillis();
 332             int n = sel.select(k -> assertTrue(false), 60*1000);
 333             long duration = System.currentTimeMillis() - start;
 334             assertTrue(n == 0);
 335             assertTrue(duration < 5000, "select took " + duration + " ms");
 336         }
 337     }
 338 
 339     /**
 340      * Test wakeup during select
 341      */
 342     public void testWakeupDuringSelect() throws Exception {
 343         // select(Consumer)
 344         try (Selector sel = Selector.open()) {
 345             scheduleWakeup(sel, 1, SECONDS);
 346             int n = sel.select(k -> assertTrue(false));
 347             assertTrue(n == 0);
 348         }
 349 
 350         // select(Consumer, timeout)
 351         try (Selector sel = Selector.open()) {
 352             scheduleWakeup(sel, 1, SECONDS);
 353             long start = System.currentTimeMillis();
 354             int n = sel.select(k -> assertTrue(false), 60*1000);
 355             long duration = System.currentTimeMillis() - start;
 356             assertTrue(n == 0);
 357             assertTrue(duration > 500 && duration < 10*1000,
 358                     "select took " + duration + " ms");
 359         }
 360     }
 361 
 362     /**
 363      * Test invoking select with interrupt status set
 364      */
 365     public void testInterruptBeforeSelect() throws Exception {
 366         // select(Consumer)
 367         try (Selector sel = Selector.open()) {
 368             Thread.currentThread().interrupt();
 369             int n = sel.select(k -> assertTrue(false));
 370             assertTrue(n == 0);
 371             assertTrue(Thread.currentThread().isInterrupted());
 372             assertTrue(sel.isOpen());
 373         } finally {
 374             Thread.currentThread().interrupted();  // clear interrupt status
 375         }
 376 
 377         // select(Consumer, timeout)
 378         try (Selector sel = Selector.open()) {
 379             Thread.currentThread().interrupt();
 380             long start = System.currentTimeMillis();
 381             int n = sel.select(k -> assertTrue(false), 60*1000);
 382             long duration = System.currentTimeMillis() - start;
 383             assertTrue(n == 0);
 384             assertTrue(duration < 5000, "select took " + duration + " ms");
 385             assertTrue(Thread.currentThread().isInterrupted());
 386             assertTrue(sel.isOpen());
 387         } finally {
 388             Thread.currentThread().interrupted();  // clear interrupt status
 389         }
 390     }
 391 
 392     /**
 393      * Test interrupt thread during select
 394      */
 395     public void testInterruptDuringSelect() throws Exception {
 396         // select(Consumer)
 397         try (Selector sel = Selector.open()) {
 398             scheduleInterrupt(Thread.currentThread(), 1, SECONDS);
 399             int n = sel.select(k -> assertTrue(false));
 400             assertTrue(n == 0);
 401             assertTrue(Thread.currentThread().isInterrupted());
 402             assertTrue(sel.isOpen());
 403         } finally {
 404             Thread.currentThread().interrupted();  // clear interrupt status
 405         }
 406 
 407         // select(Consumer, timeout)
 408         try (Selector sel = Selector.open()) {
 409             scheduleInterrupt(Thread.currentThread(), 1, SECONDS);
 410             long start = System.currentTimeMillis();
 411             int n = sel.select(k -> assertTrue(false), 60*1000);
 412             long duration = System.currentTimeMillis() - start;
 413             assertTrue(n == 0);


 414             assertTrue(Thread.currentThread().isInterrupted());
 415             assertTrue(sel.isOpen());
 416         } finally {
 417             Thread.currentThread().interrupted();  // clear interrupt status
 418         }
 419     }
 420 
 421     /**
 422      * Test invoking select on a closed selector
 423      */
 424     @Test(expectedExceptions = ClosedSelectorException.class)
 425     public void testClosedSelector1() throws Exception {
 426         Selector sel = Selector.open();
 427         sel.close();
 428         sel.select(k -> assertTrue(false));
 429     }
 430     @Test(expectedExceptions = ClosedSelectorException.class)
 431     public void testClosedSelector2() throws Exception {
 432         Selector sel = Selector.open();
 433         sel.close();
 434         sel.select(k -> assertTrue(false), 1000);
 435     }
 436     @Test(expectedExceptions = ClosedSelectorException.class)
 437     public void testClosedSelector3() throws Exception {
 438         Selector sel = Selector.open();
 439         sel.close();
 440         sel.selectNow(k -> assertTrue(false));
 441     }
 442 
 443     /**
 444      * Test closing selector while in a selection operation
 445      */
 446     public void testCloseDuringSelect() throws Exception {
 447         // select(Consumer)
 448         try (Selector sel = Selector.open()) {
 449             scheduleClose(sel, 3, SECONDS);
 450             int n = sel.select(k -> assertTrue(false));
 451             assertTrue(n == 0);
 452             assertFalse(sel.isOpen());
 453         }
 454 
 455         // select(Consumer, timeout)
 456         try (Selector sel = Selector.open()) {
 457             scheduleClose(sel, 3, SECONDS);
 458             long start = System.currentTimeMillis();
 459             int n = sel.select(k -> assertTrue(false), 60*1000);
 460             long duration = System.currentTimeMillis() - start;
 461             assertTrue(n == 0);
 462             assertTrue(duration > 2000 && duration < 10*1000,
 463                     "select took " + duration + " ms");
 464             assertFalse(sel.isOpen());
 465         }
 466     }
 467 
 468     /**
 469      * Test action closing selector
 470      */
 471     @Test(expectedExceptions = ClosedSelectorException.class)
 472     public void testActionClosingSelector() throws Exception {
 473         Pipe p = Pipe.open();
 474         try (Selector sel = Selector.open()) {
 475             Pipe.SourceChannel source = p.source();
 476             Pipe.SinkChannel sink = p.sink();
 477             source.configureBlocking(false);
 478             SelectionKey key = source.register(sel, SelectionKey.OP_READ);
 479 
 480             // write to sink to ensure that the source is readable
 481             sink.write(messageBuffer());
 482 
 483             // should relay ClosedSelectorException
 484             sel.select(k -> {
 485                 assertTrue(k == key);
 486                 try {
 487                     sel.close();
 488                 } catch (IOException ioe) { }
 489             });
 490         } finally {
 491             closePipe(p);
 492         }
 493     }
 494 
 495     /**
 496      * Test that the action is invoked while synchronized on the selector and
 497      * its selected-key set.
 498      */
 499     public void testLocks() throws Exception {
 500         Pipe p = Pipe.open();
 501         try (Selector sel = Selector.open()) {
 502             Pipe.SourceChannel source = p.source();
 503             Pipe.SinkChannel sink = p.sink();
 504             source.configureBlocking(false);
 505             SelectionKey key = source.register(sel, SelectionKey.OP_READ);
 506 
 507             // write to sink to ensure that the source is readable
 508             sink.write(messageBuffer());
 509 
 510             // select(Consumer)
 511             sel.select(k -> {
 512                 assertTrue(k == key);
 513                 assertTrue(Thread.holdsLock(sel));
 514                 assertFalse(Thread.holdsLock(sel.keys()));
 515                 assertTrue(Thread.holdsLock(sel.selectedKeys()));
 516             });
 517 
 518             // select(Consumer, timeout)
 519             sel.select(k -> {
 520                 assertTrue(k == key);
 521                 assertTrue(Thread.holdsLock(sel));
 522                 assertFalse(Thread.holdsLock(sel.keys()));
 523                 assertTrue(Thread.holdsLock(sel.selectedKeys()));
 524             }, 1000L);
 525 
 526             // selectNow(Consumer)
 527             sel.selectNow(k -> {
 528                 assertTrue(k == key);
 529                 assertTrue(Thread.holdsLock(sel));
 530                 assertFalse(Thread.holdsLock(sel.keys()));
 531                 assertTrue(Thread.holdsLock(sel.selectedKeys()));
 532             });
 533         } finally {
 534             closePipe(p);
 535         }
 536     }
 537 
 538     /**
 539      * Test that selection operations remove cancelled keys from the selector's
 540      * key and selected-key sets.
 541      */
 542     public void testCancel() throws Exception {
 543         Pipe p = Pipe.open();
 544         try (Selector sel = Selector.open()) {
 545             Pipe.SinkChannel sink = p.sink();
 546             Pipe.SourceChannel source = p.source();
 547 
 548             // write to sink to ensure that the source is readable
 549             sink.write(messageBuffer());
 550 
 551             sink.configureBlocking(false);
 552             source.configureBlocking(false);
 553             SelectionKey key1 = sink.register(sel, SelectionKey.OP_WRITE);
 554             SelectionKey key2 = source.register(sel, SelectionKey.OP_READ);
 555 
 556             sel.selectNow();
 557             assertTrue(sel.keys().contains(key1));
 558             assertTrue(sel.keys().contains(key2));
 559             assertTrue(sel.selectedKeys().contains(key1));
 560             assertTrue(sel.selectedKeys().contains(key2));
 561 
 562             // cancel key1
 563             key1.cancel();
 564             int n = sel.selectNow(k -> assertTrue(k == key2));
 565             assertTrue(n == 1);
 566             assertFalse(sel.keys().contains(key1));
 567             assertTrue(sel.keys().contains(key2));
 568             assertFalse(sel.selectedKeys().contains(key1));
 569             assertTrue(sel.selectedKeys().contains(key2));
 570 
 571             // cancel key2
 572             key2.cancel();
 573             n = sel.selectNow(k -> assertTrue(false));
 574             assertTrue(n == 0);
 575             assertFalse(sel.keys().contains(key1));
 576             assertFalse(sel.keys().contains(key2));
 577             assertFalse(sel.selectedKeys().contains(key1));
 578             assertFalse(sel.selectedKeys().contains(key2));
 579         } finally {
 580             closePipe(p);
 581         }
 582     }
 583 
 584     /**
 585      * Test an action invoking select()
 586      */
 587     public void testReentrantSelect1() throws Exception {
 588         Pipe p = Pipe.open();
 589         try (Selector sel = Selector.open()) {
 590             Pipe.SinkChannel sink = p.sink();
 591             Pipe.SourceChannel source = p.source();
 592             source.configureBlocking(false);
 593             source.register(sel, SelectionKey.OP_READ);
 594 
 595             // write to sink to ensure that the source is readable
 596             scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS);
 597 
 598             int n = sel.select(k -> {
 599                 try {
 600                     sel.select();
 601                     assertTrue(false);
 602                 } catch (IOException ioe) {
 603                     throw new RuntimeException(ioe);
 604                 } catch (IllegalStateException expected) {
 605                 }
 606             });
 607             assertTrue(n == 1);
 608         } finally {
 609             closePipe(p);
 610         }
 611     }
 612 
 613     /**
 614      * Test an action invoking selectNow()
 615      */
 616     public void testReentrantSelect2() throws Exception {
 617         Pipe p = Pipe.open();
 618         try (Selector sel = Selector.open()) {
 619             Pipe.SinkChannel sink = p.sink();
 620             Pipe.SourceChannel source = p.source();
 621 
 622             // write to sink to ensure that the source is readable
 623             scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS);
 624 
 625             source.configureBlocking(false);
 626             source.register(sel, SelectionKey.OP_READ);
 627             int n = sel.select(k -> {
 628                 try {
 629                     sel.selectNow();
 630                     assertTrue(false);
 631                 } catch (IOException ioe) {
 632                     throw new RuntimeException(ioe);
 633                 } catch (IllegalStateException expected) {
 634                 }
 635             });
 636             assertTrue(n == 1);
 637         } finally {
 638             closePipe(p);
 639         }
 640     }
 641 
 642     /**
 643      * Test an action invoking select(Consumer)
 644      */
 645     public void testReentrantSelect3() throws Exception {
 646         Pipe p = Pipe.open();
 647         try (Selector sel = Selector.open()) {
 648             Pipe.SinkChannel sink = p.sink();
 649             Pipe.SourceChannel source = p.source();
 650 
 651             // write to sink to ensure that the source is readable
 652             scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS);
 653 
 654             source.configureBlocking(false);
 655             source.register(sel, SelectionKey.OP_READ);
 656             int n = sel.select(k -> {
 657                 try {
 658                     sel.select(x -> assertTrue(false));
 659                     assertTrue(false);
 660                 } catch (IOException ioe) {
 661                     throw new RuntimeException(ioe);
 662                 } catch (IllegalStateException expected) {
 663                 }
 664             });
 665             assertTrue(n == 1);
 666         } finally {
 667             closePipe(p);
 668         }
 669     }
 670 
 671     /**
 672      * Negative timeout
 673      */
 674     @Test(expectedExceptions = IllegalArgumentException.class)
 675     public void testNegativeTimeout() throws Exception {
 676         try (Selector sel = Selector.open()) {
 677             sel.select(k -> { }, -1L);
 678         }
 679     }
 680 
 681     /**
 682      * Null action
 683      */
 684     @Test(expectedExceptions = NullPointerException.class)
 685     public void testNull1() throws Exception {
 686         try (Selector sel = Selector.open()) {
 687             sel.select(null);
 688         }
 689     }
 690     @Test(expectedExceptions = NullPointerException.class)
 691     public void testNull2() throws Exception {
 692         try (Selector sel = Selector.open()) {
 693             sel.select(null, 1000);
 694         }
 695     }
 696     @Test(expectedExceptions = NullPointerException.class)
 697     public void testNull3() throws Exception {
 698         try (Selector sel = Selector.open()) {
 699             sel.selectNow(null);
 700         }
 701     }
 702 
 703 
 704     // -- support methods ---
 705 
 706     private final ScheduledExecutorService POOL = Executors.newScheduledThreadPool(1);
 707 
 708     @AfterTest
 709     void shutdownThreadPool() {
 710         POOL.shutdown();
 711     }
 712 
 713     void scheduleWakeup(Selector sel, long delay, TimeUnit unit) {
 714         POOL.schedule(() -> sel.wakeup(), delay, unit);
 715     }
 716 
 717     void scheduleInterrupt(Thread t, long delay, TimeUnit unit) {
 718         POOL.schedule(() -> t.interrupt(), delay, unit);
 719     }
 720 
 721     void scheduleClose(Closeable c, long delay, TimeUnit unit) {
 722         POOL.schedule(() -> {
 723             try {
 724                 c.close();
 725             } catch (IOException ioe) {
 726                 ioe.printStackTrace();
 727             }
 728         }, delay, unit);
 729     }
 730 
 731     void scheduleWrite(WritableByteChannel sink, ByteBuffer buf, long delay, TimeUnit unit) {
 732         POOL.schedule(() -> {
 733             try {
 734                 sink.write(buf);
 735             } catch (IOException ioe) {
 736                 ioe.printStackTrace();
 737             }
 738         }, delay, unit);
 739     }
 740 
 741     static void closePipe(Pipe p) {
 742         try { p.sink().close(); } catch (IOException ignore) { }
 743         try { p.source().close(); } catch (IOException ignore) { }
 744     }
 745 
 746     static ByteBuffer messageBuffer() {
 747         try {
 748             return ByteBuffer.wrap("message".getBytes("UTF-8"));
 749         } catch (Exception e) {
 750             throw new RuntimeException(e);
 751         }
 752     }
 753 }
--- EOF ---