1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.util.concurrent.TimeUnit;
  39 import java.util.concurrent.TimeoutException;
  40 import java.util.concurrent.atomic.AtomicReference;
  41 import java.util.concurrent.locks.LockSupport;
  42 
  43 /**
  44  * A reusable synchronization barrier, similar in functionality to
  45  * {@link java.util.concurrent.CyclicBarrier CyclicBarrier} and
  46  * {@link java.util.concurrent.CountDownLatch CountDownLatch}
  47  * but supporting more flexible usage.
  48  *
  49  * <p> <b>Registration.</b> Unlike the case for other barriers, the
  50  * number of parties <em>registered</em> to synchronize on a phaser
  51  * may vary over time.  Tasks may be registered at any time (using
  52  * methods {@link #register}, {@link #bulkRegister}, or forms of
  53  * constructors establishing initial numbers of parties), and
  54  * optionally deregistered upon any arrival (using {@link
  55  * #arriveAndDeregister}).  As is the case with most basic
  56  * synchronization constructs, registration and deregistration affect
  57  * only internal counts; they do not establish any further internal
  58  * bookkeeping, so tasks cannot query whether they are registered.
  59  * (However, you can introduce such bookkeeping by subclassing this
  60  * class.)
  61  *
  62  * <p> <b>Synchronization.</b> Like a {@code CyclicBarrier}, a {@code
  63  * Phaser} may be repeatedly awaited.  Method {@link
  64  * #arriveAndAwaitAdvance} has effect analogous to {@link
  65  * java.util.concurrent.CyclicBarrier#await CyclicBarrier.await}. Each
  66  * generation of a phaser has an associated phase number. The phase
  67  * number starts at zero, and advances when all parties arrive at the
  68  * phaser, wrapping around to zero after reaching {@code
  69  * Integer.MAX_VALUE}. The use of phase numbers enables independent
  70  * control of actions upon arrival at a phaser and upon awaiting
  71  * others, via two kinds of methods that may be invoked by any
  72  * registered party:
  73  *
  74  * <ul>
  75  *
  76  *   <li> <b>Arrival.</b> Methods {@link #arrive} and
  77  *       {@link #arriveAndDeregister} record arrival.  These methods
  78  *       do not block, but return an associated <em>arrival phase
  79  *       number</em>; that is, the phase number of the phaser to which
  80  *       the arrival applied. When the final party for a given phase
  81  *       arrives, an optional action is performed and the phase
  82  *       advances.  These actions are performed by the party
  83  *       triggering a phase advance, and are arranged by overriding
  84  *       method {@link #onAdvance(int, int)}, which also controls
  85  *       termination. Overriding this method is similar to, but more
  86  *       flexible than, providing a barrier action to a {@code
  87  *       CyclicBarrier}.
  88  *
  89  *   <li> <b>Waiting.</b> Method {@link #awaitAdvance} requires an
  90  *       argument indicating an arrival phase number, and returns when
  91  *       the phaser advances to (or is already at) a different phase.
  92  *       Unlike similar constructions using {@code CyclicBarrier},
  93  *       method {@code awaitAdvance} continues to wait even if the
  94  *       waiting thread is interrupted. Interruptible and timeout
  95  *       versions are also available, but exceptions encountered while
  96  *       tasks wait interruptibly or with timeout do not change the
  97  *       state of the phaser. If necessary, you can perform any
  98  *       associated recovery within handlers of those exceptions,
  99  *       often after invoking {@code forceTermination}.  Phasers may
 100  *       also be used by tasks executing in a {@link ForkJoinPool},
 101  *       which will ensure sufficient parallelism to execute tasks
 102  *       when others are blocked waiting for a phase to advance.
 103  *
 104  * </ul>
 105  *
 106  * <p> <b>Termination.</b> A phaser may enter a <em>termination</em>
 107  * state, that may be checked using method {@link #isTerminated}. Upon
 108  * termination, all synchronization methods immediately return without
 109  * waiting for advance, as indicated by a negative return value.
 110  * Similarly, attempts to register upon termination have no effect.
 111  * Termination is triggered when an invocation of {@code onAdvance}
 112  * returns {@code true}. The default implementation returns {@code
 113  * true} if a deregistration has caused the number of registered
 114  * parties to become zero.  As illustrated below, when phasers control
 115  * actions with a fixed number of iterations, it is often convenient
 116  * to override this method to cause termination when the current phase
 117  * number reaches a threshold. Method {@link #forceTermination} is
 118  * also available to abruptly release waiting threads and allow them
 119  * to terminate.
 120  *
 121  * <p> <b>Tiering.</b> Phasers may be <em>tiered</em> (i.e.,
 122  * constructed in tree structures) to reduce contention. Phasers with
 123  * large numbers of parties that would otherwise experience heavy
 124  * synchronization contention costs may instead be set up so that
 125  * groups of sub-phasers share a common parent.  This may greatly
 126  * increase throughput even though it incurs greater per-operation
 127  * overhead.
 128  *
 129  * <p>In a tree of tiered phasers, registration and deregistration of
 130  * child phasers with their parent are managed automatically.
 131  * Whenever the number of registered parties of a child phaser becomes
 132  * non-zero (as established in the {@link #Phaser(Phaser,int)}
 133  * constructor, {@link #register}, or {@link #bulkRegister}), the
 134  * child phaser is registered with its parent.  Whenever the number of
 135  * registered parties becomes zero as the result of an invocation of
 136  * {@link #arriveAndDeregister}, the child phaser is deregistered
 137  * from its parent.
 138  *
 139  * <p><b>Monitoring.</b> While synchronization methods may be invoked
 140  * only by registered parties, the current state of a phaser may be
 141  * monitored by any caller.  At any given moment there are {@link
 142  * #getRegisteredParties} parties in total, of which {@link
 143  * #getArrivedParties} have arrived at the current phase ({@link
 144  * #getPhase}).  When the remaining ({@link #getUnarrivedParties})
 145  * parties arrive, the phase advances.  The values returned by these
 146  * methods may reflect transient states and so are not in general
 147  * useful for synchronization control.  Method {@link #toString}
 148  * returns snapshots of these state queries in a form convenient for
 149  * informal monitoring.
 150  *
 151  * <p><b>Sample usages:</b>
 152  *
 153  * <p>A {@code Phaser} may be used instead of a {@code CountDownLatch}
 154  * to control a one-shot action serving a variable number of parties.
 155  * The typical idiom is for the method setting this up to first
 156  * register, then start the actions, then deregister, as in:
 157  *
 158  *  <pre> {@code
 159  * void runTasks(List<Runnable> tasks) {
 160  *   final Phaser phaser = new Phaser(1); // "1" to register self
 161  *   // create and start threads
 162  *   for (final Runnable task : tasks) {
 163  *     phaser.register();
 164  *     new Thread() {
 165  *       public void run() {
 166  *         phaser.arriveAndAwaitAdvance(); // await all creation
 167  *         task.run();
 168  *       }
 169  *     }.start();
 170  *   }
 171  *
 172  *   // allow threads to start and deregister self
 173  *   phaser.arriveAndDeregister();
 174  * }}</pre>
 175  *
 176  * <p>One way to cause a set of threads to repeatedly perform actions
 177  * for a given number of iterations is to override {@code onAdvance}:
 178  *
 179  *  <pre> {@code
 180  * void startTasks(List<Runnable> tasks, final int iterations) {
 181  *   final Phaser phaser = new Phaser() {
 182  *     protected boolean onAdvance(int phase, int registeredParties) {
 183  *       return phase >= iterations || registeredParties == 0;
 184  *     }
 185  *   };
 186  *   phaser.register();
 187  *   for (final Runnable task : tasks) {
 188  *     phaser.register();
 189  *     new Thread() {
 190  *       public void run() {
 191  *         do {
 192  *           task.run();
 193  *           phaser.arriveAndAwaitAdvance();
 194  *         } while (!phaser.isTerminated());
 195  *       }
 196  *     }.start();
 197  *   }
 198  *   phaser.arriveAndDeregister(); // deregister self, don't wait
 199  * }}</pre>
 200  *
 201  * If the main task must later await termination, it
 202  * may re-register and then execute a similar loop:
 203  *  <pre> {@code
 204  *   // ...
 205  *   phaser.register();
 206  *   while (!phaser.isTerminated())
 207  *     phaser.arriveAndAwaitAdvance();}</pre>
 208  *
 209  * <p>Related constructions may be used to await particular phase numbers
 210  * in contexts where you are sure that the phase will never wrap around
 211  * {@code Integer.MAX_VALUE}. For example:
 212  *
 213  *  <pre> {@code
 214  * void awaitPhase(Phaser phaser, int phase) {
 215  *   int p = phaser.register(); // assumes caller not already registered
 216  *   while (p < phase) {
 217  *     if (phaser.isTerminated())
 218  *       // ... deal with unexpected termination
 219  *     else
 220  *       p = phaser.arriveAndAwaitAdvance();
 221  *   }
 222  *   phaser.arriveAndDeregister();
 223  * }}</pre>
 224  *
 225  *
 226  * <p>To create a set of {@code n} tasks using a tree of phasers, you
 227  * could use code of the following form, assuming a Task class with a
 228  * constructor accepting a {@code Phaser} that it registers with upon
 229  * construction. After invocation of {@code build(new Task[n], 0, n,
 230  * new Phaser())}, these tasks could then be started, for example by
 231  * submitting to a pool:
 232  *
 233  *  <pre> {@code
 234  * void build(Task[] tasks, int lo, int hi, Phaser ph) {
 235  *   if (hi - lo > TASKS_PER_PHASER) {
 236  *     for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
 237  *       int j = Math.min(i + TASKS_PER_PHASER, hi);
 238  *       build(tasks, i, j, new Phaser(ph));
 239  *     }
 240  *   } else {
 241  *     for (int i = lo; i < hi; ++i)
 242  *       tasks[i] = new Task(ph);
 243  *       // assumes new Task(ph) performs ph.register()
 244  *   }
 245  * }}</pre>
 246  *
 247  * The best value of {@code TASKS_PER_PHASER} depends mainly on
 248  * expected synchronization rates. A value as low as four may
 249  * be appropriate for extremely small per-phase task bodies (thus
 250  * high rates), or up to hundreds for extremely large ones.
 251  *
 252  * <p><b>Implementation notes</b>: This implementation restricts the
 253  * maximum number of parties to 65535. Attempts to register additional
 254  * parties result in {@code IllegalStateException}. However, you can and
 255  * should create tiered phasers to accommodate arbitrarily large sets
 256  * of participants.
 257  *
 258  * @since 1.7
 259  * @author Doug Lea
 260  */
 261 public class Phaser {
 262     /*
 263      * This class implements an extension of X10 "clocks".  Thanks to
 264      * Vijay Saraswat for the idea, and to Vivek Sarkar for
 265      * enhancements to extend functionality.
 266      */
 267 
 268     /**
 269      * Primary state representation, holding four bit-fields:
 270      *
 271      * unarrived  -- the number of parties yet to hit barrier (bits  0-15)
 272      * parties    -- the number of parties to wait            (bits 16-31)
 273      * phase      -- the generation of the barrier            (bits 32-62)
 274      * terminated -- set if barrier is terminated             (bit  63 / sign)
 275      *
 276      * Except that a phaser with no registered parties is
 277      * distinguished by the otherwise illegal state of having zero
 278      * parties and one unarrived parties (encoded as EMPTY below).
 279      *
 280      * To efficiently maintain atomicity, these values are packed into
 281      * a single (atomic) long. Good performance relies on keeping
 282      * state decoding and encoding simple, and keeping race windows
 283      * short.
 284      *
 285      * All state updates are performed via CAS except initial
 286      * registration of a sub-phaser (i.e., one with a non-null
 287      * parent).  In this (relatively rare) case, we use built-in
 288      * synchronization to lock while first registering with its
 289      * parent.
 290      *
 291      * The phase of a subphaser is allowed to lag that of its
 292      * ancestors until it is actually accessed -- see method
 293      * reconcileState.
 294      */
 295     private volatile long state;
 296 
 297     private static final int  MAX_PARTIES     = 0xffff;
 298     private static final int  MAX_PHASE       = Integer.MAX_VALUE;
 299     private static final int  PARTIES_SHIFT   = 16;
 300     private static final int  PHASE_SHIFT     = 32;
 301     private static final int  UNARRIVED_MASK  = 0xffff;      // to mask ints
 302     private static final long PARTIES_MASK    = 0xffff0000L; // to mask longs
 303     private static final long TERMINATION_BIT = 1L << 63;
 304 
 305     // some special values
 306     private static final int  ONE_ARRIVAL     = 1;
 307     private static final int  ONE_PARTY       = 1 << PARTIES_SHIFT;
 308     private static final int  EMPTY           = 1;
 309 
 310     // The following unpacking methods are usually manually inlined
 311 
 312     private static int unarrivedOf(long s) {
 313         int counts = (int)s;
 314         return (counts == EMPTY) ? 0 : counts & UNARRIVED_MASK;
 315     }
 316 
 317     private static int partiesOf(long s) {
 318         return (int)s >>> PARTIES_SHIFT;
 319     }
 320 
 321     private static int phaseOf(long s) {
 322         return (int)(s >>> PHASE_SHIFT);
 323     }
 324 
 325     private static int arrivedOf(long s) {
 326         int counts = (int)s;
 327         return (counts == EMPTY) ? 0 :
 328             (counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK);
 329     }
 330 
 331     /**
 332      * The parent of this phaser, or null if none
 333      */
 334     private final Phaser parent;
 335 
 336     /**
 337      * The root of phaser tree. Equals this if not in a tree.
 338      */
 339     private final Phaser root;
 340 
 341     /**
 342      * Heads of Treiber stacks for waiting threads. To eliminate
 343      * contention when releasing some threads while adding others, we
 344      * use two of them, alternating across even and odd phases.
 345      * Subphasers share queues with root to speed up releases.
 346      */
 347     private final AtomicReference<QNode> evenQ;
 348     private final AtomicReference<QNode> oddQ;
 349 
 350     private AtomicReference<QNode> queueFor(int phase) {
 351         return ((phase & 1) == 0) ? evenQ : oddQ;
 352     }
 353 
 354     /**
 355      * Returns message string for bounds exceptions on arrival.
 356      */
 357     private String badArrive(long s) {
 358         return "Attempted arrival of unregistered party for " +
 359             stateToString(s);
 360     }
 361 
 362     /**
 363      * Returns message string for bounds exceptions on registration.
 364      */
 365     private String badRegister(long s) {
 366         return "Attempt to register more than " +
 367             MAX_PARTIES + " parties for " + stateToString(s);
 368     }
 369 
 370     /**
 371      * Main implementation for methods arrive and arriveAndDeregister.
 372      * Manually tuned to speed up and minimize race windows for the
 373      * common case of just decrementing unarrived field.
 374      *
 375      * @param deregister false for arrive, true for arriveAndDeregister
 376      */
 377     private int doArrive(boolean deregister) {
 378         int adj = deregister ? ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL;
 379         final Phaser root = this.root;
 380         for (;;) {
 381             long s = (root == this) ? state : reconcileState();
 382             int phase = (int)(s >>> PHASE_SHIFT);
 383             int counts = (int)s;
 384             int unarrived = (counts & UNARRIVED_MASK) - 1;
 385             if (phase < 0)
 386                 return phase;
 387             else if (counts == EMPTY || unarrived < 0) {
 388                 if (root == this || reconcileState() == s)
 389                     throw new IllegalStateException(badArrive(s));
 390             }
 391             else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adj)) {
 392                 if (unarrived == 0) {
 393                     long n = s & PARTIES_MASK;  // base of next state
 394                     int nextUnarrived = (int)n >>> PARTIES_SHIFT;
 395                     if (root != this)
 396                         return parent.doArrive(nextUnarrived == 0);
 397                     if (onAdvance(phase, nextUnarrived))
 398                         n |= TERMINATION_BIT;
 399                     else if (nextUnarrived == 0)
 400                         n |= EMPTY;
 401                     else
 402                         n |= nextUnarrived;
 403                     n |= (long)((phase + 1) & MAX_PHASE) << PHASE_SHIFT;
 404                     UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
 405                     releaseWaiters(phase);
 406                 }
 407                 return phase;
 408             }
 409         }
 410     }
 411 
 412     /**
 413      * Implementation of register, bulkRegister
 414      *
 415      * @param registrations number to add to both parties and
 416      * unarrived fields. Must be greater than zero.
 417      */
 418     private int doRegister(int registrations) {
 419         // adjustment to state
 420         long adj = ((long)registrations << PARTIES_SHIFT) | registrations;
 421         final Phaser parent = this.parent;
 422         int phase;
 423         for (;;) {
 424             long s = state;
 425             int counts = (int)s;
 426             int parties = counts >>> PARTIES_SHIFT;
 427             int unarrived = counts & UNARRIVED_MASK;
 428             if (registrations > MAX_PARTIES - parties)
 429                 throw new IllegalStateException(badRegister(s));
 430             else if ((phase = (int)(s >>> PHASE_SHIFT)) < 0)
 431                 break;
 432             else if (counts != EMPTY) {             // not 1st registration
 433                 if (parent == null || reconcileState() == s) {
 434                     if (unarrived == 0)             // wait out advance
 435                         root.internalAwaitAdvance(phase, null);
 436                     else if (UNSAFE.compareAndSwapLong(this, stateOffset,
 437                                                        s, s + adj))
 438                         break;
 439                 }
 440             }
 441             else if (parent == null) {              // 1st root registration
 442                 long next = ((long)phase << PHASE_SHIFT) | adj;
 443                 if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
 444                     break;
 445             }
 446             else {
 447                 synchronized (this) {               // 1st sub registration
 448                     if (state == s) {               // recheck under lock
 449                         parent.doRegister(1);
 450                         do {                        // force current phase
 451                             phase = (int)(root.state >>> PHASE_SHIFT);
 452                             // assert phase < 0 || (int)state == EMPTY;
 453                         } while (!UNSAFE.compareAndSwapLong
 454                                  (this, stateOffset, state,
 455                                   ((long)phase << PHASE_SHIFT) | adj));
 456                         break;
 457                     }
 458                 }
 459             }
 460         }
 461         return phase;
 462     }
 463 
 464     /**
 465      * Resolves lagged phase propagation from root if necessary.
 466      * Reconciliation normally occurs when root has advanced but
 467      * subphasers have not yet done so, in which case they must finish
 468      * their own advance by setting unarrived to parties (or if
 469      * parties is zero, resetting to unregistered EMPTY state).
 470      * However, this method may also be called when "floating"
 471      * subphasers with possibly some unarrived parties are merely
 472      * catching up to current phase, in which case counts are
 473      * unaffected.
 474      *
 475      * @return reconciled state
 476      */
 477     private long reconcileState() {
 478         final Phaser root = this.root;
 479         long s = state;
 480         if (root != this) {
 481             int phase, u, p;
 482             // CAS root phase with current parties; possibly trip unarrived
 483             while ((phase = (int)(root.state >>> PHASE_SHIFT)) !=
 484                    (int)(s >>> PHASE_SHIFT) &&
 485                    !UNSAFE.compareAndSwapLong
 486                    (this, stateOffset, s,
 487                     s = (((long)phase << PHASE_SHIFT) |
 488                          (s & PARTIES_MASK) |
 489                          ((p = (int)s >>> PARTIES_SHIFT) == 0 ? EMPTY :
 490                           (u = (int)s & UNARRIVED_MASK) == 0 ? p : u))))
 491                 s = state;
 492         }
 493         return s;
 494     }
 495 
 496     /**
 497      * Creates a new phaser with no initially registered parties, no
 498      * parent, and initial phase number 0. Any thread using this
 499      * phaser will need to first register for it.
 500      */
 501     public Phaser() {
 502         this(null, 0);
 503     }
 504 
 505     /**
 506      * Creates a new phaser with the given number of registered
 507      * unarrived parties, no parent, and initial phase number 0.
 508      *
 509      * @param parties the number of parties required to advance to the
 510      * next phase
 511      * @throws IllegalArgumentException if parties less than zero
 512      * or greater than the maximum number of parties supported
 513      */
 514     public Phaser(int parties) {
 515         this(null, parties);
 516     }
 517 
 518     /**
 519      * Equivalent to {@link #Phaser(Phaser, int) Phaser(parent, 0)}.
 520      *
 521      * @param parent the parent phaser
 522      */
 523     public Phaser(Phaser parent) {
 524         this(parent, 0);
 525     }
 526 
 527     /**
 528      * Creates a new phaser with the given parent and number of
 529      * registered unarrived parties.  When the given parent is non-null
 530      * and the given number of parties is greater than zero, this
 531      * child phaser is registered with its parent.
 532      *
 533      * @param parent the parent phaser
 534      * @param parties the number of parties required to advance to the
 535      * next phase
 536      * @throws IllegalArgumentException if parties less than zero
 537      * or greater than the maximum number of parties supported
 538      */
 539     public Phaser(Phaser parent, int parties) {
 540         if (parties >>> PARTIES_SHIFT != 0)
 541             throw new IllegalArgumentException("Illegal number of parties");
 542         int phase = 0;
 543         this.parent = parent;
 544         if (parent != null) {
 545             final Phaser root = parent.root;
 546             this.root = root;
 547             this.evenQ = root.evenQ;
 548             this.oddQ = root.oddQ;
 549             if (parties != 0)
 550                 phase = parent.doRegister(1);
 551         }
 552         else {
 553             this.root = this;
 554             this.evenQ = new AtomicReference<QNode>();
 555             this.oddQ = new AtomicReference<QNode>();
 556         }
 557         this.state = (parties == 0) ? (long)EMPTY :
 558             ((long)phase << PHASE_SHIFT) |
 559             ((long)parties << PARTIES_SHIFT) |
 560             ((long)parties);
 561     }
 562 
 563     /**
 564      * Adds a new unarrived party to this phaser.  If an ongoing
 565      * invocation of {@link #onAdvance} is in progress, this method
 566      * may await its completion before returning.  If this phaser has
 567      * a parent, and this phaser previously had no registered parties,
 568      * this child phaser is also registered with its parent. If
 569      * this phaser is terminated, the attempt to register has
 570      * no effect, and a negative value is returned.
 571      *
 572      * @return the arrival phase number to which this registration
 573      * applied.  If this value is negative, then this phaser has
 574      * terminated, in which case registration has no effect.
 575      * @throws IllegalStateException if attempting to register more
 576      * than the maximum supported number of parties
 577      */
 578     public int register() {
 579         return doRegister(1);
 580     }
 581 
 582     /**
 583      * Adds the given number of new unarrived parties to this phaser.
 584      * If an ongoing invocation of {@link #onAdvance} is in progress,
 585      * this method may await its completion before returning.  If this
 586      * phaser has a parent, and the given number of parties is greater
 587      * than zero, and this phaser previously had no registered
 588      * parties, this child phaser is also registered with its parent.
 589      * If this phaser is terminated, the attempt to register has no
 590      * effect, and a negative value is returned.
 591      *
 592      * @param parties the number of additional parties required to
 593      * advance to the next phase
 594      * @return the arrival phase number to which this registration
 595      * applied.  If this value is negative, then this phaser has
 596      * terminated, in which case registration has no effect.
 597      * @throws IllegalStateException if attempting to register more
 598      * than the maximum supported number of parties
 599      * @throws IllegalArgumentException if {@code parties < 0}
 600      */
 601     public int bulkRegister(int parties) {
 602         if (parties < 0)
 603             throw new IllegalArgumentException();
 604         if (parties == 0)
 605             return getPhase();
 606         return doRegister(parties);
 607     }
 608 
 609     /**
 610      * Arrives at this phaser, without waiting for others to arrive.
 611      *
 612      * <p>It is a usage error for an unregistered party to invoke this
 613      * method.  However, this error may result in an {@code
 614      * IllegalStateException} only upon some subsequent operation on
 615      * this phaser, if ever.
 616      *
 617      * @return the arrival phase number, or a negative value if terminated
 618      * @throws IllegalStateException if not terminated and the number
 619      * of unarrived parties would become negative
 620      */
 621     public int arrive() {
 622         return doArrive(false);
 623     }
 624 
 625     /**
 626      * Arrives at this phaser and deregisters from it without waiting
 627      * for others to arrive. Deregistration reduces the number of
 628      * parties required to advance in future phases.  If this phaser
 629      * has a parent, and deregistration causes this phaser to have
 630      * zero parties, this phaser is also deregistered from its parent.
 631      *
 632      * <p>It is a usage error for an unregistered party to invoke this
 633      * method.  However, this error may result in an {@code
 634      * IllegalStateException} only upon some subsequent operation on
 635      * this phaser, if ever.
 636      *
 637      * @return the arrival phase number, or a negative value if terminated
 638      * @throws IllegalStateException if not terminated and the number
 639      * of registered or unarrived parties would become negative
 640      */
 641     public int arriveAndDeregister() {
 642         return doArrive(true);
 643     }
 644 
 645     /**
 646      * Arrives at this phaser and awaits others. Equivalent in effect
 647      * to {@code awaitAdvance(arrive())}.  If you need to await with
 648      * interruption or timeout, you can arrange this with an analogous
 649      * construction using one of the other forms of the {@code
 650      * awaitAdvance} method.  If instead you need to deregister upon
 651      * arrival, use {@code awaitAdvance(arriveAndDeregister())}.
 652      *
 653      * <p>It is a usage error for an unregistered party to invoke this
 654      * method.  However, this error may result in an {@code
 655      * IllegalStateException} only upon some subsequent operation on
 656      * this phaser, if ever.
 657      *
 658      * @return the arrival phase number, or the (negative)
 659      * {@linkplain #getPhase() current phase} if terminated
 660      * @throws IllegalStateException if not terminated and the number
 661      * of unarrived parties would become negative
 662      */
 663     public int arriveAndAwaitAdvance() {
 664         // Specialization of doArrive+awaitAdvance eliminating some reads/paths
 665         final Phaser root = this.root;
 666         for (;;) {
 667             long s = (root == this) ? state : reconcileState();
 668             int phase = (int)(s >>> PHASE_SHIFT);
 669             int counts = (int)s;
 670             int unarrived = (counts & UNARRIVED_MASK) - 1;
 671             if (phase < 0)
 672                 return phase;
 673             else if (counts == EMPTY || unarrived < 0) {
 674                 if (reconcileState() == s)
 675                     throw new IllegalStateException(badArrive(s));
 676             }
 677             else if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
 678                                                s -= ONE_ARRIVAL)) {
 679                 if (unarrived != 0)
 680                     return root.internalAwaitAdvance(phase, null);
 681                 if (root != this)
 682                     return parent.arriveAndAwaitAdvance();
 683                 long n = s & PARTIES_MASK;  // base of next state
 684                 int nextUnarrived = (int)n >>> PARTIES_SHIFT;
 685                 if (onAdvance(phase, nextUnarrived))
 686                     n |= TERMINATION_BIT;
 687                 else if (nextUnarrived == 0)
 688                     n |= EMPTY;
 689                 else
 690                     n |= nextUnarrived;
 691                 int nextPhase = (phase + 1) & MAX_PHASE;
 692                 n |= (long)nextPhase << PHASE_SHIFT;
 693                 if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))
 694                     return (int)(state >>> PHASE_SHIFT); // terminated
 695                 releaseWaiters(phase);
 696                 return nextPhase;
 697             }
 698         }
 699     }
 700 
 701     /**
 702      * Awaits the phase of this phaser to advance from the given phase
 703      * value, returning immediately if the current phase is not equal
 704      * to the given phase value or this phaser is terminated.
 705      *
 706      * @param phase an arrival phase number, or negative value if
 707      * terminated; this argument is normally the value returned by a
 708      * previous call to {@code arrive} or {@code arriveAndDeregister}.
 709      * @return the next arrival phase number, or the argument if it is
 710      * negative, or the (negative) {@linkplain #getPhase() current phase}
 711      * if terminated
 712      */
 713     public int awaitAdvance(int phase) {
 714         final Phaser root = this.root;
 715         long s = (root == this) ? state : reconcileState();
 716         int p = (int)(s >>> PHASE_SHIFT);
 717         if (phase < 0)
 718             return phase;
 719         if (p == phase)
 720             return root.internalAwaitAdvance(phase, null);
 721         return p;
 722     }
 723 
 724     /**
 725      * Awaits the phase of this phaser to advance from the given phase
 726      * value, throwing {@code InterruptedException} if interrupted
 727      * while waiting, or returning immediately if the current phase is
 728      * not equal to the given phase value or this phaser is
 729      * terminated.
 730      *
 731      * @param phase an arrival phase number, or negative value if
 732      * terminated; this argument is normally the value returned by a
 733      * previous call to {@code arrive} or {@code arriveAndDeregister}.
 734      * @return the next arrival phase number, or the argument if it is
 735      * negative, or the (negative) {@linkplain #getPhase() current phase}
 736      * if terminated
 737      * @throws InterruptedException if thread interrupted while waiting
 738      */
 739     public int awaitAdvanceInterruptibly(int phase)
 740         throws InterruptedException {
 741         final Phaser root = this.root;
 742         long s = (root == this) ? state : reconcileState();
 743         int p = (int)(s >>> PHASE_SHIFT);
 744         if (phase < 0)
 745             return phase;
 746         if (p == phase) {
 747             QNode node = new QNode(this, phase, true, false, 0L);
 748             p = root.internalAwaitAdvance(phase, node);
 749             if (node.wasInterrupted)
 750                 throw new InterruptedException();
 751         }
 752         return p;
 753     }
 754 
 755     /**
 756      * Awaits the phase of this phaser to advance from the given phase
 757      * value or the given timeout to elapse, throwing {@code
 758      * InterruptedException} if interrupted while waiting, or
 759      * returning immediately if the current phase is not equal to the
 760      * given phase value or this phaser is terminated.
 761      *
 762      * @param phase an arrival phase number, or negative value if
 763      * terminated; this argument is normally the value returned by a
 764      * previous call to {@code arrive} or {@code arriveAndDeregister}.
 765      * @param timeout how long to wait before giving up, in units of
 766      *        {@code unit}
 767      * @param unit a {@code TimeUnit} determining how to interpret the
 768      *        {@code timeout} parameter
 769      * @return the next arrival phase number, or the argument if it is
 770      * negative, or the (negative) {@linkplain #getPhase() current phase}
 771      * if terminated
 772      * @throws InterruptedException if thread interrupted while waiting
 773      * @throws TimeoutException if timed out while waiting
 774      */
 775     public int awaitAdvanceInterruptibly(int phase,
 776                                          long timeout, TimeUnit unit)
 777         throws InterruptedException, TimeoutException {
 778         long nanos = unit.toNanos(timeout);
 779         final Phaser root = this.root;
 780         long s = (root == this) ? state : reconcileState();
 781         int p = (int)(s >>> PHASE_SHIFT);
 782         if (phase < 0)
 783             return phase;
 784         if (p == phase) {
 785             QNode node = new QNode(this, phase, true, true, nanos);
 786             p = root.internalAwaitAdvance(phase, node);
 787             if (node.wasInterrupted)
 788                 throw new InterruptedException();
 789             else if (p == phase)
 790                 throw new TimeoutException();
 791         }
 792         return p;
 793     }
 794 
 795     /**
 796      * Forces this phaser to enter termination state.  Counts of
 797      * registered parties are unaffected.  If this phaser is a member
 798      * of a tiered set of phasers, then all of the phasers in the set
 799      * are terminated.  If this phaser is already terminated, this
 800      * method has no effect.  This method may be useful for
 801      * coordinating recovery after one or more tasks encounter
 802      * unexpected exceptions.
 803      */
 804     public void forceTermination() {
 805         // Only need to change root state
 806         final Phaser root = this.root;
 807         long s;
 808         while ((s = root.state) >= 0) {
 809             if (UNSAFE.compareAndSwapLong(root, stateOffset,
 810                                           s, s | TERMINATION_BIT)) {
 811                 // signal all threads
 812                 releaseWaiters(0);
 813                 releaseWaiters(1);
 814                 return;
 815             }
 816         }
 817     }
 818 
 819     /**
 820      * Returns the current phase number. The maximum phase number is
 821      * {@code Integer.MAX_VALUE}, after which it restarts at
 822      * zero. Upon termination, the phase number is negative,
 823      * in which case the prevailing phase prior to termination
 824      * may be obtained via {@code getPhase() + Integer.MIN_VALUE}.
 825      *
 826      * @return the phase number, or a negative value if terminated
 827      */
 828     public final int getPhase() {
 829         return (int)(root.state >>> PHASE_SHIFT);
 830     }
 831 
 832     /**
 833      * Returns the number of parties registered at this phaser.
 834      *
 835      * @return the number of parties
 836      */
 837     public int getRegisteredParties() {
 838         return partiesOf(state);
 839     }
 840 
 841     /**
 842      * Returns the number of registered parties that have arrived at
 843      * the current phase of this phaser. If this phaser has terminated,
 844      * the returned value is meaningless and arbitrary.
 845      *
 846      * @return the number of arrived parties
 847      */
 848     public int getArrivedParties() {
 849         return arrivedOf(reconcileState());
 850     }
 851 
 852     /**
 853      * Returns the number of registered parties that have not yet
 854      * arrived at the current phase of this phaser. If this phaser has
 855      * terminated, the returned value is meaningless and arbitrary.
 856      *
 857      * @return the number of unarrived parties
 858      */
 859     public int getUnarrivedParties() {
 860         return unarrivedOf(reconcileState());
 861     }
 862 
 863     /**
 864      * Returns the parent of this phaser, or {@code null} if none.
 865      *
 866      * @return the parent of this phaser, or {@code null} if none
 867      */
 868     public Phaser getParent() {
 869         return parent;
 870     }
 871 
 872     /**
 873      * Returns the root ancestor of this phaser, which is the same as
 874      * this phaser if it has no parent.
 875      *
 876      * @return the root ancestor of this phaser
 877      */
 878     public Phaser getRoot() {
 879         return root;
 880     }
 881 
 882     /**
 883      * Returns {@code true} if this phaser has been terminated.
 884      *
 885      * @return {@code true} if this phaser has been terminated
 886      */
 887     public boolean isTerminated() {
 888         return root.state < 0L;
 889     }
 890 
 891     /**
 892      * Overridable method to perform an action upon impending phase
 893      * advance, and to control termination. This method is invoked
 894      * upon arrival of the party advancing this phaser (when all other
 895      * waiting parties are dormant).  If this method returns {@code
 896      * true}, this phaser will be set to a final termination state
 897      * upon advance, and subsequent calls to {@link #isTerminated}
 898      * will return true. Any (unchecked) Exception or Error thrown by
 899      * an invocation of this method is propagated to the party
 900      * attempting to advance this phaser, in which case no advance
 901      * occurs.
 902      *
 903      * <p>The arguments to this method provide the state of the phaser
 904      * prevailing for the current transition.  The effects of invoking
 905      * arrival, registration, and waiting methods on this phaser from
 906      * within {@code onAdvance} are unspecified and should not be
 907      * relied on.
 908      *
 909      * <p>If this phaser is a member of a tiered set of phasers, then
 910      * {@code onAdvance} is invoked only for its root phaser on each
 911      * advance.
 912      *
 913      * <p>To support the most common use cases, the default
 914      * implementation of this method returns {@code true} when the
 915      * number of registered parties has become zero as the result of a
 916      * party invoking {@code arriveAndDeregister}.  You can disable
 917      * this behavior, thus enabling continuation upon future
 918      * registrations, by overriding this method to always return
 919      * {@code false}:
 920      *
 921      * <pre> {@code
 922      * Phaser phaser = new Phaser() {
 923      *   protected boolean onAdvance(int phase, int parties) { return false; }
 924      * }}</pre>
 925      *
 926      * @param phase the current phase number on entry to this method,
 927      * before this phaser is advanced
 928      * @param registeredParties the current number of registered parties
 929      * @return {@code true} if this phaser should terminate
 930      */
 931     protected boolean onAdvance(int phase, int registeredParties) {
 932         return registeredParties == 0;
 933     }
 934 
 935     /**
 936      * Returns a string identifying this phaser, as well as its
 937      * state.  The state, in brackets, includes the String {@code
 938      * "phase = "} followed by the phase number, {@code "parties = "}
 939      * followed by the number of registered parties, and {@code
 940      * "arrived = "} followed by the number of arrived parties.
 941      *
 942      * @return a string identifying this phaser, as well as its state
 943      */
 944     public String toString() {
 945         return stateToString(reconcileState());
 946     }
 947 
 948     /**
 949      * Implementation of toString and string-based error messages
 950      */
 951     private String stateToString(long s) {
 952         return super.toString() +
 953             "[phase = " + phaseOf(s) +
 954             " parties = " + partiesOf(s) +
 955             " arrived = " + arrivedOf(s) + "]";
 956     }
 957 
 958     // Waiting mechanics
 959 
 960     /**
 961      * Removes and signals threads from queue for phase.
 962      */
 963     private void releaseWaiters(int phase) {
 964         QNode q;   // first element of queue
 965         Thread t;  // its thread
 966         AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
 967         while ((q = head.get()) != null &&
 968                q.phase != (int)(root.state >>> PHASE_SHIFT)) {
 969             if (head.compareAndSet(q, q.next) &&
 970                 (t = q.thread) != null) {
 971                 q.thread = null;
 972                 LockSupport.unpark(t);
 973             }
 974         }
 975     }
 976 
 977     /**
 978      * Variant of releaseWaiters that additionally tries to remove any
 979      * nodes no longer waiting for advance due to timeout or
 980      * interrupt. Currently, nodes are removed only if they are at
 981      * head of queue, which suffices to reduce memory footprint in
 982      * most usages.
 983      *
 984      * @return current phase on exit
 985      */
 986     private int abortWait(int phase) {
 987         AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
 988         for (;;) {
 989             Thread t;
 990             QNode q = head.get();
 991             int p = (int)(root.state >>> PHASE_SHIFT);
 992             if (q == null || ((t = q.thread) != null && q.phase == p))
 993                 return p;
 994             if (head.compareAndSet(q, q.next) && t != null) {
 995                 q.thread = null;
 996                 LockSupport.unpark(t);
 997             }
 998         }
 999     }
