1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.util.concurrent.locks.Condition;
  39 import java.util.concurrent.locks.ReentrantLock;
  40 
  41 /**
  42  * A synchronization aid that allows a set of threads to all wait for
  43  * each other to reach a common barrier point.  CyclicBarriers are
  44  * useful in programs involving a fixed sized party of threads that
  45  * must occasionally wait for each other. The barrier is called
  46  * <em>cyclic</em> because it can be re-used after the waiting threads
  47  * are released.
  48  *
  49  * <p>A {@code CyclicBarrier} supports an optional {@link Runnable} command
  50  * that is run once per barrier point, after the last thread in the party
  51  * arrives, but before any threads are released.
  52  * This <em>barrier action</em> is useful
  53  * for updating shared-state before any of the parties continue.
  54  *
  55  * <p><b>Sample usage:</b> Here is an example of using a barrier in a
  56  * parallel decomposition design:
  57  *
  58  * <pre> {@code
  59  * class Solver {
  60  *   final int N;
  61  *   final float[][] data;
  62  *   final CyclicBarrier barrier;
  63  *
  64  *   class Worker implements Runnable {
  65  *     int myRow;
  66  *     Worker(int row) { myRow = row; }
  67  *     public void run() {
  68  *       while (!done()) {
  69  *         processRow(myRow);
  70  *
  71  *         try {
  72  *           barrier.await();
  73  *         } catch (InterruptedException ex) {
  74  *           return;
  75  *         } catch (BrokenBarrierException ex) {
  76  *           return;
  77  *         }
  78  *       }
  79  *     }
  80  *   }
  81  *
  82  *   public Solver(float[][] matrix) {
  83  *     data = matrix;
  84  *     N = matrix.length;
  85  *     Runnable barrierAction = () -> mergeRows(...);
  86  *     barrier = new CyclicBarrier(N, barrierAction);
  87  *
  88  *     List<Thread> threads = new ArrayList<>(N);
  89  *     for (int i = 0; i < N; i++) {
  90  *       Thread thread = new Thread(new Worker(i));
  91  *       threads.add(thread);
  92  *       thread.start();
  93  *     }
  94  *
  95  *     // wait until done
  96  *     for (Thread thread : threads)
  97  *       try {
  98  *         thread.join();
  99  *       } catch (InterruptedException ex) { }
 100  *   }
 101  * }}</pre>
 102  *
 103  * Here, each worker thread processes a row of the matrix, then waits at the
 104  * barrier until all rows have been processed. When all rows are processed the
 105  * supplied {@link Runnable} barrier action is executed and merges the rows.
 106  * If the merger determines that a solution has been found then {@code done()}
 107  * will return {@code true} and each worker will terminate.
 108  *
 109  * <p>If the barrier action does not rely on the parties being suspended when
 110  * it is executed, then any of the threads in the party could execute that
 111  * action when it is released. To facilitate this, each invocation of
 112  * {@link #await} returns the arrival index of that thread at the barrier.
 113  * You can then choose which thread should execute the barrier action, for
 114  * example:
 115  * <pre> {@code
 116  * if (barrier.await() == 0) {
 117  *   // log the completion of this iteration
 118  * }}</pre>
 119  *
 120  * <p>The {@code CyclicBarrier} uses an all-or-none breakage model
 121  * for failed synchronization attempts: If a thread leaves a barrier
 122  * point prematurely because of interruption, failure, or timeout, all
 123  * other threads waiting at that barrier point will also leave
 124  * abnormally via {@link BrokenBarrierException} (or
 125  * {@link InterruptedException} if they too were interrupted at about
 126  * the same time).
 127  *
 128  * <p>Memory consistency effects: Actions in a thread prior to calling
 129  * {@code await()}
 130  * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 131  * actions that are part of the barrier action, which in turn
 132  * <i>happen-before</i> actions following a successful return from the
 133  * corresponding {@code await()} in other threads.
 134  *
 135  * @see CountDownLatch
 136  * @see Phaser
 137  *
 138  * @author Doug Lea
 139  * @since 1.5
 140  */
 141 public class CyclicBarrier {
 142     /**
 143      * Each use of the barrier is represented as a generation instance.
 144      * The generation changes whenever the barrier is tripped, or
 145      * is reset. There can be many generations associated with threads
 146      * using the barrier - due to the non-deterministic way the lock
 147      * may be allocated to waiting threads - but only one of these
 148      * can be active at a time (the one to which {@code count} applies)
 149      * and all the rest are either broken or tripped.
 150      * There need not be an active generation if there has been a break
 151      * but no subsequent reset.
 152      */
 153     private static class Generation {
 154         Generation() {}                 // prevent access constructor creation
 155         boolean broken;                 // initially false
 156     }
 157 
 158     /** The lock for guarding barrier entry */
 159     private final ReentrantLock lock = new ReentrantLock();
 160     /** Condition to wait on until tripped */
 161     private final Condition trip = lock.newCondition();
 162     /** The number of parties */
 163     private final int parties;
 164     /** The command to run when tripped */
 165     private final Runnable barrierCommand;
 166     /** The current generation */
 167     private Generation generation = new Generation();
 168 
 169     /**
 170      * Number of parties still waiting. Counts down from parties to 0
 171      * on each generation.  It is reset to parties on each new
 172      * generation or when broken.
 173      */
 174     private int count;
 175 
 176     /**
 177      * Updates state on barrier trip and wakes up everyone.
 178      * Called only while holding lock.
 179      */
 180     private void nextGeneration() {
 181         // signal completion of last generation
 182         trip.signalAll();
 183         // set up next generation
 184         count = parties;
 185         generation = new Generation();
 186     }
 187 
 188     /**
 189      * Sets current barrier generation as broken and wakes up everyone.
 190      * Called only while holding lock.
 191      */
 192     private void breakBarrier() {
 193         generation.broken = true;
 194         count = parties;
 195         trip.signalAll();
 196     }
 197 
 198     /**
 199      * Main barrier code, covering the various policies.
 200      */
 201     private int dowait(boolean timed, long nanos)
 202         throws InterruptedException, BrokenBarrierException,
 203                TimeoutException {
 204         final ReentrantLock lock = this.lock;
 205         lock.lock();
 206         try {
 207             final Generation g = generation;
 208 
 209             if (g.broken)
 210                 throw new BrokenBarrierException();
 211 
 212             if (Thread.interrupted()) {
 213                 breakBarrier();
 214                 throw new InterruptedException();
 215             }
 216 
 217             int index = --count;
 218             if (index == 0) {  // tripped
 219                 Runnable command = barrierCommand;
 220                 if (command != null) {
 221                     try {
 222                         command.run();
 223                     } catch (Throwable ex) {
 224                         breakBarrier();
 225                         throw ex;
 226                     }
 227                 }
 228                 nextGeneration();
 229                 return 0;
 230             }
 231 
 232             // loop until tripped, broken, interrupted, or timed out
 233             for (;;) {
 234                 try {
 235                     if (!timed)
 236                         trip.await();
 237                     else if (nanos > 0L)
 238                         nanos = trip.awaitNanos(nanos);
 239                 } catch (InterruptedException ie) {
 240                     if (g == generation && ! g.broken) {
 241                         breakBarrier();
 242                         throw ie;
 243                     } else {
 244                         // We're about to finish waiting even if we had not
 245                         // been interrupted, so this interrupt is deemed to
 246                         // "belong" to subsequent execution.
 247                         Thread.currentThread().interrupt();
 248                     }
 249                 }
 250 
 251                 if (g.broken)
 252                     throw new BrokenBarrierException();
 253 
 254                 if (g != generation)
 255                     return index;
 256 
 257                 if (timed && nanos <= 0L) {
 258                     breakBarrier();
 259                     throw new TimeoutException();
 260                 }
 261             }
 262         } finally {
 263             lock.unlock();
 264         }
 265     }
 266 
 267     /**
 268      * Creates a new {@code CyclicBarrier} that will trip when the
 269      * given number of parties (threads) are waiting upon it, and which
 270      * will execute the given barrier action when the barrier is tripped,
 271      * performed by the last thread entering the barrier.
 272      *
 273      * @param parties the number of threads that must invoke {@link #await}
 274      *        before the barrier is tripped
 275      * @param barrierAction the command to execute when the barrier is
 276      *        tripped, or {@code null} if there is no action
 277      * @throws IllegalArgumentException if {@code parties} is less than 1
 278      */
 279     public CyclicBarrier(int parties, Runnable barrierAction) {
 280         if (parties <= 0) throw new IllegalArgumentException();
 281         this.parties = parties;
 282         this.count = parties;
 283         this.barrierCommand = barrierAction;
 284     }
 285 
 286     /**
 287      * Creates a new {@code CyclicBarrier} that will trip when the
 288      * given number of parties (threads) are waiting upon it, and
 289      * does not perform a predefined action when the barrier is tripped.
 290      *
 291      * @param parties the number of threads that must invoke {@link #await}
 292      *        before the barrier is tripped
 293      * @throws IllegalArgumentException if {@code parties} is less than 1
 294      */
 295     public CyclicBarrier(int parties) {
 296         this(parties, null);
 297     }
 298 
 299     /**
 300      * Returns the number of parties required to trip this barrier.
 301      *
 302      * @return the number of parties required to trip this barrier
 303      */
 304     public int getParties() {
 305         return parties;
 306     }
 307 
 308     /**
 309      * Waits until all {@linkplain #getParties parties} have invoked
 310      * {@code await} on this barrier.
 311      *
 312      * <p>If the current thread is not the last to arrive then it is
 313      * disabled for thread scheduling purposes and lies dormant until
 314      * one of the following things happens:
 315      * <ul>
 316      * <li>The last thread arrives; or
 317      * <li>Some other thread {@linkplain Thread#interrupt interrupts}
 318      * the current thread; or
 319      * <li>Some other thread {@linkplain Thread#interrupt interrupts}
 320      * one of the other waiting threads; or
 321      * <li>Some other thread times out while waiting for barrier; or
 322      * <li>Some other thread invokes {@link #reset} on this barrier.
 323      * </ul>
 324      *
 325      * <p>If the current thread:
 326      * <ul>
 327      * <li>has its interrupted status set on entry to this method; or
 328      * <li>is {@linkplain Thread#interrupt interrupted} while waiting
 329      * </ul>
 330      * then {@link InterruptedException} is thrown and the current thread's
 331      * interrupted status is cleared.
 332      *
 333      * <p>If the barrier is {@link #reset} while any thread is waiting,
 334      * or if the barrier {@linkplain #isBroken is broken} when
 335      * {@code await} is invoked, or while any thread is waiting, then
 336      * {@link BrokenBarrierException} is thrown.
 337      *
 338      * <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting,
 339      * then all other waiting threads will throw
 340      * {@link BrokenBarrierException} and the barrier is placed in the broken
 341      * state.
 342      *
 343      * <p>If the current thread is the last thread to arrive, and a
 344      * non-null barrier action was supplied in the constructor, then the
 345      * current thread runs the action before allowing the other threads to
 346      * continue.
 347      * If an exception occurs during the barrier action then that exception
 348      * will be propagated in the current thread and the barrier is placed in
 349      * the broken state.
 350      *
 351      * @return the arrival index of the current thread, where index
 352      *         {@code getParties() - 1} indicates the first
 353      *         to arrive and zero indicates the last to arrive
 354      * @throws InterruptedException if the current thread was interrupted
 355      *         while waiting
 356      * @throws BrokenBarrierException if <em>another</em> thread was
 357      *         interrupted or timed out while the current thread was
 358      *         waiting, or the barrier was reset, or the barrier was
 359      *         broken when {@code await} was called, or the barrier
 360      *         action (if present) failed due to an exception
 361      */
 362     public int await() throws InterruptedException, BrokenBarrierException {
 363         try {
 364             return dowait(false, 0L);
 365         } catch (TimeoutException toe) {
 366             throw new Error(toe); // cannot happen
 367         }
 368     }
 369 
 370     /**
 371      * Waits until all {@linkplain #getParties parties} have invoked
 372      * {@code await} on this barrier, or the specified waiting time elapses.
 373      *
 374      * <p>If the current thread is not the last to arrive then it is
 375      * disabled for thread scheduling purposes and lies dormant until
 376      * one of the following things happens:
 377      * <ul>
 378      * <li>The last thread arrives; or
 379      * <li>The specified timeout elapses; or
 380      * <li>Some other thread {@linkplain Thread#interrupt interrupts}
 381      * the current thread; or
 382      * <li>Some other thread {@linkplain Thread#interrupt interrupts}
 383      * one of the other waiting threads; or
 384      * <li>Some other thread times out while waiting for barrier; or
 385      * <li>Some other thread invokes {@link #reset} on this barrier.
 386      * </ul>
 387      *
 388      * <p>If the current thread:
 389      * <ul>
 390      * <li>has its interrupted status set on entry to this method; or
 391      * <li>is {@linkplain Thread#interrupt interrupted} while waiting
 392      * </ul>
 393      * then {@link InterruptedException} is thrown and the current thread's
 394      * interrupted status is cleared.
 395      *
 396      * <p>If the specified waiting time elapses then {@link TimeoutException}
 397      * is thrown. If the time is less than or equal to zero, the
 398      * method will not wait at all.
 399      *
 400      * <p>If the barrier is {@link #reset} while any thread is waiting,
 401      * or if the barrier {@linkplain #isBroken is broken} when
 402      * {@code await} is invoked, or while any thread is waiting, then
 403      * {@link BrokenBarrierException} is thrown.
 404      *
 405      * <p>If any thread is {@linkplain Thread#interrupt interrupted} while
 406      * waiting, then all other waiting threads will throw {@link
 407      * BrokenBarrierException} and the barrier is placed in the broken
 408      * state.
 409      *
 410      * <p>If the current thread is the last thread to arrive, and a
 411      * non-null barrier action was supplied in the constructor, then the
 412      * current thread runs the action before allowing the other threads to
 413      * continue.
 414      * If an exception occurs during the barrier action then that exception
 415      * will be propagated in the current thread and the barrier is placed in
 416      * the broken state.
 417      *
 418      * @param timeout the time to wait for the barrier
 419      * @param unit the time unit of the timeout parameter
 420      * @return the arrival index of the current thread, where index
 421      *         {@code getParties() - 1} indicates the first
 422      *         to arrive and zero indicates the last to arrive
 423      * @throws InterruptedException if the current thread was interrupted
 424      *         while waiting
 425      * @throws TimeoutException if the specified timeout elapses.
 426      *         In this case the barrier will be broken.
 427      * @throws BrokenBarrierException if <em>another</em> thread was
 428      *         interrupted or timed out while the current thread was
 429      *         waiting, or the barrier was reset, or the barrier was broken
 430      *         when {@code await} was called, or the barrier action (if
 431      *         present) failed due to an exception
 432      */
 433     public int await(long timeout, TimeUnit unit)
 434         throws InterruptedException,
 435                BrokenBarrierException,
 436                TimeoutException {
 437         return dowait(true, unit.toNanos(timeout));
 438     }
 439 
 440     /**
 441      * Queries if this barrier is in a broken state.
 442      *
 443      * @return {@code true} if one or more parties broke out of this
 444      *         barrier due to interruption or timeout since
 445      *         construction or the last reset, or a barrier action
 446      *         failed due to an exception; {@code false} otherwise.
 447      */
 448     public boolean isBroken() {
 449         final ReentrantLock lock = this.lock;
 450         lock.lock();
 451         try {
 452             return generation.broken;
 453         } finally {
 454             lock.unlock();
 455         }
 456     }
 457 
 458     /**
 459      * Resets the barrier to its initial state.  If any parties are
 460      * currently waiting at the barrier, they will return with a
 461      * {@link BrokenBarrierException}. Note that resets <em>after</em>
 462      * a breakage has occurred for other reasons can be complicated to
 463      * carry out; threads need to re-synchronize in some other way,
 464      * and choose one to perform the reset.  It may be preferable to
 465      * instead create a new barrier for subsequent use.
 466      */
 467     public void reset() {
 468         final ReentrantLock lock = this.lock;
 469         lock.lock();
 470         try {
 471             breakBarrier();   // break the current generation
 472             nextGeneration(); // start a new generation
 473         } finally {
 474             lock.unlock();
 475         }
 476     }
 477 
 478     /**
 479      * Returns the number of parties currently waiting at the barrier.
 480      * This method is primarily useful for debugging and assertions.
 481      *
 482      * @return the number of parties currently blocked in {@link #await}
 483      */
 484     public int getNumberWaiting() {
 485         final ReentrantLock lock = this.lock;
 486         lock.lock();
 487         try {
 488             return parties - count;
 489         } finally {
 490             lock.unlock();
 491         }
 492     }
 493 }