1 /* 2 * Copyright (c) 2007, 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* 25 * @test 26 * @bug 6450200 27 * @summary Test proper handling of pool state changes 28 * @library /test/lib 29 * @run main/othervm ConfigChanges 30 * @key randomness 31 * @author Martin Buchholz 32 */ 33 34 import static java.util.concurrent.TimeUnit.MILLISECONDS; 35 import static java.util.concurrent.TimeUnit.MINUTES; 36 import static java.util.concurrent.TimeUnit.NANOSECONDS; 37 38 import java.security.Permission; 39 import java.util.Random; 40 import java.util.concurrent.ArrayBlockingQueue; 41 import java.util.concurrent.CyclicBarrier; 42 import java.util.concurrent.ExecutorService; 43 import java.util.concurrent.RejectedExecutionException; 44 import java.util.concurrent.ThreadFactory; 45 import java.util.concurrent.ThreadPoolExecutor; 46 import java.util.function.Supplier; 47 import jdk.test.lib.RandomFactory; 48 49 public class ConfigChanges { 50 static final ThreadGroup tg = new ThreadGroup("pool"); 51 52 static final Random rnd = RandomFactory.getRandom(); 53 54 static void report(ThreadPoolExecutor tpe) { 55 try { 56 System.out.printf( 57 "active=%d submitted=%d completed=%d queued=%d sizes=%d/%d/%d%n", 58 tg.activeCount(), 59 tpe.getTaskCount(), 60 tpe.getCompletedTaskCount(), 61 tpe.getQueue().size(), 62 tpe.getPoolSize(), 63 tpe.getCorePoolSize(), 64 tpe.getMaximumPoolSize()); 65 } catch (Throwable t) { unexpected(t); } 66 } 67 68 static void report(String label, ThreadPoolExecutor tpe) { 69 System.out.printf("%10s ", label); 70 report(tpe); 71 } 72 73 static class PermissiveSecurityManger extends SecurityManager { 74 public void checkPermission(Permission p) { /* bien sur, Monsieur */ } 75 } 76 77 static void checkShutdown(final ExecutorService es) { 78 final Runnable nop = new Runnable() {public void run() {}}; 79 try { 80 if (new Random().nextBoolean()) { 81 check(es.isShutdown()); 82 if (es instanceof ThreadPoolExecutor) 83 check(((ThreadPoolExecutor) es).isTerminating() 84 || es.isTerminated()); 85 THROWS(RejectedExecutionException.class, 86 () -> es.execute(nop)); 87 } 88 } catch (Throwable t) { unexpected(t); } 89 } 90 91 static void checkTerminated(final ThreadPoolExecutor tpe) { 92 try { 93 checkShutdown(tpe); 94 check(tpe.getQueue().isEmpty()); 95 check(tpe.isTerminated()); 96 check(! tpe.isTerminating()); 97 equal(0, tpe.getActiveCount()); 98 equal(0, tpe.getPoolSize()); 99 equal(tpe.getTaskCount(), tpe.getCompletedTaskCount()); 100 check(tpe.awaitTermination(0L, MINUTES)); 101 } catch (Throwable t) { unexpected(t); } 102 } 103 104 static Runnable waiter(final CyclicBarrier barrier) { 105 return new Runnable() { public void run() { 106 try { barrier.await(); barrier.await(); } 107 catch (Throwable t) { unexpected(t); }}}; 108 } 109 110 static volatile Runnable runnableDuJour; 111 112 static void awaitIdleness(ThreadPoolExecutor tpe, long taskCount) { 113 restart: for (;;) { 114 // check twice to make chance of race vanishingly small 115 for (int i = 0; i < 2; i++) { 116 if (tpe.getQueue().size() != 0 || 117 tpe.getActiveCount() != 0 || 118 tpe.getCompletedTaskCount() != taskCount) { 119 Thread.yield(); 120 continue restart; 121 } 122 } 123 return; 124 } 125 } 126 127 /** 128 * Waits for condition to become true, first spin-polling, then sleep-polling. 129 */ 130 static void spinAwait(Supplier<Boolean> waitingForGodot) { 131 for (int spins = 0; !waitingForGodot.get(); ) { 132 if ((spins = (spins + 1) & 3) > 0) { 133 Thread.yield(); 134 } else { 135 try { Thread.sleep(4); } 136 catch (InterruptedException unexpected) { 137 throw new AssertionError(unexpected); 138 } 139 } 140 } 141 } 142 143 private static void realMain(String[] args) throws Throwable { 144 if (rnd.nextBoolean()) 145 System.setSecurityManager(new PermissiveSecurityManger()); 146 147 final boolean prestart = rnd.nextBoolean(); 148 149 final Thread.UncaughtExceptionHandler handler 150 = new Thread.UncaughtExceptionHandler() { 151 public void uncaughtException(Thread t, Throwable e) { 152 check(! Thread.currentThread().isInterrupted()); 153 unexpected(e); 154 }}; 155 156 final int n = 3; 157 final ThreadPoolExecutor tpe 158 = new ThreadPoolExecutor(n, 3*n, 159 3L, MINUTES, 160 new ArrayBlockingQueue<Runnable>(3*n)); 161 tpe.setThreadFactory(new ThreadFactory() { 162 public Thread newThread(Runnable r) { 163 Thread t = new Thread(tg, r); 164 t.setUncaughtExceptionHandler(handler); 165 return t; 166 }}); 167 168 if (prestart) { 169 tpe.prestartAllCoreThreads(); 170 equal(n, tg.activeCount()); 171 equal(n, tpe.getCorePoolSize()); 172 equal(n, tpe.getLargestPoolSize()); 173 } 174 175 final Runnable runRunnableDuJour = 176 new Runnable() { public void run() { 177 // Delay choice of action till last possible moment. 178 runnableDuJour.run(); }}; 179 final CyclicBarrier pumpedUp = new CyclicBarrier(3*n + 1); 180 runnableDuJour = waiter(pumpedUp); 181 182 if (prestart) { 183 for (int i = 0; i < 1*n; i++) 184 tpe.execute(runRunnableDuJour); 185 // Wait for prestarted threads to dequeue their initial tasks. 186 while (! tpe.getQueue().isEmpty()) 187 Thread.sleep(1); 188 for (int i = 0; i < 5*n; i++) 189 tpe.execute(runRunnableDuJour); 190 } else { 191 for (int i = 0; i < 6*n; i++) 192 tpe.execute(runRunnableDuJour); 193 } 194 195 //report("submitted", tpe); 196 pumpedUp.await(); 197 equal(3*n, tg.activeCount()); 198 equal(3*n, tpe.getMaximumPoolSize()); 199 equal(3*n, tpe.getLargestPoolSize()); 200 equal(n, tpe.getCorePoolSize()); 201 equal(3*n, tpe.getActiveCount()); 202 equal(6L*n, tpe.getTaskCount()); 203 equal(0L, tpe.getCompletedTaskCount()); 204 205 //report("pumped up", tpe); 206 tpe.setMaximumPoolSize(4*n); 207 equal(4*n, tpe.getMaximumPoolSize()); 208 //report("pumped up2", tpe); 209 final CyclicBarrier pumpedUp2 = new CyclicBarrier(n + 1); 210 runnableDuJour = waiter(pumpedUp2); 211 for (int i = 0; i < 1*n; i++) 212 tpe.execute(runRunnableDuJour); 213 pumpedUp2.await(); 214 equal(4*n, tg.activeCount()); 215 equal(4*n, tpe.getMaximumPoolSize()); 216 equal(4*n, tpe.getLargestPoolSize()); 217 equal(4*n, tpe.getActiveCount()); 218 equal(7L*n, tpe.getTaskCount()); 219 equal(0L, tpe.getCompletedTaskCount()); 220 //report("pumped up2", tpe); 221 runnableDuJour = new Runnable() { public void run() {}}; 222 223 tpe.setMaximumPoolSize(2*n); 224 //report("after setMaximumPoolSize", tpe); 225 226 pumpedUp2.await(); 227 pumpedUp.await(); 228 229 spinAwait(() -> tg.activeCount() == 2*n); 230 equal(2*n, tpe.getMaximumPoolSize()); 231 equal(4*n, tpe.getLargestPoolSize()); 232 233 //report("draining", tpe); 234 awaitIdleness(tpe, 7L*n); 235 236 equal(2*n, tg.activeCount()); 237 equal(2*n, tpe.getMaximumPoolSize()); 238 equal(4*n, tpe.getLargestPoolSize()); 239 240 equal(7L*n, tpe.getTaskCount()); 241 equal(7L*n, tpe.getCompletedTaskCount()); 242 equal(0, tpe.getActiveCount()); 243 244 equal(3L, tpe.getKeepAliveTime(MINUTES)); 245 long t0 = System.nanoTime(); 246 tpe.setKeepAliveTime(7L, MILLISECONDS); 247 equal(7L, tpe.getKeepAliveTime(MILLISECONDS)); 248 spinAwait(() -> tg.activeCount() == n); 249 check(System.nanoTime() - t0 >= tpe.getKeepAliveTime(NANOSECONDS)); 250 251 //report("idle", tpe); 252 check(! tpe.allowsCoreThreadTimeOut()); 253 t0 = System.nanoTime(); 254 tpe.allowCoreThreadTimeOut(true); 255 check(tpe.allowsCoreThreadTimeOut()); 256 spinAwait(() -> tg.activeCount() == 0); 257 258 // The following assertion is almost always true, but may 259 // exceptionally not be during a transition from core count 260 // too high to allowCoreThreadTimeOut. Users will never 261 // notice, and we accept the small loss of testability. 262 // 263 // check(System.nanoTime() - t0 >= tpe.getKeepAliveTime(NANOSECONDS)); 264 265 //report("idle", tpe); 266 267 tpe.shutdown(); 268 checkShutdown(tpe); 269 check(tpe.awaitTermination(3L, MINUTES)); 270 checkTerminated(tpe); 271 } 272 273 //--------------------- Infrastructure --------------------------- 274 static volatile int passed = 0, failed = 0; 275 static void pass() {passed++;} 276 static void fail() {failed++; Thread.dumpStack();} 277 static void fail(String msg) {System.out.println(msg); fail();} 278 static void unexpected(Throwable t) {failed++; t.printStackTrace();} 279 static void check(boolean cond) {if (cond) pass(); else fail();} 280 static void equal(Object x, Object y) { 281 if (x == null ? y == null : x.equals(y)) pass(); 282 else fail(x + " not equal to " + y);} 283 public static void main(String[] args) throws Throwable { 284 try {realMain(args);} catch (Throwable t) {unexpected(t);} 285 System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed); 286 if (failed > 0) throw new AssertionError("Some tests failed");} 287 interface Fun {void f() throws Throwable;} 288 static void THROWS(Class<? extends Throwable> k, Fun... fs) { 289 for (Fun f : fs) 290 try { f.f(); fail("Expected " + k.getName() + " not thrown"); } 291 catch (Throwable t) { 292 if (k.isAssignableFrom(t.getClass())) pass(); 293 else unexpected(t);}} 294 }