1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.
   7  *
   8  * This code is distributed in the hope that it will be useful, but WITHOUT
   9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  10  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  11  * version 2 for more details (a copy is included in the LICENSE file that
  12  * accompanied this code).
  13  *
  14  * You should have received a copy of the GNU General Public License version
  15  * 2 along with this work; if not, write to the Free Software Foundation,
  16  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  17  *
  18  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  19  * or visit www.oracle.com if you need additional information or have any
  20  * questions.
  21  */
  22 
  23 /*
  24  * This file is available under and governed by the GNU General Public
  25  * License version 2 only, as published by the Free Software Foundation.
  26  * However, the following notice accompanied the original version of this
  27  * file:
  28  *
  29  * Written by Doug Lea with assistance from members of JCP JSR-166
  30  * Expert Group and released to the public domain, as explained at
  31  * http://creativecommons.org/publicdomain/zero/1.0/
  32  * Other contributors include Andrew Wright, Jeffrey Hayes,
  33  * Pat Fisher, Mike Judd.
  34  */
  35 
  36 import static java.util.concurrent.TimeUnit.MILLISECONDS;
  37 
  38 import java.util.concurrent.BrokenBarrierException;
  39 import java.util.concurrent.CountDownLatch;
  40 import java.util.concurrent.CyclicBarrier;
  41 import java.util.concurrent.ExecutorService;
  42 import java.util.concurrent.Executors;
  43 import java.util.concurrent.ThreadLocalRandom;
  44 import java.util.concurrent.TimeoutException;
  45 import java.util.concurrent.atomic.AtomicBoolean;
  46 import java.util.concurrent.atomic.AtomicInteger;
  47 
  48 import junit.framework.Test;
  49 import junit.framework.TestSuite;
  50 
  51 public class CyclicBarrierTest extends JSR166TestCase {
  52     public static void main(String[] args) {
  53         main(suite(), args);
  54     }
  55     public static Test suite() {
  56         return new TestSuite(CyclicBarrierTest.class);
  57     }
  58 
  59     /**
  60      * Spin-waits till the number of waiters == numberOfWaiters.
  61      */
  62     void awaitNumberWaiting(CyclicBarrier barrier, int numberOfWaiters) {
  63         long startTime = System.nanoTime();
  64         while (barrier.getNumberWaiting() != numberOfWaiters) {
  65             if (millisElapsedSince(startTime) > LONG_DELAY_MS)
  66                 fail("timed out");
  67             Thread.yield();
  68         }
  69     }
  70 
  71     /**
  72      * Creating with negative parties throws IllegalArgumentException
  73      */
  74     public void testConstructor1() {
  75         try {
  76             new CyclicBarrier(-1, (Runnable)null);
  77             shouldThrow();
  78         } catch (IllegalArgumentException success) {}
  79     }
  80 
  81     /**
  82      * Creating with negative parties and no action throws
  83      * IllegalArgumentException
  84      */
  85     public void testConstructor2() {
  86         try {
  87             new CyclicBarrier(-1);
  88             shouldThrow();
  89         } catch (IllegalArgumentException success) {}
  90     }
  91 
  92     /**
  93      * getParties returns the number of parties given in constructor
  94      */
  95     public void testGetParties() {
  96         CyclicBarrier b = new CyclicBarrier(2);
  97         assertEquals(2, b.getParties());
  98         assertEquals(0, b.getNumberWaiting());
  99     }
 100 
 101     /**
 102      * A 1-party barrier triggers after single await
 103      */
 104     public void testSingleParty() throws Exception {
 105         CyclicBarrier b = new CyclicBarrier(1);
 106         assertEquals(1, b.getParties());
 107         assertEquals(0, b.getNumberWaiting());
 108         b.await();
 109         b.await();
 110         assertEquals(0, b.getNumberWaiting());
 111     }
 112 
 113     /**
 114      * The supplied barrier action is run at barrier
 115      */
 116     public void testBarrierAction() throws Exception {
 117         final AtomicInteger count = new AtomicInteger(0);
 118         final Runnable incCount = new Runnable() { public void run() {
 119             count.getAndIncrement(); }};
 120         CyclicBarrier b = new CyclicBarrier(1, incCount);
 121         assertEquals(1, b.getParties());
 122         assertEquals(0, b.getNumberWaiting());
 123         b.await();
 124         b.await();
 125         assertEquals(0, b.getNumberWaiting());
 126         assertEquals(2, count.get());
 127     }
 128 
 129     /**
 130      * A 2-party/thread barrier triggers after both threads invoke await
 131      */
 132     public void testTwoParties() throws Exception {
 133         final CyclicBarrier b = new CyclicBarrier(2);
 134         Thread t = newStartedThread(new CheckedRunnable() {
 135             public void realRun() throws Exception {
 136                 b.await();
 137                 b.await();
 138                 b.await();
 139                 b.await();
 140             }});
 141 
 142         b.await();
 143         b.await();
 144         b.await();
 145         b.await();
 146         awaitTermination(t);
 147     }
 148 
 149     /**
 150      * An interruption in one party causes others waiting in await to
 151      * throw BrokenBarrierException
 152      */
 153     public void testAwait1_Interrupted_BrokenBarrier() {
 154         final CyclicBarrier c = new CyclicBarrier(3);
 155         final CountDownLatch pleaseInterrupt = new CountDownLatch(2);
 156         Thread t1 = new ThreadShouldThrow(InterruptedException.class) {
 157             public void realRun() throws Exception {
 158                 pleaseInterrupt.countDown();
 159                 c.await();
 160             }};
 161         Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) {
 162             public void realRun() throws Exception {
 163                 pleaseInterrupt.countDown();
 164                 c.await();
 165             }};
 166 
 167         t1.start();
 168         t2.start();
 169         await(pleaseInterrupt);
 170         t1.interrupt();
 171         awaitTermination(t1);
 172         awaitTermination(t2);
 173     }
 174 
 175     /**
 176      * An interruption in one party causes others waiting in timed await to
 177      * throw BrokenBarrierException
 178      */
 179     public void testAwait2_Interrupted_BrokenBarrier() throws Exception {
 180         final CyclicBarrier c = new CyclicBarrier(3);
 181         final CountDownLatch pleaseInterrupt = new CountDownLatch(2);
 182         Thread t1 = new ThreadShouldThrow(InterruptedException.class) {
 183             public void realRun() throws Exception {
 184                 pleaseInterrupt.countDown();
 185                 c.await(LONG_DELAY_MS, MILLISECONDS);
 186             }};
 187         Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) {
 188             public void realRun() throws Exception {
 189                 pleaseInterrupt.countDown();
 190                 c.await(LONG_DELAY_MS, MILLISECONDS);
 191             }};
 192 
 193         t1.start();
 194         t2.start();
 195         await(pleaseInterrupt);
 196         t1.interrupt();
 197         awaitTermination(t1);
 198         awaitTermination(t2);
 199     }
 200 
 201     /**
 202      * A timeout in timed await throws TimeoutException
 203      */
 204     public void testAwait3_TimeoutException() throws InterruptedException {
 205         final CyclicBarrier c = new CyclicBarrier(2);
 206         Thread t = newStartedThread(new CheckedRunnable() {
 207             public void realRun() throws Exception {
 208                 long startTime = System.nanoTime();
 209                 try {
 210                     c.await(timeoutMillis(), MILLISECONDS);
 211                     shouldThrow();
 212                 } catch (TimeoutException success) {}
 213                 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
 214             }});
 215 
 216         awaitTermination(t);
 217     }
 218 
 219     /**
 220      * A timeout in one party causes others waiting in timed await to
 221      * throw BrokenBarrierException
 222      */
 223     public void testAwait4_Timeout_BrokenBarrier() throws InterruptedException {
 224         final CyclicBarrier c = new CyclicBarrier(3);
 225         Thread t1 = newStartedThread(new CheckedRunnable() {
 226             public void realRun() throws Exception {
 227                 try {
 228                     c.await(LONG_DELAY_MS, MILLISECONDS);
 229                     shouldThrow();
 230                 } catch (BrokenBarrierException success) {}
 231             }});
 232         Thread t2 = newStartedThread(new CheckedRunnable() {
 233             public void realRun() throws Exception {
 234                 awaitNumberWaiting(c, 1);
 235                 long startTime = System.nanoTime();
 236                 try {
 237                     c.await(timeoutMillis(), MILLISECONDS);
 238                     shouldThrow();
 239                 } catch (TimeoutException success) {}
 240                 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
 241             }});
 242 
 243         awaitTermination(t1);
 244         awaitTermination(t2);
 245     }
 246 
 247     /**
 248      * A timeout in one party causes others waiting in await to
 249      * throw BrokenBarrierException
 250      */
 251     public void testAwait5_Timeout_BrokenBarrier() throws InterruptedException {
 252         final CyclicBarrier c = new CyclicBarrier(3);
 253         Thread t1 = newStartedThread(new CheckedRunnable() {
 254             public void realRun() throws Exception {
 255                 try {
 256                     c.await();
 257                     shouldThrow();
 258                 } catch (BrokenBarrierException success) {}
 259             }});
 260         Thread t2 = newStartedThread(new CheckedRunnable() {
 261             public void realRun() throws Exception {
 262                 awaitNumberWaiting(c, 1);
 263                 long startTime = System.nanoTime();
 264                 try {
 265                     c.await(timeoutMillis(), MILLISECONDS);
 266                     shouldThrow();
 267                 } catch (TimeoutException success) {}
 268                 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
 269             }});
 270 
 271         awaitTermination(t1);
 272         awaitTermination(t2);
 273     }
 274 
 275     /**
 276      * A reset of an active barrier causes waiting threads to throw
 277      * BrokenBarrierException
 278      */
 279     public void testReset_BrokenBarrier() throws InterruptedException {
 280         final CyclicBarrier c = new CyclicBarrier(3);
 281         final CountDownLatch pleaseReset = new CountDownLatch(2);
 282         Thread t1 = new ThreadShouldThrow(BrokenBarrierException.class) {
 283             public void realRun() throws Exception {
 284                 pleaseReset.countDown();
 285                 c.await();
 286             }};
 287         Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) {
 288             public void realRun() throws Exception {
 289                 pleaseReset.countDown();
 290                 c.await();
 291             }};
 292 
 293         t1.start();
 294         t2.start();
 295         await(pleaseReset);
 296 
 297         awaitNumberWaiting(c, 2);
 298         c.reset();
 299         awaitTermination(t1);
 300         awaitTermination(t2);
 301     }
 302 
 303     /**
 304      * A reset before threads enter barrier does not throw
 305      * BrokenBarrierException
 306      */
 307     public void testReset_NoBrokenBarrier() throws Exception {
 308         final CyclicBarrier c = new CyclicBarrier(3);
 309         c.reset();
 310 
 311         Thread t1 = newStartedThread(new CheckedRunnable() {
 312             public void realRun() throws Exception {
 313                 c.await();
 314             }});
 315         Thread t2 = newStartedThread(new CheckedRunnable() {
 316             public void realRun() throws Exception {
 317                 c.await();
 318             }});
 319 
 320         c.await();
 321         awaitTermination(t1);
 322         awaitTermination(t2);
 323     }
 324 
 325     /**
 326      * All threads block while a barrier is broken.
 327      */
 328     public void testReset_Leakage() throws InterruptedException {
 329         final CyclicBarrier c = new CyclicBarrier(2);
 330         final AtomicBoolean done = new AtomicBoolean();
 331         Thread t = newStartedThread(new CheckedRunnable() {
 332             public void realRun() {
 333                 while (!done.get()) {
 334                     try {
 335                         while (c.isBroken())
 336                             c.reset();
 337 
 338                         c.await();
 339                         shouldThrow();
 340                     }
 341                     catch (BrokenBarrierException | InterruptedException ok) {}
 342                 }}});
 343 
 344         for (int i = 0; i < 4; i++) {
 345             delay(timeoutMillis());
 346             t.interrupt();
 347         }
 348         done.set(true);
 349         t.interrupt();
 350         awaitTermination(t);
 351     }
 352 
 353     /**
 354      * Reset of a non-broken barrier does not break barrier
 355      */
 356     public void testResetWithoutBreakage() throws Exception {
 357         final CyclicBarrier barrier = new CyclicBarrier(3);
 358         for (int i = 0; i < 3; i++) {
 359             final CyclicBarrier start = new CyclicBarrier(3);
 360             Thread t1 = newStartedThread(new CheckedRunnable() {
 361                 public void realRun() throws Exception {
 362                     start.await();
 363                     barrier.await();
 364                 }});
 365 
 366             Thread t2 = newStartedThread(new CheckedRunnable() {
 367                 public void realRun() throws Exception {
 368                     start.await();
 369                     barrier.await();
 370                 }});
 371 
 372             start.await();
 373             barrier.await();
 374             awaitTermination(t1);
 375             awaitTermination(t2);
 376             assertFalse(barrier.isBroken());
 377             assertEquals(0, barrier.getNumberWaiting());
 378             if (i == 1) barrier.reset();
 379             assertFalse(barrier.isBroken());
 380             assertEquals(0, barrier.getNumberWaiting());
 381         }
 382     }
 383 
 384     /**
 385      * Reset of a barrier after interruption reinitializes it.
 386      */
 387     public void testResetAfterInterrupt() throws Exception {
 388         final CyclicBarrier barrier = new CyclicBarrier(3);
 389         for (int i = 0; i < 2; i++) {
 390             final CyclicBarrier start = new CyclicBarrier(3);
 391             Thread t1 = new ThreadShouldThrow(InterruptedException.class) {
 392                 public void realRun() throws Exception {
 393                     start.await();
 394                     barrier.await();
 395                 }};
 396 
 397             Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) {
 398                 public void realRun() throws Exception {
 399                     start.await();
 400                     barrier.await();
 401                 }};
 402 
 403             t1.start();
 404             t2.start();
 405             start.await();
 406             t1.interrupt();
 407             awaitTermination(t1);
 408             awaitTermination(t2);
 409             assertTrue(barrier.isBroken());
 410             assertEquals(0, barrier.getNumberWaiting());
 411             barrier.reset();
 412             assertFalse(barrier.isBroken());
 413             assertEquals(0, barrier.getNumberWaiting());
 414         }
 415     }
 416 
 417     /**
 418      * Reset of a barrier after timeout reinitializes it.
 419      */
 420     public void testResetAfterTimeout() throws Exception {
 421         final CyclicBarrier barrier = new CyclicBarrier(3);
 422         for (int i = 0; i < 2; i++) {
 423             assertEquals(0, barrier.getNumberWaiting());
 424             Thread t1 = newStartedThread(new CheckedRunnable() {
 425                 public void realRun() throws Exception {
 426                     try {
 427                         barrier.await();
 428                         shouldThrow();
 429                     } catch (BrokenBarrierException success) {}
 430                 }});
 431             Thread t2 = newStartedThread(new CheckedRunnable() {
 432                 public void realRun() throws Exception {
 433                     awaitNumberWaiting(barrier, 1);
 434                     long startTime = System.nanoTime();
 435                     try {
 436                         barrier.await(timeoutMillis(), MILLISECONDS);
 437                         shouldThrow();
 438                     } catch (TimeoutException success) {}
 439                     assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
 440                 }});
 441 
 442             awaitTermination(t1);
 443             awaitTermination(t2);
 444             assertEquals(0, barrier.getNumberWaiting());
 445             assertTrue(barrier.isBroken());
 446             assertEquals(0, barrier.getNumberWaiting());
 447             barrier.reset();
 448             assertFalse(barrier.isBroken());
 449             assertEquals(0, barrier.getNumberWaiting());
 450         }
 451     }
 452 
 453     /**
 454      * Reset of a barrier after a failed command reinitializes it.
 455      */
 456     public void testResetAfterCommandException() throws Exception {
 457         final CyclicBarrier barrier =
 458             new CyclicBarrier(3, new Runnable() {
 459                     public void run() {
 460                         throw new NullPointerException(); }});
 461         for (int i = 0; i < 2; i++) {
 462             final CyclicBarrier start = new CyclicBarrier(3);
 463             Thread t1 = new ThreadShouldThrow(BrokenBarrierException.class) {
 464                 public void realRun() throws Exception {
 465                     start.await();
 466                     barrier.await();
 467                 }};
 468 
 469             Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) {
 470                 public void realRun() throws Exception {
 471                     start.await();
 472                     barrier.await();
 473                 }};
 474 
 475             t1.start();
 476             t2.start();
 477             start.await();
 478             awaitNumberWaiting(barrier, 2);
 479             try {
 480                 barrier.await();
 481                 shouldThrow();
 482             } catch (NullPointerException success) {}
 483             awaitTermination(t1);
 484             awaitTermination(t2);
 485             assertTrue(barrier.isBroken());
 486             assertEquals(0, barrier.getNumberWaiting());
 487             barrier.reset();
 488             assertFalse(barrier.isBroken());
 489             assertEquals(0, barrier.getNumberWaiting());
 490         }
 491     }
 492 
 493     /**
 494      * There can be more threads calling await() than parties, as long as each
 495      * task only calls await once and the task count is a multiple of parties.
 496      */
 497     public void testMoreTasksThanParties() throws Exception {
 498         final ThreadLocalRandom rnd = ThreadLocalRandom.current();
 499         final int parties = rnd.nextInt(1, 5);
 500         final int nTasks = rnd.nextInt(1, 5) * parties;
 501         final AtomicInteger tripCount = new AtomicInteger(0);
 502         final AtomicInteger awaitCount = new AtomicInteger(0);
 503         final CyclicBarrier barrier =
 504             new CyclicBarrier(parties, () -> tripCount.getAndIncrement());
 505         final ExecutorService e = Executors.newFixedThreadPool(nTasks);
 506         final Runnable awaiter = () -> {
 507             try {
 508                 if (ThreadLocalRandom.current().nextBoolean())
 509                     barrier.await();
 510                 else
 511                     barrier.await(LONG_DELAY_MS, MILLISECONDS);
 512                 awaitCount.getAndIncrement();
 513             } catch (Throwable fail) { threadUnexpectedException(fail); }};
 514         try (PoolCleaner cleaner = cleaner(e)) {
 515             for (int i = nTasks; i--> 0; )
 516                 e.execute(awaiter);
 517         }
 518         assertEquals(nTasks / parties, tripCount.get());
 519         assertEquals(nTasks, awaitCount.get());
 520         assertEquals(0, barrier.getNumberWaiting());
 521     }
 522 }