1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.
   7  *
   8  * This code is distributed in the hope that it will be useful, but WITHOUT
   9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  10  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  11  * version 2 for more details (a copy is included in the LICENSE file that
  12  * accompanied this code).
  13  *
  14  * You should have received a copy of the GNU General Public License version
  15  * 2 along with this work; if not, write to the Free Software Foundation,
  16  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  17  *
  18  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  19  * or visit www.oracle.com if you need additional information or have any
  20  * questions.
  21  */
  22 
  23 /*
  24  * This file is available under and governed by the GNU General Public
  25  * License version 2 only, as published by the Free Software Foundation.
  26  * However, the following notice accompanied the original version of this
  27  * file:
  28  *
  29  * Written by Doug Lea with assistance from members of JCP JSR-166
  30  * Expert Group and released to the public domain, as explained at
  31  * http://creativecommons.org/publicdomain/zero/1.0/
  32  * Other contributors include Andrew Wright, Jeffrey Hayes,
  33  * Pat Fisher, Mike Judd.
  34  */
  35 
  36 import static java.util.concurrent.TimeUnit.MILLISECONDS;
  37 
  38 import java.util.concurrent.BrokenBarrierException;
  39 import java.util.concurrent.CountDownLatch;
  40 import java.util.concurrent.CyclicBarrier;
  41 import java.util.concurrent.ExecutorService;
  42 import java.util.concurrent.Executors;
  43 import java.util.concurrent.ThreadLocalRandom;
  44 import java.util.concurrent.TimeoutException;
  45 import java.util.concurrent.atomic.AtomicInteger;
  46 
  47 import junit.framework.Test;
  48 import junit.framework.TestSuite;
  49 
  50 public class CyclicBarrierTest extends JSR166TestCase {
  51     public static void main(String[] args) {
  52         main(suite(), args);
  53     }
  54     public static Test suite() {
  55         return new TestSuite(CyclicBarrierTest.class);
  56     }
  57 
  58     /**
  59      * Spin-waits till the number of waiters == numberOfWaiters.
  60      */
  61     void awaitNumberWaiting(CyclicBarrier barrier, int numberOfWaiters) {
  62         long startTime = System.nanoTime();
  63         while (barrier.getNumberWaiting() != numberOfWaiters) {
  64             if (millisElapsedSince(startTime) > LONG_DELAY_MS)
  65                 fail("timed out");
  66             Thread.yield();
  67         }
  68     }
  69 
  70     /**
  71      * Creating with negative parties throws IllegalArgumentException
  72      */
  73     public void testConstructor1() {
  74         try {
  75             new CyclicBarrier(-1, (Runnable)null);
  76             shouldThrow();
  77         } catch (IllegalArgumentException success) {}
  78     }
  79 
  80     /**
  81      * Creating with negative parties and no action throws
  82      * IllegalArgumentException
  83      */
  84     public void testConstructor2() {
  85         try {
  86             new CyclicBarrier(-1);
  87             shouldThrow();
  88         } catch (IllegalArgumentException success) {}
  89     }
  90 
  91     /**
  92      * getParties returns the number of parties given in constructor
  93      */
  94     public void testGetParties() {
  95         CyclicBarrier b = new CyclicBarrier(2);
  96         assertEquals(2, b.getParties());
  97         assertEquals(0, b.getNumberWaiting());
  98     }
  99 
 100     /**
 101      * A 1-party barrier triggers after single await
 102      */
 103     public void testSingleParty() throws Exception {
 104         CyclicBarrier b = new CyclicBarrier(1);
 105         assertEquals(1, b.getParties());
 106         assertEquals(0, b.getNumberWaiting());
 107         b.await();
 108         b.await();
 109         assertEquals(0, b.getNumberWaiting());
 110     }
 111 
 112     /**
 113      * The supplied barrier action is run at barrier
 114      */
 115     public void testBarrierAction() throws Exception {
 116         final AtomicInteger count = new AtomicInteger(0);
 117         final Runnable incCount = new Runnable() { public void run() {
 118             count.getAndIncrement(); }};
 119         CyclicBarrier b = new CyclicBarrier(1, incCount);
 120         assertEquals(1, b.getParties());
 121         assertEquals(0, b.getNumberWaiting());
 122         b.await();
 123         b.await();
 124         assertEquals(0, b.getNumberWaiting());
 125         assertEquals(2, count.get());
 126     }
 127 
 128     /**
 129      * A 2-party/thread barrier triggers after both threads invoke await
 130      */
 131     public void testTwoParties() throws Exception {
 132         final CyclicBarrier b = new CyclicBarrier(2);
 133         Thread t = newStartedThread(new CheckedRunnable() {
 134             public void realRun() throws Exception {
 135                 b.await();
 136                 b.await();
 137                 b.await();
 138                 b.await();
 139             }});
 140 
 141         b.await();
 142         b.await();
 143         b.await();
 144         b.await();
 145         awaitTermination(t);
 146     }
 147 
 148     /**
 149      * An interruption in one party causes others waiting in await to
 150      * throw BrokenBarrierException
 151      */
 152     public void testAwait1_Interrupted_BrokenBarrier() {
 153         final CyclicBarrier c = new CyclicBarrier(3);
 154         final CountDownLatch pleaseInterrupt = new CountDownLatch(2);
 155         Thread t1 = new ThreadShouldThrow(InterruptedException.class) {
 156             public void realRun() throws Exception {
 157                 pleaseInterrupt.countDown();
 158                 c.await();
 159             }};
 160         Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) {
 161             public void realRun() throws Exception {
 162                 pleaseInterrupt.countDown();
 163                 c.await();
 164             }};
 165 
 166         t1.start();
 167         t2.start();
 168         await(pleaseInterrupt);
 169         t1.interrupt();
 170         awaitTermination(t1);
 171         awaitTermination(t2);
 172     }
 173 
 174     /**
 175      * An interruption in one party causes others waiting in timed await to
 176      * throw BrokenBarrierException
 177      */
 178     public void testAwait2_Interrupted_BrokenBarrier() throws Exception {
 179         final CyclicBarrier c = new CyclicBarrier(3);
 180         final CountDownLatch pleaseInterrupt = new CountDownLatch(2);
 181         Thread t1 = new ThreadShouldThrow(InterruptedException.class) {
 182             public void realRun() throws Exception {
 183                 pleaseInterrupt.countDown();
 184                 c.await(LONG_DELAY_MS, MILLISECONDS);
 185             }};
 186         Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) {
 187             public void realRun() throws Exception {
 188                 pleaseInterrupt.countDown();
 189                 c.await(LONG_DELAY_MS, MILLISECONDS);
 190             }};
 191 
 192         t1.start();
 193         t2.start();
 194         await(pleaseInterrupt);
 195         t1.interrupt();
 196         awaitTermination(t1);
 197         awaitTermination(t2);
 198     }
 199 
 200     /**
 201      * A timeout in timed await throws TimeoutException
 202      */
 203     public void testAwait3_TimeoutException() throws InterruptedException {
 204         final CyclicBarrier c = new CyclicBarrier(2);
 205         Thread t = newStartedThread(new CheckedRunnable() {
 206             public void realRun() throws Exception {
 207                 long startTime = System.nanoTime();
 208                 try {
 209                     c.await(timeoutMillis(), MILLISECONDS);
 210                     shouldThrow();
 211                 } catch (TimeoutException success) {}
 212                 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
 213             }});
 214 
 215         awaitTermination(t);
 216     }
 217 
 218     /**
 219      * A timeout in one party causes others waiting in timed await to
 220      * throw BrokenBarrierException
 221      */
 222     public void testAwait4_Timeout_BrokenBarrier() throws InterruptedException {
 223         final CyclicBarrier c = new CyclicBarrier(3);
 224         Thread t1 = newStartedThread(new CheckedRunnable() {
 225             public void realRun() throws Exception {
 226                 try {
 227                     c.await(LONG_DELAY_MS, MILLISECONDS);
 228                     shouldThrow();
 229                 } catch (BrokenBarrierException success) {}
 230             }});
 231         Thread t2 = newStartedThread(new CheckedRunnable() {
 232             public void realRun() throws Exception {
 233                 awaitNumberWaiting(c, 1);
 234                 long startTime = System.nanoTime();
 235                 try {
 236                     c.await(timeoutMillis(), MILLISECONDS);
 237                     shouldThrow();
 238                 } catch (TimeoutException success) {}
 239                 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
 240             }});
 241 
 242         awaitTermination(t1);
 243         awaitTermination(t2);
 244     }
 245 
 246     /**
 247      * A timeout in one party causes others waiting in await to
 248      * throw BrokenBarrierException
 249      */
 250     public void testAwait5_Timeout_BrokenBarrier() throws InterruptedException {
 251         final CyclicBarrier c = new CyclicBarrier(3);
 252         Thread t1 = newStartedThread(new CheckedRunnable() {
 253             public void realRun() throws Exception {
 254                 try {
 255                     c.await();
 256                     shouldThrow();
 257                 } catch (BrokenBarrierException success) {}
 258             }});
 259         Thread t2 = newStartedThread(new CheckedRunnable() {
 260             public void realRun() throws Exception {
 261                 awaitNumberWaiting(c, 1);
 262                 long startTime = System.nanoTime();
 263                 try {
 264                     c.await(timeoutMillis(), MILLISECONDS);
 265                     shouldThrow();
 266                 } catch (TimeoutException success) {}
 267                 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
 268             }});
 269 
 270         awaitTermination(t1);
 271         awaitTermination(t2);
 272     }
 273 
 274     /**
 275      * A reset of an active barrier causes waiting threads to throw
 276      * BrokenBarrierException
 277      */
 278     public void testReset_BrokenBarrier() throws InterruptedException {
 279         final CyclicBarrier c = new CyclicBarrier(3);
 280         final CountDownLatch pleaseReset = new CountDownLatch(2);
 281         Thread t1 = new ThreadShouldThrow(BrokenBarrierException.class) {
 282             public void realRun() throws Exception {
 283                 pleaseReset.countDown();
 284                 c.await();
 285             }};
 286         Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) {
 287             public void realRun() throws Exception {
 288                 pleaseReset.countDown();
 289                 c.await();
 290             }};
 291 
 292         t1.start();
 293         t2.start();
 294         await(pleaseReset);
 295 
 296         awaitNumberWaiting(c, 2);
 297         c.reset();
 298         awaitTermination(t1);
 299         awaitTermination(t2);
 300     }
 301 
 302     /**
 303      * A reset before threads enter barrier does not throw
 304      * BrokenBarrierException
 305      */
 306     public void testReset_NoBrokenBarrier() throws Exception {
 307         final CyclicBarrier c = new CyclicBarrier(3);
 308         c.reset();
 309 
 310         Thread t1 = newStartedThread(new CheckedRunnable() {
 311             public void realRun() throws Exception {
 312                 c.await();
 313             }});
 314         Thread t2 = newStartedThread(new CheckedRunnable() {
 315             public void realRun() throws Exception {
 316                 c.await();
 317             }});
 318 
 319         c.await();
 320         awaitTermination(t1);
 321         awaitTermination(t2);
 322     }
 323 
 324     /**
 325      * Reset of a non-broken barrier does not break barrier
 326      */
 327     public void testResetWithoutBreakage() throws Exception {
 328         final CyclicBarrier barrier = new CyclicBarrier(3);
 329         for (int i = 0; i < 3; i++) {
 330             final CyclicBarrier start = new CyclicBarrier(3);
 331             Thread t1 = newStartedThread(new CheckedRunnable() {
 332                 public void realRun() throws Exception {
 333                     start.await();
 334                     barrier.await();
 335                 }});
 336 
 337             Thread t2 = newStartedThread(new CheckedRunnable() {
 338                 public void realRun() throws Exception {
 339                     start.await();
 340                     barrier.await();
 341                 }});
 342 
 343             start.await();
 344             barrier.await();
 345             awaitTermination(t1);
 346             awaitTermination(t2);
 347             assertFalse(barrier.isBroken());
 348             assertEquals(0, barrier.getNumberWaiting());
 349             if (i == 1) barrier.reset();
 350             assertFalse(barrier.isBroken());
 351             assertEquals(0, barrier.getNumberWaiting());
 352         }
 353     }
 354 
 355     /**
 356      * Reset of a barrier after interruption reinitializes it.
 357      */
 358     public void testResetAfterInterrupt() throws Exception {
 359         final CyclicBarrier barrier = new CyclicBarrier(3);
 360         for (int i = 0; i < 2; i++) {
 361             final CyclicBarrier start = new CyclicBarrier(3);
 362             Thread t1 = new ThreadShouldThrow(InterruptedException.class) {
 363                 public void realRun() throws Exception {
 364                     start.await();
 365                     barrier.await();
 366                 }};
 367 
 368             Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) {
 369                 public void realRun() throws Exception {
 370                     start.await();
 371                     barrier.await();
 372                 }};
 373 
 374             t1.start();
 375             t2.start();
 376             start.await();
 377             t1.interrupt();
 378             awaitTermination(t1);
 379             awaitTermination(t2);
 380             assertTrue(barrier.isBroken());
 381             assertEquals(0, barrier.getNumberWaiting());
 382             barrier.reset();
 383             assertFalse(barrier.isBroken());
 384             assertEquals(0, barrier.getNumberWaiting());
 385         }
 386     }
 387 
 388     /**
 389      * Reset of a barrier after timeout reinitializes it.
 390      */
 391     public void testResetAfterTimeout() throws Exception {
 392         final CyclicBarrier barrier = new CyclicBarrier(3);
 393         for (int i = 0; i < 2; i++) {
 394             assertEquals(0, barrier.getNumberWaiting());
 395             Thread t1 = newStartedThread(new CheckedRunnable() {
 396                 public void realRun() throws Exception {
 397                     try {
 398                         barrier.await();
 399                         shouldThrow();
 400                     } catch (BrokenBarrierException success) {}
 401                 }});
 402             Thread t2 = newStartedThread(new CheckedRunnable() {
 403                 public void realRun() throws Exception {
 404                     awaitNumberWaiting(barrier, 1);
 405                     long startTime = System.nanoTime();
 406                     try {
 407                         barrier.await(timeoutMillis(), MILLISECONDS);
 408                         shouldThrow();
 409                     } catch (TimeoutException success) {}
 410                     assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
 411                 }});
 412 
 413             awaitTermination(t1);
 414             awaitTermination(t2);
 415             assertEquals(0, barrier.getNumberWaiting());
 416             assertTrue(barrier.isBroken());
 417             assertEquals(0, barrier.getNumberWaiting());
 418             barrier.reset();
 419             assertFalse(barrier.isBroken());
 420             assertEquals(0, barrier.getNumberWaiting());
 421         }
 422     }
 423 
 424     /**
 425      * Reset of a barrier after a failed command reinitializes it.
 426      */
 427     public void testResetAfterCommandException() throws Exception {
 428         final CyclicBarrier barrier =
 429             new CyclicBarrier(3, new Runnable() {
 430                     public void run() {
 431                         throw new NullPointerException(); }});
 432         for (int i = 0; i < 2; i++) {
 433             final CyclicBarrier start = new CyclicBarrier(3);
 434             Thread t1 = new ThreadShouldThrow(BrokenBarrierException.class) {
 435                 public void realRun() throws Exception {
 436                     start.await();
 437                     barrier.await();
 438                 }};
 439 
 440             Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) {
 441                 public void realRun() throws Exception {
 442                     start.await();
 443                     barrier.await();
 444                 }};
 445 
 446             t1.start();
 447             t2.start();
 448             start.await();
 449             awaitNumberWaiting(barrier, 2);
 450             try {
 451                 barrier.await();
 452                 shouldThrow();
 453             } catch (NullPointerException success) {}
 454             awaitTermination(t1);
 455             awaitTermination(t2);
 456             assertTrue(barrier.isBroken());
 457             assertEquals(0, barrier.getNumberWaiting());
 458             barrier.reset();
 459             assertFalse(barrier.isBroken());
 460             assertEquals(0, barrier.getNumberWaiting());
 461         }
 462     }
 463 
 464     /**
 465      * There can be more threads calling await() than parties, as long as each
 466      * task only calls await once and the task count is a multiple of parties.
 467      */
 468     public void testMoreTasksThanParties() throws Exception {
 469         final ThreadLocalRandom rnd = ThreadLocalRandom.current();
 470         final int parties = rnd.nextInt(1, 5);
 471         final int nTasks = rnd.nextInt(1, 5) * parties;
 472         final AtomicInteger tripCount = new AtomicInteger(0);
 473         final AtomicInteger awaitCount = new AtomicInteger(0);
 474         final CyclicBarrier barrier =
 475             new CyclicBarrier(parties, () -> tripCount.getAndIncrement());
 476         final ExecutorService e = Executors.newFixedThreadPool(nTasks);
 477         final Runnable awaiter = () -> {
 478             try {
 479                 if (randomBoolean())
 480                     barrier.await();
 481                 else
 482                     barrier.await(LONG_DELAY_MS, MILLISECONDS);
 483                 awaitCount.getAndIncrement();
 484             } catch (Throwable fail) { threadUnexpectedException(fail); }};
 485         try (PoolCleaner cleaner = cleaner(e)) {
 486             for (int i = nTasks; i--> 0; )
 487                 e.execute(awaiter);
 488         }
 489         assertEquals(nTasks / parties, tripCount.get());
 490         assertEquals(nTasks, awaitCount.get());
 491         assertEquals(0, barrier.getNumberWaiting());
 492     }
 493 }