1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.util.concurrent.locks.Condition;
  39 import java.util.concurrent.locks.ReentrantLock;
  40 
  41 /**
  42  * A synchronization aid that allows a set of threads to all wait for
  43  * each other to reach a common barrier point.  CyclicBarriers are
  44  * useful in programs involving a fixed sized party of threads that
  45  * must occasionally wait for each other. The barrier is called
  46  * <em>cyclic</em> because it can be re-used after the waiting threads
  47  * are released.
  48  *
  49  * <p>A {@code CyclicBarrier} supports an optional {@link Runnable} command
  50  * that is run once per barrier point, after the last thread in the party
  51  * arrives, but before any threads are released.
  52  * This <em>barrier action</em> is useful
  53  * for updating shared-state before any of the parties continue.
  54  *
  55  * <p><b>Sample usage:</b> Here is an example of using a barrier in a
  56  * parallel decomposition design:
  57  *
  58  * <pre> {@code
  59  * class Solver {
  60  *   final int N;
  61  *   final float[][] data;
  62  *   final CyclicBarrier barrier;
  63  *
  64  *   class Worker implements Runnable {
  65  *     int myRow;
  66  *     Worker(int row) { myRow = row; }
  67  *     public void run() {
  68  *       while (!done()) {
  69  *         processRow(myRow);
  70  *
  71  *         try {
  72  *           barrier.await();
  73  *         } catch (InterruptedException ex) {
  74  *           return;
  75  *         } catch (BrokenBarrierException ex) {
  76  *           return;
  77  *         }
  78  *       }
  79  *     }
  80  *   }
  81  *
  82  *   public Solver(float[][] matrix) {
  83  *     data = matrix;
  84  *     N = matrix.length;
  85  *     Runnable barrierAction = () -> mergeRows(...);
  86  *     barrier = new CyclicBarrier(N, barrierAction);
  87  *
  88  *     List<Thread> threads = new ArrayList<>(N);
  89  *     for (int i = 0; i < N; i++) {
  90  *       Thread thread = new Thread(new Worker(i));
  91  *       threads.add(thread);
  92  *       thread.start();
  93  *     }
  94  *
  95  *     // wait until done
 *     for (Thread thread : threads)
 *       try {
 *         thread.join();
 *       } catch (InterruptedException ex) { }
  98  *   }
  99  * }}</pre>
 100  *
 101  * Here, each worker thread processes a row of the matrix, then waits at the
 102  * barrier until all rows have been processed. When all rows are processed the
 103  * supplied {@link Runnable} barrier action is executed and merges the rows.
 104  * If the merger determines that a solution has been found then {@code done()}
 105  * will return {@code true} and each worker will terminate.
 106  *
 107  * <p>If the barrier action does not rely on the parties being suspended when
 108  * it is executed, then any of the threads in the party could execute that
 109  * action when it is released. To facilitate this, each invocation of
 110  * {@link #await} returns the arrival index of that thread at the barrier.
 111  * You can then choose which thread should execute the barrier action, for
 112  * example:
 113  * <pre> {@code
 114  * if (barrier.await() == 0) {
 115  *   // log the completion of this iteration
 116  * }}</pre>
 117  *
 118  * <p>The {@code CyclicBarrier} uses an all-or-none breakage model
 119  * for failed synchronization attempts: If a thread leaves a barrier
 120  * point prematurely because of interruption, failure, or timeout, all
 121  * other threads waiting at that barrier point will also leave
 122  * abnormally via {@link BrokenBarrierException} (or
 123  * {@link InterruptedException} if they too were interrupted at about
 124  * the same time).
 125  *
 126  * <p>Memory consistency effects: Actions in a thread prior to calling
 127  * {@code await()}
 128  * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 129  * actions that are part of the barrier action, which in turn
 130  * <i>happen-before</i> actions following a successful return from the
 131  * corresponding {@code await()} in other threads.
 132  *
 133  * @see CountDownLatch
 134  * @see Phaser
 135  *
 136  * @author Doug Lea
 137  * @since 1.5
 138  */
 139 public class CyclicBarrier {
 140     /**
 141      * Each use of the barrier is represented as a generation instance.
 142      * The generation changes whenever the barrier is tripped, or
 143      * is reset. There can be many generations associated with threads
 144      * using the barrier - due to the non-deterministic way the lock
 145      * may be allocated to waiting threads - but only one of these
 146      * can be active at a time (the one to which {@code count} applies)
 147      * and all the rest are either broken or tripped.
 148      * There need not be an active generation if there has been a break
 149      * but no subsequent reset.
 150      */
 151     private static class Generation {
 152         Generation() {}                 // prevent access constructor creation
 153         boolean broken;                 // initially false
 154     }
 155 
 156     /** The lock for guarding barrier entry */
 157     private final ReentrantLock lock = new ReentrantLock();
 158     /** Condition to wait on until tripped */
 159     private final Condition trip = lock.newCondition();
 160     /** The number of parties */
 161     private final int parties;
 162     /** The command to run when tripped */
 163     private final Runnable barrierCommand;
 164     /** The current generation */
 165     private Generation generation = new Generation();
 166 
 167     /**
 168      * Number of parties still waiting. Counts down from parties to 0
 169      * on each generation.  It is reset to parties on each new
 170      * generation or when broken.
 171      */
 172     private int count;
 173 
 174     /**
 175      * Updates state on barrier trip and wakes up everyone.
 176      * Called only while holding lock.
 177      */
 178     private void nextGeneration() {
 179         // signal completion of last generation
 180         trip.signalAll();
 181         // set up next generation
 182         count = parties;
 183         generation = new Generation();
 184     }
 185 
 186     /**
 187      * Sets current barrier generation as broken and wakes up everyone.
 188      * Called only while holding lock.
 189      */
 190     private void breakBarrier() {
 191         generation.broken = true;
 192         count = parties;
 193         trip.signalAll();
 194     }
 195 
 196     /**
 197      * Main barrier code, covering the various policies.
 198      */
 199     private int dowait(boolean timed, long nanos)
 200         throws InterruptedException, BrokenBarrierException,
 201                TimeoutException {
 202         final ReentrantLock lock = this.lock;
 203         lock.lock();
 204         try {
 205             final Generation g = generation;
 206 
 207             if (g.broken)
 208                 throw new BrokenBarrierException();
 209 
 210             if (Thread.interrupted()) {
 211                 breakBarrier();
 212                 throw new InterruptedException();
 213             }
 214 
 215             int index = --count;
 216             if (index == 0) {  // tripped
 217                 Runnable command = barrierCommand;
 218                 if (command != null) {
 219                     try {
 220                         command.run();
 221                     } catch (Throwable ex) {
 222                         breakBarrier();
 223                         throw ex;
 224                     }
 225                 }
 226                 nextGeneration();
 227                 return 0;
 228             }
 229 
 230             // loop until tripped, broken, interrupted, or timed out
 231             for (;;) {
 232                 try {
 233                     if (!timed)
 234                         trip.await();
 235                     else if (nanos > 0L)
 236                         nanos = trip.awaitNanos(nanos);
 237                 } catch (InterruptedException ie) {
 238                     if (g == generation && ! g.broken) {
 239                         breakBarrier();
 240                         throw ie;
 241                     } else {
 242                         // We're about to finish waiting even if we had not
 243                         // been interrupted, so this interrupt is deemed to
 244                         // "belong" to subsequent execution.
 245                         Thread.currentThread().interrupt();
 246                     }
 247                 }
 248 
 249                 if (g.broken)
 250                     throw new BrokenBarrierException();
 251 
 252                 if (g != generation)
 253                     return index;
 254 
 255                 if (timed && nanos <= 0L) {
 256                     breakBarrier();
 257                     throw new TimeoutException();
 258                 }
 259             }
 260         } finally {
 261             lock.unlock();
 262         }
 263     }
 264 
 265     /**
 266      * Creates a new {@code CyclicBarrier} that will trip when the
 267      * given number of parties (threads) are waiting upon it, and which
 268      * will execute the given barrier action when the barrier is tripped,
 269      * performed by the last thread entering the barrier.
 270      *
 271      * @param parties the number of threads that must invoke {@link #await}
 272      *        before the barrier is tripped
 273      * @param barrierAction the command to execute when the barrier is
 274      *        tripped, or {@code null} if there is no action
 275      * @throws IllegalArgumentException if {@code parties} is less than 1
 276      */
 277     public CyclicBarrier(int parties, Runnable barrierAction) {
 278         if (parties <= 0) throw new IllegalArgumentException();
 279         this.parties = parties;
 280         this.count = parties;
 281         this.barrierCommand = barrierAction;
 282     }
 283 
 284     /**
 285      * Creates a new {@code CyclicBarrier} that will trip when the
 286      * given number of parties (threads) are waiting upon it, and
 287      * does not perform a predefined action when the barrier is tripped.
 288      *
 289      * @param parties the number of threads that must invoke {@link #await}
 290      *        before the barrier is tripped
 291      * @throws IllegalArgumentException if {@code parties} is less than 1
 292      */
 293     public CyclicBarrier(int parties) {
 294         this(parties, null);
 295     }
 296 
 297     /**
 298      * Returns the number of parties required to trip this barrier.
 299      *
 300      * @return the number of parties required to trip this barrier
 301      */
 302     public int getParties() {
 303         return parties;
 304     }
 305 
 306     /**
 307      * Waits until all {@linkplain #getParties parties} have invoked
 308      * {@code await} on this barrier.
 309      *
 310      * <p>If the current thread is not the last to arrive then it is
 311      * disabled for thread scheduling purposes and lies dormant until
 312      * one of the following things happens:
 313      * <ul>
 314      * <li>The last thread arrives; or
 315      * <li>Some other thread {@linkplain Thread#interrupt interrupts}
 316      * the current thread; or
 317      * <li>Some other thread {@linkplain Thread#interrupt interrupts}
 318      * one of the other waiting threads; or
 319      * <li>Some other thread times out while waiting for barrier; or
 320      * <li>Some other thread invokes {@link #reset} on this barrier.
 321      * </ul>
 322      *
 323      * <p>If the current thread:
 324      * <ul>
 325      * <li>has its interrupted status set on entry to this method; or
 326      * <li>is {@linkplain Thread#interrupt interrupted} while waiting
 327      * </ul>
 328      * then {@link InterruptedException} is thrown and the current thread's
 329      * interrupted status is cleared.
 330      *
 331      * <p>If the barrier is {@link #reset} while any thread is waiting,
 332      * or if the barrier {@linkplain #isBroken is broken} when
 333      * {@code await} is invoked, or while any thread is waiting, then
 334      * {@link BrokenBarrierException} is thrown.
 335      *
 336      * <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting,
 337      * then all other waiting threads will throw
 338      * {@link BrokenBarrierException} and the barrier is placed in the broken
 339      * state.
 340      *
 341      * <p>If the current thread is the last thread to arrive, and a
 342      * non-null barrier action was supplied in the constructor, then the
 343      * current thread runs the action before allowing the other threads to
 344      * continue.
 345      * If an exception occurs during the barrier action then that exception
 346      * will be propagated in the current thread and the barrier is placed in
 347      * the broken state.
 348      *
 349      * @return the arrival index of the current thread, where index
 350      *         {@code getParties() - 1} indicates the first
 351      *         to arrive and zero indicates the last to arrive
 352      * @throws InterruptedException if the current thread was interrupted
 353      *         while waiting
 354      * @throws BrokenBarrierException if <em>another</em> thread was
 355      *         interrupted or timed out while the current thread was
 356      *         waiting, or the barrier was reset, or the barrier was
 357      *         broken when {@code await} was called, or the barrier
 358      *         action (if present) failed due to an exception
 359      */
 360     public int await() throws InterruptedException, BrokenBarrierException {
 361         try {
 362             return dowait(false, 0L);
 363         } catch (TimeoutException toe) {
 364             throw new Error(toe); // cannot happen
 365         }
 366     }
 367 
 368     /**
 369      * Waits until all {@linkplain #getParties parties} have invoked
 370      * {@code await} on this barrier, or the specified waiting time elapses.
 371      *
 372      * <p>If the current thread is not the last to arrive then it is
 373      * disabled for thread scheduling purposes and lies dormant until
 374      * one of the following things happens:
 375      * <ul>
 376      * <li>The last thread arrives; or
 377      * <li>The specified timeout elapses; or
 378      * <li>Some other thread {@linkplain Thread#interrupt interrupts}
 379      * the current thread; or
 380      * <li>Some other thread {@linkplain Thread#interrupt interrupts}
 381      * one of the other waiting threads; or
 382      * <li>Some other thread times out while waiting for barrier; or
 383      * <li>Some other thread invokes {@link #reset} on this barrier.
 384      * </ul>
 385      *
 386      * <p>If the current thread:
 387      * <ul>
 388      * <li>has its interrupted status set on entry to this method; or
 389      * <li>is {@linkplain Thread#interrupt interrupted} while waiting
 390      * </ul>
 391      * then {@link InterruptedException} is thrown and the current thread's
 392      * interrupted status is cleared.
 393      *
 394      * <p>If the specified waiting time elapses then {@link TimeoutException}
 395      * is thrown. If the time is less than or equal to zero, the
 396      * method will not wait at all.
 397      *
 398      * <p>If the barrier is {@link #reset} while any thread is waiting,
 399      * or if the barrier {@linkplain #isBroken is broken} when
 400      * {@code await} is invoked, or while any thread is waiting, then
 401      * {@link BrokenBarrierException} is thrown.
 402      *
 403      * <p>If any thread is {@linkplain Thread#interrupt interrupted} while
 404      * waiting, then all other waiting threads will throw {@link
 405      * BrokenBarrierException} and the barrier is placed in the broken
 406      * state.
 407      *
 408      * <p>If the current thread is the last thread to arrive, and a
 409      * non-null barrier action was supplied in the constructor, then the
 410      * current thread runs the action before allowing the other threads to
 411      * continue.
 412      * If an exception occurs during the barrier action then that exception
 413      * will be propagated in the current thread and the barrier is placed in
 414      * the broken state.
 415      *
 416      * @param timeout the time to wait for the barrier
 417      * @param unit the time unit of the timeout parameter
 418      * @return the arrival index of the current thread, where index
 419      *         {@code getParties() - 1} indicates the first
 420      *         to arrive and zero indicates the last to arrive
 421      * @throws InterruptedException if the current thread was interrupted
 422      *         while waiting
 423      * @throws TimeoutException if the specified timeout elapses.
 424      *         In this case the barrier will be broken.
 425      * @throws BrokenBarrierException if <em>another</em> thread was
 426      *         interrupted or timed out while the current thread was
 427      *         waiting, or the barrier was reset, or the barrier was broken
 428      *         when {@code await} was called, or the barrier action (if
 429      *         present) failed due to an exception
 430      */
 431     public int await(long timeout, TimeUnit unit)
 432         throws InterruptedException,
 433                BrokenBarrierException,
 434                TimeoutException {
 435         return dowait(true, unit.toNanos(timeout));
 436     }
 437 
 438     /**
 439      * Queries if this barrier is in a broken state.
 440      *
 441      * @return {@code true} if one or more parties broke out of this
 442      *         barrier due to interruption or timeout since
 443      *         construction or the last reset, or a barrier action
 444      *         failed due to an exception; {@code false} otherwise.
 445      */
 446     public boolean isBroken() {
 447         final ReentrantLock lock = this.lock;
 448         lock.lock();
 449         try {
 450             return generation.broken;
 451         } finally {
 452             lock.unlock();
 453         }
 454     }
 455 
 456     /**
 457      * Resets the barrier to its initial state.  If any parties are
 458      * currently waiting at the barrier, they will return with a
 459      * {@link BrokenBarrierException}. Note that resets <em>after</em>
 460      * a breakage has occurred for other reasons can be complicated to
 461      * carry out; threads need to re-synchronize in some other way,
 462      * and choose one to perform the reset.  It may be preferable to
 463      * instead create a new barrier for subsequent use.
 464      */
 465     public void reset() {
 466         final ReentrantLock lock = this.lock;
 467         lock.lock();
 468         try {
 469             breakBarrier();   // break the current generation
 470             nextGeneration(); // start a new generation
 471         } finally {
 472             lock.unlock();
 473         }
 474     }
 475 
 476     /**
 477      * Returns the number of parties currently waiting at the barrier.
 478      * This method is primarily useful for debugging and assertions.
 479      *
 480      * @return the number of parties currently blocked in {@link #await}
 481      */
 482     public int getNumberWaiting() {
 483         final ReentrantLock lock = this.lock;
 484         lock.lock();
 485         try {
 486             return parties - count;
 487         } finally {
 488             lock.unlock();
 489         }
 490     }
 491 }