1 /*
   2  * Copyright (c) 2008, 2012, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /* @test
  25  * @bug 4607272 6842687
  26  * @summary Unit test for AsynchronousChannelGroup
  27  * @key randomness
  28  */
  29 
  30 import java.nio.channels.*;
  31 import java.net.*;
  32 import java.util.*;
  33 import java.util.concurrent.*;
  34 import java.util.concurrent.atomic.*;
  35 import java.io.IOException;
  36 
  37 /**
  38  * Exercise replacement of threads in the thread pool when completion handlers
  39  * terminate due to errors or runtime exceptions.
  40  */
  41 
  42 public class Restart {
  43     static final Random rand = new Random();
  44 
  45     public static void main(String[] args) throws Exception {
  46         // thread group for thread pools
  47         final ThreadGroup tg = new ThreadGroup("test");
  48 
  49         // keep track of the number of threads that terminate
  50         final AtomicInteger exceptionCount = new AtomicInteger(0);
  51         final Thread.UncaughtExceptionHandler ueh =
  52             new Thread.UncaughtExceptionHandler() {
  53                 public void uncaughtException(Thread t, Throwable e) {
  54                     exceptionCount.incrementAndGet();
  55                 }
  56             };
  57         ThreadFactory factory = new ThreadFactory() {
  58             @Override
  59             public Thread newThread(Runnable r) {
  60                 Thread t = new Thread(tg, r);
  61                 t.setUncaughtExceptionHandler(ueh);
  62                 return t;
  63             }
  64         };
  65 
  66         // group with fixed thread pool
  67         int nThreads = 1 + rand.nextInt(4);
  68         AsynchronousChannelGroup group =
  69             AsynchronousChannelGroup.withFixedThreadPool(nThreads, factory);
  70         testRestart(group, 100);
  71         group.shutdown();
  72 
  73         // group with cached thread pool
  74         ExecutorService pool = Executors.newCachedThreadPool(factory);
  75         group = AsynchronousChannelGroup.withCachedThreadPool(pool, rand.nextInt(5));
  76         testRestart(group, 100);
  77         group.shutdown();
  78 
  79         // group with custom thread pool
  80         group = AsynchronousChannelGroup
  81                 .withThreadPool(Executors.newFixedThreadPool(1+rand.nextInt(5), factory));
  82         testRestart(group, 100);
  83         group.shutdown();
  84 
  85         // give time for threads to terminate
  86         Thread.sleep(3000);
  87         int actual = exceptionCount.get();
  88         if (actual != 300)
  89             throw new RuntimeException(actual + " exceptions, expected: " + 300);
  90     }
  91 
  92     static void testRestart(AsynchronousChannelGroup group, int count)
  93         throws Exception
  94     {
  95         AsynchronousServerSocketChannel listener =
  96             AsynchronousServerSocketChannel.open(group)
  97                 .bind(new InetSocketAddress(0));
  98 
  99         for (int i=0; i<count; i++) {
 100             final CountDownLatch latch = new CountDownLatch(1);
 101 
 102             listener.accept((Void)null, new CompletionHandler<AsynchronousSocketChannel,Void>() {
 103                 public void completed(AsynchronousSocketChannel ch, Void att) {
 104                     try {
 105                         ch.close();
 106                     } catch (IOException ignore) { }
 107 
 108                     latch.countDown();
 109 
 110                     // throw error or runtime exception
 111                     if (rand.nextBoolean()) {
 112                         throw new Error();
 113                     } else {
 114                         throw new RuntimeException();
 115                     }
 116                 }
 117                 public void failed(Throwable exc, Void att) {
 118                 }
 119             });
 120 
 121             // establish loopback connection which should cause completion
 122             // handler to be invoked.
 123             int port = ((InetSocketAddress)(listener.getLocalAddress())).getPort();
 124             AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 125             InetAddress lh = InetAddress.getLocalHost();
 126             ch.connect(new InetSocketAddress(lh, port)).get();
 127             ch.close();
 128 
 129             // wait for handler to be invoked
 130             latch.await();
 131         }
 132 
 133         // clean-up
 134         listener.close();
 135     }
 136 }