1000 
1001     /** The number of CPUs, for spin control */
1002     private static final int NCPU = Runtime.getRuntime().availableProcessors();
1003 
1004     /**
1005      * The number of times to spin before blocking while waiting for
1006      * advance, per arrival while waiting. On multiprocessors, fully
1007      * blocking and waking up a large number of threads all at once is
1008      * usually a very slow process, so we use rechargeable spins to
1009      * avoid it when threads regularly arrive: When a thread in
1010      * internalAwaitAdvance notices another arrival before blocking,
1011      * and there appear to be enough CPUs available, it spins
1012      * SPINS_PER_ARRIVAL more times before blocking. The value trades
1013      * off good-citizenship vs big unnecessary slowdowns.
1014      */
1015     static final int SPINS_PER_ARRIVAL = (NCPU < 2) ? 1 : 1 << 8;
1016 
1017     /**
1018      * Possibly blocks and waits for phase to advance unless aborted.
1019      * Call only from root node.
1020      *
1021      * @param phase current phase
1022      * @param node if non-null, the wait node to track interrupt and timeout;
1023      * if null, denotes noninterruptible wait
1024      * @return current phase
1025      */
1026     private int internalAwaitAdvance(int phase, QNode node) {
1027         releaseWaiters(phase-1);          // ensure old queue clean
1028         boolean queued = false;           // true when node is enqueued
1029         int lastUnarrived = 0;            // to increase spins upon change
1030         int spins = SPINS_PER_ARRIVAL;
1031         long s;
1032         int p;
1033         while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
1034             if (node == null) {           // spinning in noninterruptible mode
1035                 int unarrived = (int)s & UNARRIVED_MASK;
1036                 if (unarrived != lastUnarrived &&
1037                     (lastUnarrived = unarrived) < NCPU)
1038                     spins += SPINS_PER_ARRIVAL;
1039                 boolean interrupted = Thread.interrupted();
1040                 if (interrupted || --spins < 0) { // need node to record intr
1041                     node = new QNode(this, phase, false, false, 0L);
1042                     node.wasInterrupted = interrupted;
1043                 }
1044             }
1045             else if (node.isReleasable()) // done or aborted
1046                 break;
1047             else if (!queued) {           // push onto queue
1048                 AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
1049                 QNode q = node.next = head.get();
1050                 if ((q == null || q.phase == phase) &&
1051                     (int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
1052                     queued = head.compareAndSet(q, node);
1053             }
1054             else {
1055                 try {
1056                     ForkJoinPool.managedBlock(node);
1057                 } catch (InterruptedException ie) {
1058                     node.wasInterrupted = true;
1059                 }
1060             }
1061         }
1062 
1063         if (node != null) {
1064             if (node.thread != null)
1065                 node.thread = null;       // avoid need for unpark()
1066             if (node.wasInterrupted && !node.interruptible)
1067                 Thread.currentThread().interrupt();
1068             if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
1069                 return abortWait(phase); // possibly clean up on abort
1070         }
1071         releaseWaiters(phase);
1072         return p;
1073     }
1074 
1075     /**
1076      * Wait nodes for Treiber stack representing wait queue
1077      */
1078     static final class QNode implements ForkJoinPool.ManagedBlocker {
1079         final Phaser phaser;
1080         final int phase;
1081         final boolean interruptible;
1082         final boolean timed;
1083         boolean wasInterrupted;
1084         long nanos;
1085         long lastTime;
1086         volatile Thread thread; // nulled to cancel wait
1087         QNode next;
1088 
1089         QNode(Phaser phaser, int phase, boolean interruptible,
1090               boolean timed, long nanos) {
1091             this.phaser = phaser;
1092             this.phase = phase;
1093             this.interruptible = interruptible;
1094             this.nanos = nanos;
1095             this.timed = timed;
1096             this.lastTime = timed ? System.nanoTime() : 0L;
1097             thread = Thread.currentThread();
1098         }
1099 
1100         public boolean isReleasable() {
1101             if (thread == null)
1102                 return true;
1103             if (phaser.getPhase() != phase) {
1104                 thread = null;
1105                 return true;
1106             }
1107             if (Thread.interrupted())
1108                 wasInterrupted = true;
1109             if (wasInterrupted && interruptible) {
1110                 thread = null;
1111                 return true;
1112             }
1113             if (timed) {
1114                 if (nanos > 0L) {
1115                     long now = System.nanoTime();
1116                     nanos -= now - lastTime;
1117                     lastTime = now;
1118                 }
1119                 if (nanos <= 0L) {
1120                     thread = null;
1121                     return true;
1122                 }
1123             }
1124             return false;
1125         }
1126 
1127         public boolean block() {
1128             if (isReleasable())
1129                 return true;
1130             else if (!timed)
1131                 LockSupport.park(this);
1132             else if (nanos > 0)
1133                 LockSupport.parkNanos(this, nanos);
1134             return isReleasable();
1135         }
1136     }
1137 
1138     // Unsafe mechanics
1139 
1140     private static final sun.misc.Unsafe UNSAFE;
1141     private static final long stateOffset;
1142     static {
1143         try {
1144             UNSAFE = sun.misc.Unsafe.getUnsafe();
1145             Class k = Phaser.class;
1146             stateOffset = UNSAFE.objectFieldOffset
1147                 (k.getDeclaredField("state"));
1148         } catch (Exception e) {
1149             throw new Error(e);
1150         }
1151     }
1152 }