1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. 7 * 8 * This code is distributed in the hope that it will be useful, but WITHOUT 9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 11 * version 2 for more details (a copy is included in the LICENSE file that 12 * accompanied this code). 13 * 14 * You should have received a copy of the GNU General Public License version 15 * 2 along with this work; if not, write to the Free Software Foundation, 16 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 17 * 18 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 19 * or visit www.oracle.com if you need additional information or have any 20 * questions. 21 */ 22 23 /* 24 * This file is available under and governed by the GNU General Public 25 * License version 2 only, as published by the Free Software Foundation. 26 * However, the following notice accompanied the original version of this 27 * file: 28 * 29 * Written by Doug Lea with assistance from members of JCP JSR-166 30 * Expert Group and released to the public domain, as explained at 31 * http://creativecommons.org/publicdomain/zero/1.0/ 32 * Other contributors include Andrew Wright, Jeffrey Hayes, 33 * Pat Fisher, Mike Judd. 34 */ 35 36 import static java.util.concurrent.TimeUnit.MILLISECONDS; 37 38 import java.util.concurrent.BrokenBarrierException; 39 import java.util.concurrent.CountDownLatch; 40 import java.util.concurrent.CyclicBarrier; 41 import java.util.concurrent.ExecutorService; 42 import java.util.concurrent.Executors; 43 import java.util.concurrent.ThreadLocalRandom; 44 import java.util.concurrent.TimeoutException; 45 import java.util.concurrent.atomic.AtomicBoolean; 46 import java.util.concurrent.atomic.AtomicInteger; 47 48 import junit.framework.Test; 49 import junit.framework.TestSuite; 50 51 public class CyclicBarrierTest extends JSR166TestCase { 52 public static void main(String[] args) { 53 main(suite(), args); 54 } 55 public static Test suite() { 56 return new TestSuite(CyclicBarrierTest.class); 57 } 58 59 /** 60 * Spin-waits till the number of waiters == numberOfWaiters. 61 */ 62 void awaitNumberWaiting(CyclicBarrier barrier, int numberOfWaiters) { 63 long startTime = System.nanoTime(); 64 while (barrier.getNumberWaiting() != numberOfWaiters) { 65 if (millisElapsedSince(startTime) > LONG_DELAY_MS) 66 fail("timed out"); 67 Thread.yield(); 68 } 69 } 70 71 /** 72 * Creating with negative parties throws IllegalArgumentException 73 */ 74 public void testConstructor1() { 75 try { 76 new CyclicBarrier(-1, (Runnable)null); 77 shouldThrow(); 78 } catch (IllegalArgumentException success) {} 79 } 80 81 /** 82 * Creating with negative parties and no action throws 83 * IllegalArgumentException 84 */ 85 public void testConstructor2() { 86 try { 87 new CyclicBarrier(-1); 88 shouldThrow(); 89 } catch (IllegalArgumentException success) {} 90 } 91 92 /** 93 * getParties returns the number of parties given in constructor 94 */ 95 public void testGetParties() { 96 CyclicBarrier b = new CyclicBarrier(2); 97 assertEquals(2, b.getParties()); 98 assertEquals(0, b.getNumberWaiting()); 99 } 100 101 /** 102 * A 1-party barrier triggers after single await 103 */ 104 public void testSingleParty() throws Exception { 105 CyclicBarrier b = new CyclicBarrier(1); 106 assertEquals(1, b.getParties()); 107 assertEquals(0, b.getNumberWaiting()); 108 b.await(); 109 b.await(); 110 assertEquals(0, b.getNumberWaiting()); 111 } 112 113 /** 114 * The supplied barrier action is run at barrier 115 */ 116 public void testBarrierAction() throws Exception { 117 final AtomicInteger count = new AtomicInteger(0); 118 final Runnable incCount = new Runnable() { public void run() { 119 count.getAndIncrement(); }}; 120 CyclicBarrier b = new CyclicBarrier(1, incCount); 121 assertEquals(1, b.getParties()); 122 assertEquals(0, b.getNumberWaiting()); 123 b.await(); 124 b.await(); 125 assertEquals(0, b.getNumberWaiting()); 126 assertEquals(2, count.get()); 127 } 128 129 /** 130 * A 2-party/thread barrier triggers after both threads invoke await 131 */ 132 public void testTwoParties() throws Exception { 133 final CyclicBarrier b = new CyclicBarrier(2); 134 Thread t = newStartedThread(new CheckedRunnable() { 135 public void realRun() throws Exception { 136 b.await(); 137 b.await(); 138 b.await(); 139 b.await(); 140 }}); 141 142 b.await(); 143 b.await(); 144 b.await(); 145 b.await(); 146 awaitTermination(t); 147 } 148 149 /** 150 * An interruption in one party causes others waiting in await to 151 * throw BrokenBarrierException 152 */ 153 public void testAwait1_Interrupted_BrokenBarrier() { 154 final CyclicBarrier c = new CyclicBarrier(3); 155 final CountDownLatch pleaseInterrupt = new CountDownLatch(2); 156 Thread t1 = new ThreadShouldThrow(InterruptedException.class) { 157 public void realRun() throws Exception { 158 pleaseInterrupt.countDown(); 159 c.await(); 160 }}; 161 Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) { 162 public void realRun() throws Exception { 163 pleaseInterrupt.countDown(); 164 c.await(); 165 }}; 166 167 t1.start(); 168 t2.start(); 169 await(pleaseInterrupt); 170 t1.interrupt(); 171 awaitTermination(t1); 172 awaitTermination(t2); 173 } 174 175 /** 176 * An interruption in one party causes others waiting in timed await to 177 * throw BrokenBarrierException 178 */ 179 public void testAwait2_Interrupted_BrokenBarrier() throws Exception { 180 final CyclicBarrier c = new CyclicBarrier(3); 181 final CountDownLatch pleaseInterrupt = new CountDownLatch(2); 182 Thread t1 = new ThreadShouldThrow(InterruptedException.class) { 183 public void realRun() throws Exception { 184 pleaseInterrupt.countDown(); 185 c.await(LONG_DELAY_MS, MILLISECONDS); 186 }}; 187 Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) { 188 public void realRun() throws Exception { 189 pleaseInterrupt.countDown(); 190 c.await(LONG_DELAY_MS, MILLISECONDS); 191 }}; 192 193 t1.start(); 194 t2.start(); 195 await(pleaseInterrupt); 196 t1.interrupt(); 197 awaitTermination(t1); 198 awaitTermination(t2); 199 } 200 201 /** 202 * A timeout in timed await throws TimeoutException 203 */ 204 public void testAwait3_TimeoutException() throws InterruptedException { 205 final CyclicBarrier c = new CyclicBarrier(2); 206 Thread t = newStartedThread(new CheckedRunnable() { 207 public void realRun() throws Exception { 208 long startTime = System.nanoTime(); 209 try { 210 c.await(timeoutMillis(), MILLISECONDS); 211 shouldThrow(); 212 } catch (TimeoutException success) {} 213 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 214 }}); 215 216 awaitTermination(t); 217 } 218 219 /** 220 * A timeout in one party causes others waiting in timed await to 221 * throw BrokenBarrierException 222 */ 223 public void testAwait4_Timeout_BrokenBarrier() throws InterruptedException { 224 final CyclicBarrier c = new CyclicBarrier(3); 225 Thread t1 = newStartedThread(new CheckedRunnable() { 226 public void realRun() throws Exception { 227 try { 228 c.await(LONG_DELAY_MS, MILLISECONDS); 229 shouldThrow(); 230 } catch (BrokenBarrierException success) {} 231 }}); 232 Thread t2 = newStartedThread(new CheckedRunnable() { 233 public void realRun() throws Exception { 234 awaitNumberWaiting(c, 1); 235 long startTime = System.nanoTime(); 236 try { 237 c.await(timeoutMillis(), MILLISECONDS); 238 shouldThrow(); 239 } catch (TimeoutException success) {} 240 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 241 }}); 242 243 awaitTermination(t1); 244 awaitTermination(t2); 245 } 246 247 /** 248 * A timeout in one party causes others waiting in await to 249 * throw BrokenBarrierException 250 */ 251 public void testAwait5_Timeout_BrokenBarrier() throws InterruptedException { 252 final CyclicBarrier c = new CyclicBarrier(3); 253 Thread t1 = newStartedThread(new CheckedRunnable() { 254 public void realRun() throws Exception { 255 try { 256 c.await(); 257 shouldThrow(); 258 } catch (BrokenBarrierException success) {} 259 }}); 260 Thread t2 = newStartedThread(new CheckedRunnable() { 261 public void realRun() throws Exception { 262 awaitNumberWaiting(c, 1); 263 long startTime = System.nanoTime(); 264 try { 265 c.await(timeoutMillis(), MILLISECONDS); 266 shouldThrow(); 267 } catch (TimeoutException success) {} 268 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 269 }}); 270 271 awaitTermination(t1); 272 awaitTermination(t2); 273 } 274 275 /** 276 * A reset of an active barrier causes waiting threads to throw 277 * BrokenBarrierException 278 */ 279 public void testReset_BrokenBarrier() throws InterruptedException { 280 final CyclicBarrier c = new CyclicBarrier(3); 281 final CountDownLatch pleaseReset = new CountDownLatch(2); 282 Thread t1 = new ThreadShouldThrow(BrokenBarrierException.class) { 283 public void realRun() throws Exception { 284 pleaseReset.countDown(); 285 c.await(); 286 }}; 287 Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) { 288 public void realRun() throws Exception { 289 pleaseReset.countDown(); 290 c.await(); 291 }}; 292 293 t1.start(); 294 t2.start(); 295 await(pleaseReset); 296 297 awaitNumberWaiting(c, 2); 298 c.reset(); 299 awaitTermination(t1); 300 awaitTermination(t2); 301 } 302 303 /** 304 * A reset before threads enter barrier does not throw 305 * BrokenBarrierException 306 */ 307 public void testReset_NoBrokenBarrier() throws Exception { 308 final CyclicBarrier c = new CyclicBarrier(3); 309 c.reset(); 310 311 Thread t1 = newStartedThread(new CheckedRunnable() { 312 public void realRun() throws Exception { 313 c.await(); 314 }}); 315 Thread t2 = newStartedThread(new CheckedRunnable() { 316 public void realRun() throws Exception { 317 c.await(); 318 }}); 319 320 c.await(); 321 awaitTermination(t1); 322 awaitTermination(t2); 323 } 324 325 /** 326 * All threads block while a barrier is broken. 327 */ 328 public void testReset_Leakage() throws InterruptedException { 329 final CyclicBarrier c = new CyclicBarrier(2); 330 final AtomicBoolean done = new AtomicBoolean(); 331 Thread t = newStartedThread(new CheckedRunnable() { 332 public void realRun() { 333 while (!done.get()) { 334 try { 335 while (c.isBroken()) 336 c.reset(); 337 338 c.await(); 339 shouldThrow(); 340 } 341 catch (BrokenBarrierException | InterruptedException ok) {} 342 }}}); 343 344 for (int i = 0; i < 4; i++) { 345 delay(timeoutMillis()); 346 t.interrupt(); 347 } 348 done.set(true); 349 t.interrupt(); 350 awaitTermination(t); 351 } 352 353 /** 354 * Reset of a non-broken barrier does not break barrier 355 */ 356 public void testResetWithoutBreakage() throws Exception { 357 final CyclicBarrier barrier = new CyclicBarrier(3); 358 for (int i = 0; i < 3; i++) { 359 final CyclicBarrier start = new CyclicBarrier(3); 360 Thread t1 = newStartedThread(new CheckedRunnable() { 361 public void realRun() throws Exception { 362 start.await(); 363 barrier.await(); 364 }}); 365 366 Thread t2 = newStartedThread(new CheckedRunnable() { 367 public void realRun() throws Exception { 368 start.await(); 369 barrier.await(); 370 }}); 371 372 start.await(); 373 barrier.await(); 374 awaitTermination(t1); 375 awaitTermination(t2); 376 assertFalse(barrier.isBroken()); 377 assertEquals(0, barrier.getNumberWaiting()); 378 if (i == 1) barrier.reset(); 379 assertFalse(barrier.isBroken()); 380 assertEquals(0, barrier.getNumberWaiting()); 381 } 382 } 383 384 /** 385 * Reset of a barrier after interruption reinitializes it. 386 */ 387 public void testResetAfterInterrupt() throws Exception { 388 final CyclicBarrier barrier = new CyclicBarrier(3); 389 for (int i = 0; i < 2; i++) { 390 final CyclicBarrier start = new CyclicBarrier(3); 391 Thread t1 = new ThreadShouldThrow(InterruptedException.class) { 392 public void realRun() throws Exception { 393 start.await(); 394 barrier.await(); 395 }}; 396 397 Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) { 398 public void realRun() throws Exception { 399 start.await(); 400 barrier.await(); 401 }}; 402 403 t1.start(); 404 t2.start(); 405 start.await(); 406 t1.interrupt(); 407 awaitTermination(t1); 408 awaitTermination(t2); 409 assertTrue(barrier.isBroken()); 410 assertEquals(0, barrier.getNumberWaiting()); 411 barrier.reset(); 412 assertFalse(barrier.isBroken()); 413 assertEquals(0, barrier.getNumberWaiting()); 414 } 415 } 416 417 /** 418 * Reset of a barrier after timeout reinitializes it. 419 */ 420 public void testResetAfterTimeout() throws Exception { 421 final CyclicBarrier barrier = new CyclicBarrier(3); 422 for (int i = 0; i < 2; i++) { 423 assertEquals(0, barrier.getNumberWaiting()); 424 Thread t1 = newStartedThread(new CheckedRunnable() { 425 public void realRun() throws Exception { 426 try { 427 barrier.await(); 428 shouldThrow(); 429 } catch (BrokenBarrierException success) {} 430 }}); 431 Thread t2 = newStartedThread(new CheckedRunnable() { 432 public void realRun() throws Exception { 433 awaitNumberWaiting(barrier, 1); 434 long startTime = System.nanoTime(); 435 try { 436 barrier.await(timeoutMillis(), MILLISECONDS); 437 shouldThrow(); 438 } catch (TimeoutException success) {} 439 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 440 }}); 441 442 awaitTermination(t1); 443 awaitTermination(t2); 444 assertEquals(0, barrier.getNumberWaiting()); 445 assertTrue(barrier.isBroken()); 446 assertEquals(0, barrier.getNumberWaiting()); 447 barrier.reset(); 448 assertFalse(barrier.isBroken()); 449 assertEquals(0, barrier.getNumberWaiting()); 450 } 451 } 452 453 /** 454 * Reset of a barrier after a failed command reinitializes it. 455 */ 456 public void testResetAfterCommandException() throws Exception { 457 final CyclicBarrier barrier = 458 new CyclicBarrier(3, new Runnable() { 459 public void run() { 460 throw new NullPointerException(); }}); 461 for (int i = 0; i < 2; i++) { 462 final CyclicBarrier start = new CyclicBarrier(3); 463 Thread t1 = new ThreadShouldThrow(BrokenBarrierException.class) { 464 public void realRun() throws Exception { 465 start.await(); 466 barrier.await(); 467 }}; 468 469 Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) { 470 public void realRun() throws Exception { 471 start.await(); 472 barrier.await(); 473 }}; 474 475 t1.start(); 476 t2.start(); 477 start.await(); 478 awaitNumberWaiting(barrier, 2); 479 try { 480 barrier.await(); 481 shouldThrow(); 482 } catch (NullPointerException success) {} 483 awaitTermination(t1); 484 awaitTermination(t2); 485 assertTrue(barrier.isBroken()); 486 assertEquals(0, barrier.getNumberWaiting()); 487 barrier.reset(); 488 assertFalse(barrier.isBroken()); 489 assertEquals(0, barrier.getNumberWaiting()); 490 } 491 } 492 493 /** 494 * There can be more threads calling await() than parties, as long as each 495 * task only calls await once and the task count is a multiple of parties. 496 */ 497 public void testMoreTasksThanParties() throws Exception { 498 final ThreadLocalRandom rnd = ThreadLocalRandom.current(); 499 final int parties = rnd.nextInt(1, 5); 500 final int nTasks = rnd.nextInt(1, 5) * parties; 501 final AtomicInteger tripCount = new AtomicInteger(0); 502 final AtomicInteger awaitCount = new AtomicInteger(0); 503 final CyclicBarrier barrier = 504 new CyclicBarrier(parties, () -> tripCount.getAndIncrement()); 505 final ExecutorService e = Executors.newFixedThreadPool(nTasks); 506 final Runnable awaiter = () -> { 507 try { 508 if (ThreadLocalRandom.current().nextBoolean()) 509 barrier.await(); 510 else 511 barrier.await(LONG_DELAY_MS, MILLISECONDS); 512 awaitCount.getAndIncrement(); 513 } catch (Throwable fail) { threadUnexpectedException(fail); }}; 514 try (PoolCleaner cleaner = cleaner(e)) { 515 for (int i = nTasks; i--> 0; ) 516 e.execute(awaiter); 517 } 518 assertEquals(nTasks / parties, tripCount.get()); 519 assertEquals(nTasks, awaitCount.get()); 520 assertEquals(0, barrier.getNumberWaiting()); 521 } 522 }