1 /*
   2  * Copyright (c) 2002, 2019, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /* @test
  25  * @bug 4460583 4470470 4840199 6419424 6710579 6596323 6824135 6395224 7142919
  26  *      8151582 8068693 8153209
  27  * @run main/othervm AsyncCloseAndInterrupt
  28  * @key intermittent
  29  * @summary Comprehensive test of asynchronous closing and interruption
  30  * @author Mark Reinhold
  31  */
  32 
  33 import java.io.*;
  34 import java.net.*;
  35 import java.nio.channels.*;
  36 import java.nio.ByteBuffer;
  37 import java.util.ArrayList;
  38 import java.util.List;
  39 import java.util.concurrent.ExecutorService;
  40 import java.util.concurrent.Executors;
  41 import java.util.concurrent.ThreadFactory;
  42 import java.util.concurrent.Callable;
  43 import java.util.concurrent.Future;
  44 import java.util.concurrent.TimeUnit;
  45 
  46 public class AsyncCloseAndInterrupt {
  47 
  48     static PrintStream log = System.err;
  49 
  50     static void sleep(int ms) {
  51         try {
  52             Thread.sleep(ms);
  53         } catch (InterruptedException x) { }
  54     }
  55 
    // Wildcard address localized to this machine -- Windows doesn't allow
    // connecting to a server socket that was previously bound to a true
    // wildcard, namely new InetSocketAddress((InetAddress)null, 0).
    //
  60     private static InetSocketAddress wildcardAddress;
  61 
  62 
  63     // Server socket that blindly accepts all connections
  64 
  65     static ServerSocketChannel acceptor;
  66 
  67     private static void initAcceptor() throws IOException {
  68         acceptor = ServerSocketChannel.open();
  69         acceptor.socket().bind(wildcardAddress);
  70 
  71         Thread th = new Thread("Acceptor") {
  72                 public void run() {
  73                     try {
  74                         for (;;) {
  75                             SocketChannel sc = acceptor.accept();
  76                         }
  77                     } catch (IOException x) {
  78                         x.printStackTrace();
  79                     }
  80                 }
  81             };
  82 
  83         th.setDaemon(true);
  84         th.start();
  85     }
  86 
  87 
  88     // Server socket that refuses all connections
  89 
  90     static ServerSocketChannel refuser;
  91 
  92     private static void initRefuser() throws IOException {
  93         refuser = ServerSocketChannel.open();
  94         refuser.bind(wildcardAddress, 1);      // use minimum backlog
  95     }
  96 
  97     // Dead pipe source and sink
  98 
  99     static Pipe.SourceChannel deadSource;
 100     static Pipe.SinkChannel deadSink;
 101 
 102     private static void initPipes() throws IOException {
 103         if (deadSource != null)
 104             deadSource.close();
 105         deadSource = Pipe.open().source();
 106         if (deadSink != null)
 107             deadSink.close();
 108         deadSink = Pipe.open().sink();
 109     }
 110 
 111 
 112     // Files
 113 
 114     private static File fifoFile = null; // File that blocks on reads and writes
 115     private static File diskFile = null; // Disk file
 116 
 117     private static void initFile() throws Exception {
 118 
 119         diskFile = File.createTempFile("aci", ".tmp");
 120         diskFile.deleteOnExit();
 121         FileChannel fc = new FileOutputStream(diskFile).getChannel();
 122         buffer.clear();
 123         if (fc.write(buffer) != buffer.capacity())
 124             throw new RuntimeException("Cannot create disk file");
 125         fc.close();
 126 
 127         if (TestUtil.onWindows()) {
 128             log.println("WARNING: Cannot completely test FileChannels on Windows");
 129             return;
 130         }
 131         fifoFile = new File("x.fifo");
 132         if (fifoFile.exists()) {
 133             if (!fifoFile.delete())
 134                 throw new IOException("Cannot delete existing fifo " + fifoFile);
 135         }
 136         Process p = Runtime.getRuntime().exec("mkfifo " + fifoFile);
 137         if (p.waitFor() != 0)
 138             throw new IOException("Error creating fifo");
 139         new RandomAccessFile(fifoFile, "rw").close();
 140 
 141     }
 142 
 143 
 144     // Channel factories
 145 
 146     static abstract class ChannelFactory {
 147         private final String name;
 148         ChannelFactory(String name) {
 149             this.name = name;
 150         }
 151         public String toString() {
 152             return name;
 153         }
 154         abstract InterruptibleChannel create() throws IOException;
 155     }
 156 
 157     static ChannelFactory socketChannelFactory
 158         = new ChannelFactory("SocketChannel") {
 159                 InterruptibleChannel create() throws IOException {
 160                     return SocketChannel.open();
 161                 }
 162             };
 163 
 164     static ChannelFactory connectedSocketChannelFactory
 165         = new ChannelFactory("SocketChannel") {
 166                 InterruptibleChannel create() throws IOException {
 167                     SocketAddress sa = acceptor.socket().getLocalSocketAddress();
 168                     return SocketChannel.open(sa);
 169                 }
 170             };
 171 
 172     static ChannelFactory serverSocketChannelFactory
 173         = new ChannelFactory("ServerSocketChannel") {
 174                 InterruptibleChannel create() throws IOException {
 175                     ServerSocketChannel ssc = ServerSocketChannel.open();
 176                     ssc.socket().bind(wildcardAddress);
 177                     return ssc;
 178                 }
 179             };
 180 
 181     static ChannelFactory datagramChannelFactory
 182         = new ChannelFactory("DatagramChannel") {
 183                 InterruptibleChannel create() throws IOException {
 184                     DatagramChannel dc = DatagramChannel.open();
 185                     InetAddress lb = InetAddress.getLoopbackAddress();
 186                     dc.bind(new InetSocketAddress(lb, 0));
 187                     dc.connect(new InetSocketAddress(lb, 80));
 188                     return dc;
 189                 }
 190             };
 191 
 192     static ChannelFactory pipeSourceChannelFactory
 193         = new ChannelFactory("Pipe.SourceChannel") {
 194                 InterruptibleChannel create() throws IOException {
 195                     // ## arrange to close sink
 196                     return Pipe.open().source();
 197                 }
 198             };
 199 
 200     static ChannelFactory pipeSinkChannelFactory
 201         = new ChannelFactory("Pipe.SinkChannel") {
 202                 InterruptibleChannel create() throws IOException {
 203                     // ## arrange to close source
 204                     return Pipe.open().sink();
 205                 }
 206             };
 207 
 208     static ChannelFactory fifoFileChannelFactory
 209         = new ChannelFactory("FileChannel") {
 210                 InterruptibleChannel create() throws IOException {
 211                     return new RandomAccessFile(fifoFile, "rw").getChannel();
 212                 }
 213             };
 214 
 215     static ChannelFactory diskFileChannelFactory
 216         = new ChannelFactory("FileChannel") {
 217                 InterruptibleChannel create() throws IOException {
 218                     return new RandomAccessFile(diskFile, "rw").getChannel();
 219                 }
 220             };
 221 
 222 
 223     // I/O operations
 224 
 225     static abstract class Op {
 226         private final String name;
 227         protected Op(String name) {
 228             this.name = name;
 229         }
 230         abstract void doIO(InterruptibleChannel ich) throws IOException;
 231         void setup() throws IOException { }
 232         public String toString() { return name; }
 233     }
 234 
 235     static ByteBuffer buffer = ByteBuffer.allocateDirect(1 << 20);
 236 
 237     static ByteBuffer[] buffers = new ByteBuffer[] {
 238         ByteBuffer.allocateDirect(1 << 19),
 239         ByteBuffer.allocateDirect(1 << 19)
 240     };
 241 
 242     static void clearBuffers() {
 243         buffers[0].clear();
 244         buffers[1].clear();
 245     }
 246 
 247     static void show(Channel ch) {
 248         log.print("Channel " + (ch.isOpen() ? "open" : "closed"));
 249         if (ch.isOpen() && (ch instanceof SocketChannel)) {
 250             SocketChannel sc = (SocketChannel)ch;
 251             if (sc.socket().isInputShutdown())
 252                 log.print(", input shutdown");
 253             if (sc.socket().isOutputShutdown())
 254                 log.print(", output shutdown");
 255         }
 256         log.println();
 257     }
 258 
 259     static final Op READ = new Op("read") {
 260             void doIO(InterruptibleChannel ich) throws IOException {
 261                 ReadableByteChannel rbc = (ReadableByteChannel)ich;
 262                 buffer.clear();
 263                 int n = rbc.read(buffer);
 264                 log.println("Read returned " + n);
 265                 show(rbc);
 266                 if     (rbc.isOpen()
 267                         && (n == -1)
 268                         && (rbc instanceof SocketChannel)
 269                         && ((SocketChannel)rbc).socket().isInputShutdown()) {
 270                     return;
 271                 }
 272                 throw new RuntimeException("Read succeeded");
 273             }
 274         };
 275 
 276     static final Op READV = new Op("readv") {
 277             void doIO(InterruptibleChannel ich) throws IOException {
 278                 ScatteringByteChannel sbc = (ScatteringByteChannel)ich;
 279                 clearBuffers();
 280                 int n = (int)sbc.read(buffers);
 281                 log.println("Read returned " + n);
 282                 show(sbc);
 283                 if     (sbc.isOpen()
 284                         && (n == -1)
 285                         && (sbc instanceof SocketChannel)
 286                         && ((SocketChannel)sbc).socket().isInputShutdown()) {
 287                     return;
 288                 }
 289                 throw new RuntimeException("Read succeeded");
 290             }
 291         };
 292 
 293     static final Op RECEIVE = new Op("receive") {
 294             void doIO(InterruptibleChannel ich) throws IOException {
 295                 DatagramChannel dc = (DatagramChannel)ich;
 296                 buffer.clear();
 297                 dc.receive(buffer);
 298                 show(dc);
 299                 throw new RuntimeException("Read succeeded");
 300             }
 301         };
 302 
 303     static final Op WRITE = new Op("write") {
 304             void doIO(InterruptibleChannel ich) throws IOException {
 305 
 306                 WritableByteChannel wbc = (WritableByteChannel)ich;
 307 
 308                 SocketChannel sc = null;
 309                 if (wbc instanceof SocketChannel)
 310                     sc = (SocketChannel)wbc;
 311 
 312                 int n = 0;
 313                 for (;;) {
 314                     buffer.clear();
 315                     int d = wbc.write(buffer);
 316                     n += d;
 317                     if (!wbc.isOpen())
 318                         break;
 319                     if ((sc != null) && sc.socket().isOutputShutdown())
 320                         break;
 321                 }
 322                 log.println("Wrote " + n + " bytes");
 323                 show(wbc);
 324             }
 325         };
 326 
 327     static final Op WRITEV = new Op("writev") {
 328             void doIO(InterruptibleChannel ich) throws IOException {
 329 
 330                 GatheringByteChannel gbc = (GatheringByteChannel)ich;
 331 
 332                 SocketChannel sc = null;
 333                 if (gbc instanceof SocketChannel)
 334                     sc = (SocketChannel)gbc;
 335 
 336                 int n = 0;
 337                 for (;;) {
 338                     clearBuffers();
 339                     int d = (int)gbc.write(buffers);
 340                     n += d;
 341                     if (!gbc.isOpen())
 342                         break;
 343                     if ((sc != null) && sc.socket().isOutputShutdown())
 344                         break;
 345                 }
 346                 log.println("Wrote " + n + " bytes");
 347                 show(gbc);
 348 
 349             }
 350         };
 351 
 352     static final Op CONNECT = new Op("connect") {
 353             void setup() {
 354                 waitPump("connect waiting for pumping refuser ...");
 355             }
 356             void doIO(InterruptibleChannel ich) throws IOException {
 357                 SocketChannel sc = (SocketChannel)ich;
 358                 if (sc.connect(refuser.socket().getLocalSocketAddress()))
 359                     throw new RuntimeException("Connection succeeded");
 360                 throw new RuntimeException("Connection did not block");
 361             }
 362         };
 363 
 364     static final Op FINISH_CONNECT = new Op("finishConnect") {
 365             void setup() {
 366                 waitPump("finishConnect waiting for pumping refuser ...");
 367             }
 368             void doIO(InterruptibleChannel ich) throws IOException {
 369                 SocketChannel sc = (SocketChannel)ich;
 370                 sc.configureBlocking(false);
 371                 SocketAddress sa = refuser.socket().getLocalSocketAddress();
 372                 if (sc.connect(sa))
 373                     throw new RuntimeException("Connection succeeded");
 374                 sc.configureBlocking(true);
 375                 if (sc.finishConnect())
 376                     throw new RuntimeException("Connection succeeded");
 377                 throw new RuntimeException("Connection did not block");
 378             }
 379         };
 380 
 381     static final Op ACCEPT = new Op("accept") {
 382             void doIO(InterruptibleChannel ich) throws IOException {
 383                 ServerSocketChannel ssc = (ServerSocketChannel)ich;
 384                 ssc.accept();
 385                 throw new RuntimeException("Accept succeeded");
 386             }
 387         };
 388 
 389     // Use only with diskFileChannelFactory
 390     static final Op TRANSFER_TO = new Op("transferTo") {
 391             void doIO(InterruptibleChannel ich) throws IOException {
 392                 FileChannel fc = (FileChannel)ich;
 393                 long n = fc.transferTo(0, fc.size(), deadSink);
 394                 log.println("Transferred " + n + " bytes");
 395                 show(fc);
 396             }
 397         };
 398 
 399     // Use only with diskFileChannelFactory
 400     static final Op TRANSFER_FROM = new Op("transferFrom") {
 401             void doIO(InterruptibleChannel ich) throws IOException {
 402                 FileChannel fc = (FileChannel)ich;
 403                 long n = fc.transferFrom(deadSource, 0, 1 << 20);
 404                 log.println("Transferred " + n + " bytes");
 405                 show(fc);
 406             }
 407         };
 408 
 409 
 410 
 411     // Test modes
 412 
 413     static final int TEST_PREINTR = 0;  // Interrupt thread before I/O
 414     static final int TEST_INTR = 1;     // Interrupt thread during I/O
 415     static final int TEST_CLOSE = 2;    // Close channel during I/O
 416     static final int TEST_SHUTI = 3;    // Shutdown input during I/O
 417     static final int TEST_SHUTO = 4;    // Shutdown output during I/O
 418 
 419     static final String[] testName = new String[] {
 420         "pre-interrupt", "interrupt", "close",
 421         "shutdown-input", "shutdown-output"
 422     };
 423 
 424 
 425     static class Tester extends TestThread {
 426 
 427         private InterruptibleChannel ch;
 428         private Op op;
 429         private int test;
 430         volatile boolean ready = false;
 431 
 432         protected Tester(ChannelFactory cf, InterruptibleChannel ch,
 433                          Op op, int test)
 434         {
 435             super(cf + "/" + op + "/" + testName[test]);
 436             this.ch = ch;
 437             this.op = op;
 438             this.test = test;
 439         }
 440 
 441         @SuppressWarnings("fallthrough")
 442         private void caught(Channel ch, IOException x) {
 443             String xn = x.getClass().getName();
 444             switch (test) {
 445 
 446             case TEST_PREINTR:
 447             case TEST_INTR:
 448                 if (!xn.equals("java.nio.channels.ClosedByInterruptException"))
 449                     throw new RuntimeException("Wrong exception thrown: " + x);
 450                 break;
 451 
 452             case TEST_CLOSE:
 453             case TEST_SHUTO:
 454                 if (!xn.equals("java.nio.channels.AsynchronousCloseException"))
 455                     throw new RuntimeException("Wrong exception thrown: " + x);
 456                 break;
 457 
 458             case TEST_SHUTI:
 459                 if (TestUtil.onWindows())
 460                     break;
 461                 // FALL THROUGH
 462 
 463             default:
 464                 throw new Error(x);
 465             }
 466 
 467             if (ch.isOpen()) {
 468                 if (test == TEST_SHUTO) {
 469                     SocketChannel sc = (SocketChannel)ch;
 470                     if (!sc.socket().isOutputShutdown())
 471                         throw new RuntimeException("Output not shutdown");
 472                 } else if ((test == TEST_INTR) && (op == TRANSFER_FROM)) {
 473                     // Let this case pass -- CBIE applies to other channel
 474                 } else {
 475                     throw new RuntimeException("Channel still open");
 476                 }
 477             }
 478 
 479             log.println("Thrown as expected: " + x);
 480         }
 481 
 482         final void go() throws Exception {
 483             if (test == TEST_PREINTR)
 484                 Thread.currentThread().interrupt();
 485             ready = true;
 486             try {
 487                 op.doIO(ch);
 488             } catch (ClosedByInterruptException x) {
 489                 caught(ch, x);
 490             } catch (AsynchronousCloseException x) {
 491                 caught(ch, x);
 492             } finally {
 493                 ch.close();
 494             }
 495         }
 496 
 497     }
 498 
 499     private static volatile boolean pumpDone = false;
 500     private static volatile boolean pumpReady = false;
 501 
 502     private static void waitPump(String msg){
 503         log.println(msg);
 504         while (!pumpReady){
 505             sleep(200);
 506         }
 507         log.println(msg + " done");
 508     }
 509 
 510     // Create a pump thread dedicated to saturate refuser's connection backlog
 511     private static Future<Integer> pumpRefuser(ExecutorService pumperExecutor) {
 512 
 513         Callable<Integer> pumpTask = new Callable<Integer>() {
 514 
 515             @Override
 516             public Integer call() throws IOException {
 517                 // Can't reliably saturate connection backlog on Windows Server editions
 518                 assert !TestUtil.onWindows();
 519                 log.println("Start pumping refuser ...");
 520                 List<SocketChannel> refuserClients = new ArrayList<>();
 521 
 522                 // Saturate the refuser's connection backlog so that further connection
 523                 // attempts will be blocked
 524                 pumpReady = false;
 525                 while (!pumpDone) {
 526                     SocketChannel sc = SocketChannel.open();
 527                     sc.configureBlocking(false);
 528                     boolean connected = sc.connect(refuser.socket().getLocalSocketAddress());
 529 
 530                     // Assume that the connection backlog is saturated if a
 531                     // client cannot connect to the refuser within 50 milliseconds
 532                     long start = System.currentTimeMillis();
 533                     while (!pumpReady && !connected
 534                             && (System.currentTimeMillis() - start < 50)) {
 535                         connected = sc.finishConnect();
 536                     }
 537 
 538                     if (connected) {
 539                         // Retain so that finalizer doesn't close
 540                         refuserClients.add(sc);
 541                     } else {
 542                         sc.close();
 543                         pumpReady = true;
 544                     }
 545                 }
 546 
 547                 for (SocketChannel sc : refuserClients) {
 548                     sc.close();
 549                 }
 550                 refuser.close();
 551 
 552                 log.println("Stop pumping refuser ...");
 553                 return refuserClients.size();
 554             }
 555         };
 556 
 557         return pumperExecutor.submit(pumpTask);
 558     }
 559 
 560     // Test
 561     static void test(ChannelFactory cf, Op op, int test) throws Exception {
 562         test(cf, op, test, true);
 563     }
 564 
 565     static void test(ChannelFactory cf, Op op, int test, boolean extraSleep)
 566         throws Exception
 567     {
 568         log.println();
 569         initPipes();
 570         InterruptibleChannel ch = cf.create();
 571         Tester t = new Tester(cf, ch, op, test);
 572         log.println(t);
 573         op.setup();
 574         t.start();
 575         do {
 576             sleep(50);
 577         } while (!t.ready);
 578 
 579         if (extraSleep) {
 580             sleep(100);
 581         }
 582 
 583         switch (test) {
 584 
 585         case TEST_INTR:
 586             t.interrupt();
 587             break;
 588 
 589         case TEST_CLOSE:
 590             ch.close();
 591             break;
 592 
 593         case TEST_SHUTI:
 594             if (TestUtil.onWindows()) {
 595                 log.println("WARNING: Asynchronous shutdown not working on Windows");
 596                 ch.close();
 597             } else {
 598                 ((SocketChannel)ch).socket().shutdownInput();
 599             }
 600             break;
 601 
 602         case TEST_SHUTO:
 603             if (TestUtil.onWindows()) {
 604                 log.println("WARNING: Asynchronous shutdown not working on Windows");
 605                 ch.close();
 606             } else {
 607                 ((SocketChannel)ch).socket().shutdownOutput();
 608             }
 609             break;
 610 
 611         default:
 612             break;
 613         }
 614 
 615         t.finishAndThrow(10000);
 616     }
 617 
 618     static void test(ChannelFactory cf, Op op) throws Exception {
 619         test(cf, op, true);
 620     }
 621 
 622     static void test(ChannelFactory cf, Op op, boolean extraSleep) throws Exception {
 623         // Test INTR cases before PREINTER cases since sometimes
 624         // interrupted threads can't load classes
 625         test(cf, op, TEST_INTR, extraSleep);
 626         test(cf, op, TEST_PREINTR, extraSleep);
 627 
 628         // Bugs, see FileChannelImpl for details
 629         if (op == TRANSFER_FROM) {
 630             log.println("WARNING: transferFrom/close not tested");
 631             return;
 632         }
 633         if ((op == TRANSFER_TO) && !TestUtil.onWindows()) {
 634             log.println("WARNING: transferTo/close not tested");
 635             return;
 636         }
 637 
 638         test(cf, op, TEST_CLOSE, extraSleep);
 639     }
 640 
 641     static void test(ChannelFactory cf)
 642         throws Exception
 643     {
 644         InterruptibleChannel ch = cf.create(); // Sample channel
 645         ch.close();
 646 
 647         if (ch instanceof ReadableByteChannel) {
 648             test(cf, READ);
 649             if (ch instanceof SocketChannel)
 650                 test(cf, READ, TEST_SHUTI);
 651         }
 652 
 653         if (ch instanceof ScatteringByteChannel) {
 654             test(cf, READV);
 655             if (ch instanceof SocketChannel)
 656                 test(cf, READV, TEST_SHUTI);
 657         }
 658 
 659         if (ch instanceof DatagramChannel) {
 660             test(cf, RECEIVE);
 661 
 662             // Return here: We can't effectively test writes since, if they
 663             // block, they do so only for a fleeting moment unless the network
 664             // interface is overloaded.
 665             return;
 666 
 667         }
 668 
 669         if (ch instanceof WritableByteChannel) {
 670             test(cf, WRITE);
 671             if (ch instanceof SocketChannel)
 672                 test(cf, WRITE, TEST_SHUTO);
 673         }
 674 
 675         if (ch instanceof GatheringByteChannel) {
 676             test(cf, WRITEV);
 677             if (ch instanceof SocketChannel)
 678                 test(cf, WRITEV, TEST_SHUTO);
 679         }
 680 
 681     }
 682 
 683     public static void main(String[] args) throws Exception {
 684 
 685         wildcardAddress = new InetSocketAddress(InetAddress.getLocalHost(), 0);
 686         initAcceptor();
 687         if (!TestUtil.onWindows())
 688             initRefuser();
 689         initPipes();
 690         initFile();
 691 
 692         if (TestUtil.onWindows()) {
 693             log.println("WARNING: Cannot test FileChannel transfer operations"
 694                         + " on Windows");
 695         } else {
 696             test(diskFileChannelFactory, TRANSFER_TO);
 697             test(diskFileChannelFactory, TRANSFER_FROM);
 698         }
 699         if (fifoFile != null)
 700             test(fifoFileChannelFactory);
 701 
 702         // Testing positional file reads and writes is impractical: It requires
 703         // access to a large file soft-mounted via NFS, and even then isn't
 704         // completely guaranteed to work.
 705         //
 706         // Testing map is impractical and arguably unnecessary: It's
 707         // unclear under what conditions mmap(2) will actually block.
 708 
 709         test(connectedSocketChannelFactory);
 710 
 711         if (TestUtil.onWindows() || TestUtil.onSolaris()) {
 712             log.println("WARNING Cannot reliably test connect/finishConnect"
 713                 + " operations on this platform");
 714         } else {
 715             // Only the following tests need refuser's connection backlog
 716             // to be saturated
 717             ExecutorService pumperExecutor =
 718                     Executors.newSingleThreadExecutor(
 719                     new ThreadFactory() {
 720 
 721                         @Override
 722                         public Thread newThread(Runnable r) {
 723                             Thread t = new Thread(r);
 724                             t.setDaemon(true);
 725                             t.setName("Pumper");
 726                             return t;
 727                         }
 728                     });
 729 
 730             pumpDone = false;
 731             try {
 732                 Future<Integer> pumpFuture = pumpRefuser(pumperExecutor);
 733                 waitPump("\nWait for initial Pump");
 734 
 735                 test(socketChannelFactory, CONNECT, false);
 736                 test(socketChannelFactory, FINISH_CONNECT, false);
 737 
 738                 pumpDone = true;
 739                 Integer newConn = pumpFuture.get(30, TimeUnit.SECONDS);
 740                 log.println("Pump " + newConn + " connections.");
 741             } finally {
 742                 pumperExecutor.shutdown();
 743             }
 744         }
 745 
 746         test(serverSocketChannelFactory, ACCEPT);
 747         test(datagramChannelFactory);
 748         test(pipeSourceChannelFactory);
 749         test(pipeSinkChannelFactory);
 750     }
 751 }