Print this page


Split Close
Expand all
Collapse all
          --- old/src/share/classes/java/util/concurrent/Phaser.java
          +++ new/src/share/classes/java/util/concurrent/Phaser.java
↓ open down ↓ 27 lines elided ↑ open up ↑
  28   28   * However, the following notice accompanied the original version of this
  29   29   * file:
  30   30   *
  31   31   * Written by Doug Lea with assistance from members of JCP JSR-166
  32   32   * Expert Group and released to the public domain, as explained at
  33   33   * http://creativecommons.org/licenses/publicdomain
  34   34   */
  35   35  
  36   36  package java.util.concurrent;
  37   37  
       38 +import java.util.concurrent.TimeUnit;
       39 +import java.util.concurrent.TimeoutException;
  38   40  import java.util.concurrent.atomic.AtomicReference;
  39   41  import java.util.concurrent.locks.LockSupport;
  40   42  
  41   43  /**
  42   44   * A reusable synchronization barrier, similar in functionality to
  43   45   * {@link java.util.concurrent.CyclicBarrier CyclicBarrier} and
  44   46   * {@link java.util.concurrent.CountDownLatch CountDownLatch}
  45   47   * but supporting more flexible usage.
  46   48   *
  47   49   * <p> <b>Registration.</b> Unlike the case for other barriers, the
↓ open down ↓ 6 lines elided ↑ open up ↑
  54   56   * synchronization constructs, registration and deregistration affect
  55   57   * only internal counts; they do not establish any further internal
  56   58   * bookkeeping, so tasks cannot query whether they are registered.
  57   59   * (However, you can introduce such bookkeeping by subclassing this
  58   60   * class.)
  59   61   *
  60   62   * <p> <b>Synchronization.</b> Like a {@code CyclicBarrier}, a {@code
  61   63   * Phaser} may be repeatedly awaited.  Method {@link
  62   64   * #arriveAndAwaitAdvance} has effect analogous to {@link
  63   65   * java.util.concurrent.CyclicBarrier#await CyclicBarrier.await}. Each
  64      - * generation of a {@code Phaser} has an associated phase number. The
  65      - * phase number starts at zero, and advances when all parties arrive
  66      - * at the barrier, wrapping around to zero after reaching {@code
       66 + * generation of a phaser has an associated phase number. The phase
       67 + * number starts at zero, and advances when all parties arrive at the
       68 + * phaser, wrapping around to zero after reaching {@code
  67   69   * Integer.MAX_VALUE}. The use of phase numbers enables independent
  68      - * control of actions upon arrival at a barrier and upon awaiting
       70 + * control of actions upon arrival at a phaser and upon awaiting
  69   71   * others, via two kinds of methods that may be invoked by any
  70   72   * registered party:
  71   73   *
  72   74   * <ul>
  73   75   *
  74   76   *   <li> <b>Arrival.</b> Methods {@link #arrive} and
  75      - *       {@link #arriveAndDeregister} record arrival at a
  76      - *       barrier. These methods do not block, but return an associated
  77      - *       <em>arrival phase number</em>; that is, the phase number of
  78      - *       the barrier to which the arrival applied. When the final
  79      - *       party for a given phase arrives, an optional barrier action
  80      - *       is performed and the phase advances.  Barrier actions,
  81      - *       performed by the party triggering a phase advance, are
  82      - *       arranged by overriding method {@link #onAdvance(int, int)},
  83      - *       which also controls termination. Overriding this method is
  84      - *       similar to, but more flexible than, providing a barrier
  85      - *       action to a {@code CyclicBarrier}.
       77 + *       {@link #arriveAndDeregister} record arrival.  These methods
       78 + *       do not block, but return an associated <em>arrival phase
       79 + *       number</em>; that is, the phase number of the phaser to which
       80 + *       the arrival applied. When the final party for a given phase
       81 + *       arrives, an optional action is performed and the phase
       82 + *       advances.  These actions are performed by the party
       83 + *       triggering a phase advance, and are arranged by overriding
       84 + *       method {@link #onAdvance(int, int)}, which also controls
       85 + *       termination. Overriding this method is similar to, but more
       86 + *       flexible than, providing a barrier action to a {@code
       87 + *       CyclicBarrier}.
  86   88   *
  87   89   *   <li> <b>Waiting.</b> Method {@link #awaitAdvance} requires an
  88   90   *       argument indicating an arrival phase number, and returns when
  89      - *       the barrier advances to (or is already at) a different phase.
       91 + *       the phaser advances to (or is already at) a different phase.
  90   92   *       Unlike similar constructions using {@code CyclicBarrier},
  91   93   *       method {@code awaitAdvance} continues to wait even if the
  92   94   *       waiting thread is interrupted. Interruptible and timeout
  93   95   *       versions are also available, but exceptions encountered while
  94   96   *       tasks wait interruptibly or with timeout do not change the
  95      - *       state of the barrier. If necessary, you can perform any
       97 + *       state of the phaser. If necessary, you can perform any
  96   98   *       associated recovery within handlers of those exceptions,
  97   99   *       often after invoking {@code forceTermination}.  Phasers may
  98  100   *       also be used by tasks executing in a {@link ForkJoinPool},
  99  101   *       which will ensure sufficient parallelism to execute tasks
 100  102   *       when others are blocked waiting for a phase to advance.
 101  103   *
 102  104   * </ul>
 103  105   *
 104      - * <p> <b>Termination.</b> A {@code Phaser} may enter a
 105      - * <em>termination</em> state in which all synchronization methods
 106      - * immediately return without updating phaser state or waiting for
 107      - * advance, and indicating (via a negative phase value) that execution
 108      - * is complete.  Termination is triggered when an invocation of {@code
 109      - * onAdvance} returns {@code true}.  As illustrated below, when
 110      - * phasers control actions with a fixed number of iterations, it is
 111      - * often convenient to override this method to cause termination when
 112      - * the current phase number reaches a threshold. Method {@link
 113      - * #forceTermination} is also available to abruptly release waiting
 114      - * threads and allow them to terminate.
      106 + * <p> <b>Termination.</b> A phaser may enter a <em>termination</em>
      107 + * state in which all synchronization methods immediately return
      108 + * without updating phaser state or waiting for advance, and
      109 + * indicating (via a negative phase value) that execution is complete.
      110 + * Termination is triggered when an invocation of {@code onAdvance}
      111 + * returns {@code true}. The default implementation returns {@code
      112 + * true} if a deregistration has caused the number of registered
      113 + * parties to become zero.  As illustrated below, when phasers control
      114 + * actions with a fixed number of iterations, it is often convenient
      115 + * to override this method to cause termination when the current phase
      116 + * number reaches a threshold. Method {@link #forceTermination} is
      117 + * also available to abruptly release waiting threads and allow them
      118 + * to terminate.
 115  119   *
 116      - * <p> <b>Tiering.</b> Phasers may be <em>tiered</em> (i.e., arranged
 117      - * in tree structures) to reduce contention. Phasers with large
 118      - * numbers of parties that would otherwise experience heavy
      120 + * <p> <b>Tiering.</b> Phasers may be <em>tiered</em> (i.e.,
      121 + * constructed in tree structures) to reduce contention. Phasers with
      122 + * large numbers of parties that would otherwise experience heavy
 119  123   * synchronization contention costs may instead be set up so that
 120  124   * groups of sub-phasers share a common parent.  This may greatly
 121  125   * increase throughput even though it incurs greater per-operation
 122  126   * overhead.
 123  127   *
 124  128   * <p><b>Monitoring.</b> While synchronization methods may be invoked
 125  129   * only by registered parties, the current state of a phaser may be
 126  130   * monitored by any caller.  At any given moment there are {@link
 127  131   * #getRegisteredParties} parties in total, of which {@link
 128  132   * #getArrivedParties} have arrived at the current phase ({@link
 129  133   * #getPhase}).  When the remaining ({@link #getUnarrivedParties})
 130  134   * parties arrive, the phase advances.  The values returned by these
 131  135   * methods may reflect transient states and so are not in general
 132  136   * useful for synchronization control.  Method {@link #toString}
 133  137   * returns snapshots of these state queries in a form convenient for
 134  138   * informal monitoring.
 135  139   *
 136  140   * <p><b>Sample usages:</b>
 137  141   *
 138  142   * <p>A {@code Phaser} may be used instead of a {@code CountDownLatch}
 139      - * to control a one-shot action serving a variable number of
 140      - * parties. The typical idiom is for the method setting this up to
 141      - * first register, then start the actions, then deregister, as in:
      143 + * to control a one-shot action serving a variable number of parties.
      144 + * The typical idiom is for the method setting this up to first
      145 + * register, then start the actions, then deregister, as in:
 142  146   *
 143  147   *  <pre> {@code
 144  148   * void runTasks(List<Runnable> tasks) {
 145  149   *   final Phaser phaser = new Phaser(1); // "1" to register self
 146  150   *   // create and start threads
 147  151   *   for (Runnable task : tasks) {
 148  152   *     phaser.register();
 149  153   *     new Thread() {
 150  154   *       public void run() {
 151  155   *         phaser.arriveAndAwaitAdvance(); // await all creation
↓ open down ↓ 51 lines elided ↑ open up ↑
 203  207   *       // ... deal with unexpected termination
 204  208   *     else
 205  209   *       p = phaser.arriveAndAwaitAdvance();
 206  210   *   }
 207  211   *   phaser.arriveAndDeregister();
 208  212   * }}</pre>
 209  213   *
 210  214   *
 211  215   * <p>To create a set of tasks using a tree of phasers,
 212  216   * you could use code of the following form, assuming a
 213      - * Task class with a constructor accepting a phaser that
 214      - * it registers for upon construction:
      217 + * Task class with a constructor accepting a {@code Phaser} that
      218 + * it registers with upon construction:
 215  219   *
 216  220   *  <pre> {@code
 217  221   * void build(Task[] actions, int lo, int hi, Phaser ph) {
 218  222   *   if (hi - lo > TASKS_PER_PHASER) {
 219  223   *     for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
 220  224   *       int j = Math.min(i + TASKS_PER_PHASER, hi);
 221  225   *       build(actions, i, j, new Phaser(ph));
 222  226   *     }
 223  227   *   } else {
 224  228   *     for (int i = lo; i < hi; ++i)
 225  229   *       actions[i] = new Task(ph);
 226  230   *       // assumes new Task(ph) performs ph.register()
 227  231   *   }
 228  232   * }
 229  233   * // .. initially called, for n tasks via
 230  234   * build(new Task[n], 0, n, new Phaser());}</pre>
 231  235   *
 232  236   * The best value of {@code TASKS_PER_PHASER} depends mainly on
 233      - * expected barrier synchronization rates. A value as low as four may
 234      - * be appropriate for extremely small per-barrier task bodies (thus
      237 + * expected synchronization rates. A value as low as four may
      238 + * be appropriate for extremely small per-phase task bodies (thus
 235  239   * high rates), or up to hundreds for extremely large ones.
 236  240   *
 237      - * </pre>
 238      - *
 239  241   * <p><b>Implementation notes</b>: This implementation restricts the
 240  242   * maximum number of parties to 65535. Attempts to register additional
 241  243   * parties result in {@code IllegalStateException}. However, you can and
 242  244   * should create tiered phasers to accommodate arbitrarily large sets
 243  245   * of participants.
 244  246   *
 245  247   * @since 1.7
 246  248   * @author Doug Lea
 247  249   */
 248  250  public class Phaser {
 249  251      /*
 250  252       * This class implements an extension of X10 "clocks".  Thanks to
 251  253       * Vijay Saraswat for the idea, and to Vivek Sarkar for
 252  254       * enhancements to extend functionality.
 253  255       */
 254  256  
 255  257      /**
 256      -     * Barrier state representation. Conceptually, a barrier contains
 257      -     * four values:
      258 +     * Primary state representation, holding four fields:
 258  259       *
 259      -     * * parties -- the number of parties to wait (16 bits)
 260      -     * * unarrived -- the number of parties yet to hit barrier (16 bits)
 261      -     * * phase -- the generation of the barrier (31 bits)
 262      -     * * terminated -- set if barrier is terminated (1 bit)
      260 +     * * unarrived -- the number of parties yet to hit barrier (bits  0-15)
      261 +     * * parties -- the number of parties to wait              (bits 16-31)
      262 +     * * phase -- the generation of the barrier                (bits 32-62)
      263 +     * * terminated -- set if barrier is terminated            (bit  63 / sign)
 263  264       *
 264  265       * However, to efficiently maintain atomicity, these values are
 265  266       * packed into a single (atomic) long. Termination uses the sign
 266  267       * bit of the 32-bit representation of phase, so phase is set to -1 on
 267  268       * termination. Good performance relies on keeping state decoding
 268  269       * and encoding simple, and keeping race windows short.
 269      -     *
 270      -     * Note: there are some cheats in arrive() that rely on unarrived
 271      -     * count being lowest 16 bits.
 272  270       */
 273  271      private volatile long state;
 274  272  
 275      -    private static final int ushortMask = 0xffff;
 276      -    private static final int phaseMask  = 0x7fffffff;
      273 +    private static final int  MAX_PARTIES     = 0xffff;
      274 +    private static final int  MAX_PHASE       = 0x7fffffff;
      275 +    private static final int  PARTIES_SHIFT   = 16;
      276 +    private static final int  PHASE_SHIFT     = 32;
      277 +    private static final int  UNARRIVED_MASK  = 0xffff;      // to mask ints
      278 +    private static final long PARTIES_MASK    = 0xffff0000L; // to mask longs
      279 +    private static final long ONE_ARRIVAL     = 1L;
      280 +    private static final long ONE_PARTY       = 1L << PARTIES_SHIFT;
      281 +    private static final long TERMINATION_BIT = 1L << 63;
 277  282  
      283 +    // The following unpacking methods are usually manually inlined
      284 +
 278  285      private static int unarrivedOf(long s) {
 279      -        return (int) (s & ushortMask);
      286 +        return (int)s & UNARRIVED_MASK;
 280  287      }
 281  288  
 282  289      private static int partiesOf(long s) {
 283      -        return ((int) s) >>> 16;
      290 +        return (int)s >>> PARTIES_SHIFT;
 284  291      }
 285  292  
 286  293      private static int phaseOf(long s) {
 287      -        return (int) (s >>> 32);
      294 +        return (int) (s >>> PHASE_SHIFT);
 288  295      }
 289  296  
 290  297      private static int arrivedOf(long s) {
 291  298          return partiesOf(s) - unarrivedOf(s);
 292  299      }
 293  300  
 294      -    private static long stateFor(int phase, int parties, int unarrived) {
 295      -        return ((((long) phase) << 32) | (((long) parties) << 16) |
 296      -                (long) unarrived);
 297      -    }
 298      -
 299      -    private static long trippedStateFor(int phase, int parties) {
 300      -        long lp = (long) parties;
 301      -        return (((long) phase) << 32) | (lp << 16) | lp;
 302      -    }
 303      -
 304  301      /**
 305      -     * Returns message string for bad bounds exceptions.
 306      -     */
 307      -    private static String badBounds(int parties, int unarrived) {
 308      -        return ("Attempt to set " + unarrived +
 309      -                " unarrived of " + parties + " parties");
 310      -    }
 311      -
 312      -    /**
 313  302       * The parent of this phaser, or null if none
 314  303       */
 315  304      private final Phaser parent;
 316  305  
 317  306      /**
 318  307       * The root of phaser tree. Equals this if not in a tree.  Used to
 319  308       * support faster state push-down.
 320  309       */
 321  310      private final Phaser root;
 322  311  
 323      -    // Wait queues
 324      -
 325  312      /**
 326  313       * Heads of Treiber stacks for waiting threads. To eliminate
 327      -     * contention while releasing some threads while adding others, we
      314 +     * contention when releasing some threads while adding others, we
 328  315       * use two of them, alternating across even and odd phases.
      316 +     * Subphasers share queues with root to speed up releases.
 329  317       */
 330      -    private final AtomicReference<QNode> evenQ = new AtomicReference<QNode>();
 331      -    private final AtomicReference<QNode> oddQ  = new AtomicReference<QNode>();
      318 +    private final AtomicReference<QNode> evenQ;
      319 +    private final AtomicReference<QNode> oddQ;
 332  320  
 333  321      private AtomicReference<QNode> queueFor(int phase) {
 334  322          return ((phase & 1) == 0) ? evenQ : oddQ;
 335  323      }
 336  324  
 337  325      /**
 338      -     * Returns current state, first resolving lagged propagation from
 339      -     * root if necessary.
      326 +     * Returns message string for bounds exceptions on arrival.
 340  327       */
 341      -    private long getReconciledState() {
 342      -        return (parent == null) ? state : reconcileState();
      328 +    private String badArrive(long s) {
      329 +        return "Attempted arrival of unregistered party for " +
      330 +            stateToString(s);
 343  331      }
 344  332  
 345  333      /**
 346      -     * Recursively resolves state.
      334 +     * Returns message string for bounds exceptions on registration.
 347  335       */
 348      -    private long reconcileState() {
 349      -        Phaser p = parent;
 350      -        long s = state;
 351      -        if (p != null) {
 352      -            while (unarrivedOf(s) == 0 && phaseOf(s) != phaseOf(root.state)) {
 353      -                long parentState = p.getReconciledState();
 354      -                int parentPhase = phaseOf(parentState);
 355      -                int phase = phaseOf(s = state);
 356      -                if (phase != parentPhase) {
 357      -                    long next = trippedStateFor(parentPhase, partiesOf(s));
 358      -                    if (casState(s, next)) {
      336 +    private String badRegister(long s) {
      337 +        return "Attempt to register more than " +
      338 +            MAX_PARTIES + " parties for " + stateToString(s);
      339 +    }
      340 +
      341 +    /**
      342 +     * Main implementation for methods arrive and arriveAndDeregister.
      343 +     * Manually tuned to speed up and minimize race windows for the
      344 +     * common case of just decrementing unarrived field.
      345 +     *
      346 +     * @param adj - adjustment to apply to state -- either
      347 +     * ONE_ARRIVAL (for arrive) or
      348 +     * ONE_ARRIVAL|ONE_PARTY (for arriveAndDeregister)
      349 +     */
      350 +    private int doArrive(long adj) {
      351 +        for (;;) {
      352 +            long s = state;
      353 +            int unarrived = (int)s & UNARRIVED_MASK;
      354 +            int phase = (int)(s >>> PHASE_SHIFT);
      355 +            if (phase < 0)
      356 +                return phase;
      357 +            else if (unarrived == 0) {
      358 +                if (reconcileState() == s)     // recheck
      359 +                    throw new IllegalStateException(badArrive(s));
      360 +            }
      361 +            else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adj)) {
      362 +                if (unarrived == 1) {
      363 +                    long p = s & PARTIES_MASK; // unshifted parties field
      364 +                    long lu = p >>> PARTIES_SHIFT;
      365 +                    int u = (int)lu;
      366 +                    int nextPhase = (phase + 1) & MAX_PHASE;
      367 +                    long next = ((long)nextPhase << PHASE_SHIFT) | p | lu;
      368 +                    final Phaser parent = this.parent;
      369 +                    if (parent == null) {
      370 +                        if (onAdvance(phase, u))
      371 +                            next |= TERMINATION_BIT;
      372 +                        UNSAFE.compareAndSwapLong(this, stateOffset, s, next);
 359  373                          releaseWaiters(phase);
 360      -                        s = next;
 361  374                      }
      375 +                    else {
      376 +                        parent.doArrive((u == 0) ?
      377 +                                        ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL);
      378 +                        if ((int)(parent.state >>> PHASE_SHIFT) != nextPhase)
      379 +                            reconcileState();
      380 +                        else if (state == s)
      381 +                            UNSAFE.compareAndSwapLong(this, stateOffset, s,
      382 +                                                      next);
      383 +                    }
 362  384                  }
      385 +                return phase;
 363  386              }
 364  387          }
      388 +    }
      389 +
      390 +    /**
      391 +     * Implementation of register, bulkRegister
      392 +     *
      393 +     * @param registrations number to add to both parties and
      394 +     * unarrived fields. Must be greater than zero.
      395 +     */
      396 +    private int doRegister(int registrations) {
      397 +        // adjustment to state
      398 +        long adj = ((long)registrations << PARTIES_SHIFT) | registrations;
      399 +        final Phaser parent = this.parent;
      400 +        for (;;) {
      401 +            long s = (parent == null) ? state : reconcileState();
      402 +            int parties = (int)s >>> PARTIES_SHIFT;
      403 +            int phase = (int)(s >>> PHASE_SHIFT);
      404 +            if (phase < 0)
      405 +                return phase;
      406 +            else if (registrations > MAX_PARTIES - parties)
      407 +                throw new IllegalStateException(badRegister(s));
      408 +            else if ((parties == 0 && parent == null) || // first reg of root
      409 +                     ((int)s & UNARRIVED_MASK) != 0) {   // not advancing
      410 +                if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s + adj))
      411 +                    return phase;
      412 +            }
      413 +            else if (parties != 0)               // wait for onAdvance
      414 +                root.internalAwaitAdvance(phase, null);
      415 +            else {                               // 1st registration of child
      416 +                synchronized (this) {            // register parent first
      417 +                    if (reconcileState() == s) { // recheck under lock
      418 +                        parent.doRegister(1);    // OK if throws IllegalState
      419 +                        for (;;) {               // simpler form of outer loop
      420 +                            s = reconcileState();
      421 +                            phase = (int)(s >>> PHASE_SHIFT);
      422 +                            if (phase < 0 ||
      423 +                                UNSAFE.compareAndSwapLong(this, stateOffset,
      424 +                                                          s, s + adj))
      425 +                                return phase;
      426 +                        }
      427 +                    }
      428 +                }
      429 +            }
      430 +        }
      431 +    }
      432 +
      433 +    /**
      434 +     * Recursively resolves lagged phase propagation from root if necessary.
      435 +     */
      436 +    private long reconcileState() {
      437 +        Phaser par = parent;
      438 +        long s = state;
      439 +        if (par != null) {
      440 +            Phaser rt = root;
      441 +            int phase, rPhase;
      442 +            while ((phase = (int)(s >>> PHASE_SHIFT)) >= 0 &&
      443 +                   (rPhase = (int)(rt.state >>> PHASE_SHIFT)) != phase) {
      444 +                if (par != rt && (int)(par.state >>> PHASE_SHIFT) != rPhase)
      445 +                    par.reconcileState();
      446 +                else if (rPhase < 0 || ((int)s & UNARRIVED_MASK) == 0) {
      447 +                    long u = s & PARTIES_MASK; // reset unarrived to parties
      448 +                    long next = ((((long) rPhase) << PHASE_SHIFT) | u |
      449 +                                 (u >>> PARTIES_SHIFT));
      450 +                    UNSAFE.compareAndSwapLong(this, stateOffset, s, next);
      451 +                }
      452 +                s = state;
      453 +            }
      454 +        }
 365  455          return s;
 366  456      }
 367  457  
 368  458      /**
 369      -     * Creates a new phaser without any initially registered parties,
 370      -     * initial phase number 0, and no parent. Any thread using this
      459 +     * Creates a new phaser with no initially registered parties, no
      460 +     * parent, and initial phase number 0. Any thread using this
 371  461       * phaser will need to first register with it.
 372  462       */
 373  463      public Phaser() {
 374      -        this(null);
      464 +        this(null, 0);
 375  465      }
 376  466  
 377  467      /**
 378      -     * Creates a new phaser with the given numbers of registered
 379      -     * unarrived parties, initial phase number 0, and no parent.
      468 +     * Creates a new phaser with the given number of registered
      469 +     * unarrived parties, no parent, and initial phase number 0.
 380  470       *
 381      -     * @param parties the number of parties required to trip barrier
      471 +     * @param parties the number of parties required to advance to the
      472 +     * next phase
 382  473       * @throws IllegalArgumentException if parties less than zero
 383  474       * or greater than the maximum number of parties supported
 384  475       */
 385  476      public Phaser(int parties) {
 386  477          this(null, parties);
 387  478      }
 388  479  
 389  480      /**
 390      -     * Creates a new phaser with the given parent, without any
 391      -     * initially registered parties. If parent is non-null this phaser
 392      -     * is registered with the parent and its initial phase number is
 393      -     * the same as that of parent phaser.
      481 +     * Equivalent to {@link #Phaser(Phaser, int) Phaser(parent, 0)}.
 394  482       *
 395  483       * @param parent the parent phaser
 396  484       */
 397  485      public Phaser(Phaser parent) {
 398      -        int phase = 0;
 399      -        this.parent = parent;
 400      -        if (parent != null) {
 401      -            this.root = parent.root;
 402      -            phase = parent.register();
 403      -        }
 404      -        else
 405      -            this.root = this;
 406      -        this.state = trippedStateFor(phase, 0);
      486 +        this(parent, 0);
 407  487      }
 408  488  
 409  489      /**
 410      -     * Creates a new phaser with the given parent and numbers of
 411      -     * registered unarrived parties. If parent is non-null, this phaser
 412      -     * is registered with the parent and its initial phase number is
 413      -     * the same as that of parent phaser.
      490 +     * Creates a new phaser with the given parent and number of
      491 +     * registered unarrived parties. Registration and deregistration
      492 +     * of this child phaser with its parent are managed automatically.
      493 +     * If the given parent is non-null, whenever this child phaser has
      494 +     * any registered parties (as established in this constructor,
      495 +     * {@link #register}, or {@link #bulkRegister}), this child phaser
      496 +     * is registered with its parent. Whenever the number of
      497 +     * registered parties becomes zero as the result of an invocation
      498 +     * of {@link #arriveAndDeregister}, this child phaser is
      499 +     * deregistered from its parent.
 414  500       *
 415  501       * @param parent the parent phaser
 416      -     * @param parties the number of parties required to trip barrier
      502 +     * @param parties the number of parties required to advance to the
      503 +     * next phase
 417  504       * @throws IllegalArgumentException if parties less than zero
 418  505       * or greater than the maximum number of parties supported
 419  506       */
 420  507      public Phaser(Phaser parent, int parties) {
 421      -        if (parties < 0 || parties > ushortMask)
      508 +        if (parties >>> PARTIES_SHIFT != 0)
 422  509              throw new IllegalArgumentException("Illegal number of parties");
 423      -        int phase = 0;
      510 +        long s = ((long) parties) | (((long) parties) << PARTIES_SHIFT);
 424  511          this.parent = parent;
 425  512          if (parent != null) {
 426      -            this.root = parent.root;
 427      -            phase = parent.register();
      513 +            Phaser r = parent.root;
      514 +            this.root = r;
      515 +            this.evenQ = r.evenQ;
      516 +            this.oddQ = r.oddQ;
      517 +            if (parties != 0)
      518 +                s |= ((long)(parent.doRegister(1))) << PHASE_SHIFT;
 428  519          }
 429      -        else
      520 +        else {
 430  521              this.root = this;
 431      -        this.state = trippedStateFor(phase, parties);
      522 +            this.evenQ = new AtomicReference<QNode>();
      523 +            this.oddQ = new AtomicReference<QNode>();
      524 +        }
      525 +        this.state = s;
 432  526      }
 433  527  
 434  528      /**
 435      -     * Adds a new unarrived party to this phaser.
      529 +     * Adds a new unarrived party to this phaser.  If an ongoing
      530 +     * invocation of {@link #onAdvance} is in progress, this method
      531 +     * may await its completion before returning.  If this phaser has
      532 +     * a parent, and this phaser previously had no registered parties,
      533 +     * this phaser is also registered with its parent.
 436  534       *
 437  535       * @return the arrival phase number to which this registration applied
 438  536       * @throws IllegalStateException if attempting to register more
 439  537       * than the maximum supported number of parties
 440  538       */
 441  539      public int register() {
 442  540          return doRegister(1);
 443  541      }
 444  542  
 445  543      /**
 446  544       * Adds the given number of new unarrived parties to this phaser.
      545 +     * If an ongoing invocation of {@link #onAdvance} is in progress,
      546 +     * this method may await its completion before returning.  If this
      547 +     * phaser has a parent, and the given number of parties is
      548 +     * greater than zero, and this phaser previously had no registered
      549 +     * parties, this phaser is also registered with its parent.
 447  550       *
 448      -     * @param parties the number of parties required to trip barrier
      551 +     * @param parties the number of additional parties required to
      552 +     * advance to the next phase
 449  553       * @return the arrival phase number to which this registration applied
 450  554       * @throws IllegalStateException if attempting to register more
 451  555       * than the maximum supported number of parties
      556 +     * @throws IllegalArgumentException if {@code parties < 0}
 452  557       */
 453  558      public int bulkRegister(int parties) {
 454  559          if (parties < 0)
 455  560              throw new IllegalArgumentException();
 456  561          if (parties == 0)
 457  562              return getPhase();
 458  563          return doRegister(parties);
 459  564      }
 460  565  
 461  566      /**
 462      -     * Shared code for register, bulkRegister
 463      -     */
 464      -    private int doRegister(int registrations) {
 465      -        int phase;
 466      -        for (;;) {
 467      -            long s = getReconciledState();
 468      -            phase = phaseOf(s);
 469      -            int unarrived = unarrivedOf(s) + registrations;
 470      -            int parties = partiesOf(s) + registrations;
 471      -            if (phase < 0)
 472      -                break;
 473      -            if (parties > ushortMask || unarrived > ushortMask)
 474      -                throw new IllegalStateException(badBounds(parties, unarrived));
 475      -            if (phase == phaseOf(root.state) &&
 476      -                casState(s, stateFor(phase, parties, unarrived)))
 477      -                break;
 478      -        }
 479      -        return phase;
 480      -    }
 481      -
 482      -    /**
 483      -     * Arrives at the barrier, but does not wait for others.  (You can
 484      -     * in turn wait for others via {@link #awaitAdvance}).  It is an
 485      -     * unenforced usage error for an unregistered party to invoke this
 486      -     * method.
      567 +     * Arrives at this phaser, without waiting for others to arrive.
 487  568       *
      569 +     * <p>It is a usage error for an unregistered party to invoke this
      570 +     * method.  However, this error may result in an {@code
      571 +     * IllegalStateException} only upon some subsequent operation on
      572 +     * this phaser, if ever.
      573 +     *
 488  574       * @return the arrival phase number, or a negative value if terminated
 489  575       * @throws IllegalStateException if not terminated and the number
 490  576       * of unarrived parties would become negative
 491  577       */
 492  578      public int arrive() {
 493      -        int phase;
 494      -        for (;;) {
 495      -            long s = state;
 496      -            phase = phaseOf(s);
 497      -            if (phase < 0)
 498      -                break;
 499      -            int parties = partiesOf(s);
 500      -            int unarrived = unarrivedOf(s) - 1;
 501      -            if (unarrived > 0) {        // Not the last arrival
 502      -                if (casState(s, s - 1)) // s-1 adds one arrival
 503      -                    break;
 504      -            }
 505      -            else if (unarrived == 0) {  // the last arrival
 506      -                Phaser par = parent;
 507      -                if (par == null) {      // directly trip
 508      -                    if (casState
 509      -                        (s,
 510      -                         trippedStateFor(onAdvance(phase, parties) ? -1 :
 511      -                                         ((phase + 1) & phaseMask), parties))) {
 512      -                        releaseWaiters(phase);
 513      -                        break;
 514      -                    }
 515      -                }
 516      -                else {                  // cascade to parent
 517      -                    if (casState(s, s - 1)) { // zeroes unarrived
 518      -                        par.arrive();
 519      -                        reconcileState();
 520      -                        break;
 521      -                    }
 522      -                }
 523      -            }
 524      -            else if (phase != phaseOf(root.state)) // or if unreconciled
 525      -                reconcileState();
 526      -            else
 527      -                throw new IllegalStateException(badBounds(parties, unarrived));
 528      -        }
 529      -        return phase;
      579 +        return doArrive(ONE_ARRIVAL);
 530  580      }
 531  581  
 532  582      /**
 533      -     * Arrives at the barrier and deregisters from it without waiting
 534      -     * for others. Deregistration reduces the number of parties
 535      -     * required to trip the barrier in future phases.  If this phaser
      583 +     * Arrives at this phaser and deregisters from it without waiting
      584 +     * for others to arrive. Deregistration reduces the number of
      585 +     * parties required to advance in future phases.  If this phaser
 536  586       * has a parent, and deregistration causes this phaser to have
 537      -     * zero parties, this phaser also arrives at and is deregistered
 538      -     * from its parent.  It is an unenforced usage error for an
 539      -     * unregistered party to invoke this method.
      587 +     * zero parties, this phaser is also deregistered from its parent.
 540  588       *
      589 +     * <p>It is a usage error for an unregistered party to invoke this
      590 +     * method.  However, this error may result in an {@code
      591 +     * IllegalStateException} only upon some subsequent operation on
      592 +     * this phaser, if ever.
      593 +     *
 541  594       * @return the arrival phase number, or a negative value if terminated
 542  595       * @throws IllegalStateException if not terminated and the number
 543  596       * of registered or unarrived parties would become negative
 544  597       */
 545  598      public int arriveAndDeregister() {
 546      -        // similar code to arrive, but too different to merge
 547      -        Phaser par = parent;
 548      -        int phase;
 549      -        for (;;) {
 550      -            long s = state;
 551      -            phase = phaseOf(s);
 552      -            if (phase < 0)
 553      -                break;
 554      -            int parties = partiesOf(s) - 1;
 555      -            int unarrived = unarrivedOf(s) - 1;
 556      -            if (parties >= 0) {
 557      -                if (unarrived > 0 || (unarrived == 0 && par != null)) {
 558      -                    if (casState
 559      -                        (s,
 560      -                         stateFor(phase, parties, unarrived))) {
 561      -                        if (unarrived == 0) {
 562      -                            par.arriveAndDeregister();
 563      -                            reconcileState();
 564      -                        }
 565      -                        break;
 566      -                    }
 567      -                    continue;
 568      -                }
 569      -                if (unarrived == 0) {
 570      -                    if (casState
 571      -                        (s,
 572      -                         trippedStateFor(onAdvance(phase, parties) ? -1 :
 573      -                                         ((phase + 1) & phaseMask), parties))) {
 574      -                        releaseWaiters(phase);
 575      -                        break;
 576      -                    }
 577      -                    continue;
 578      -                }
 579      -                if (par != null && phase != phaseOf(root.state)) {
 580      -                    reconcileState();
 581      -                    continue;
 582      -                }
 583      -            }
 584      -            throw new IllegalStateException(badBounds(parties, unarrived));
 585      -        }
 586      -        return phase;
      599 +        return doArrive(ONE_ARRIVAL|ONE_PARTY);
 587  600      }
 588  601  
 589  602      /**
 590      -     * Arrives at the barrier and awaits others. Equivalent in effect
      603 +     * Arrives at this phaser and awaits others. Equivalent in effect
 591  604       * to {@code awaitAdvance(arrive())}.  If you need to await with
 592  605       * interruption or timeout, you can arrange this with an analogous
 593      -     * construction using one of the other forms of the awaitAdvance
 594      -     * method.  If instead you need to deregister upon arrival use
 595      -     * {@code arriveAndDeregister}. It is an unenforced usage error
 596      -     * for an unregistered party to invoke this method.
      606 +     * construction using one of the other forms of the {@code
      607 +     * awaitAdvance} method.  If instead you need to deregister upon
      608 +     * arrival, use {@code awaitAdvance(arriveAndDeregister())}.
 597  609       *
      610 +     * <p>It is a usage error for an unregistered party to invoke this
      611 +     * method.  However, this error may result in an {@code
      612 +     * IllegalStateException} only upon some subsequent operation on
      613 +     * this phaser, if ever.
      614 +     *
 598  615       * @return the arrival phase number, or a negative number if terminated
 599  616       * @throws IllegalStateException if not terminated and the number
 600  617       * of unarrived parties would become negative
 601  618       */
 602  619      public int arriveAndAwaitAdvance() {
 603      -        return awaitAdvance(arrive());
      620 +        return awaitAdvance(doArrive(ONE_ARRIVAL));
 604  621      }
 605  622  
 606  623      /**
 607      -     * Awaits the phase of the barrier to advance from the given phase
 608      -     * value, returning immediately if the current phase of the
 609      -     * barrier is not equal to the given phase value or this barrier
 610      -     * is terminated.  It is an unenforced usage error for an
 611      -     * unregistered party to invoke this method.
      624 +     * Awaits the phase of this phaser to advance from the given phase
      625 +     * value, returning immediately if the current phase is not equal
      626 +     * to the given phase value or this phaser is terminated.
 612  627       *
 613  628       * @param phase an arrival phase number, or negative value if
 614  629       * terminated; this argument is normally the value returned by a
 615      -     * previous call to {@code arrive} or its variants
      630 +     * previous call to {@code arrive} or {@code arriveAndDeregister}.
 616  631       * @return the next arrival phase number, or a negative value
 617  632       * if terminated or argument is negative
 618  633       */
 619  634      public int awaitAdvance(int phase) {
      635 +        Phaser rt;
      636 +        int p = (int)(state >>> PHASE_SHIFT);
 620  637          if (phase < 0)
 621  638              return phase;
 622      -        long s = getReconciledState();
 623      -        int p = phaseOf(s);
 624      -        if (p != phase)
 625      -            return p;
 626      -        if (unarrivedOf(s) == 0 && parent != null)
 627      -            parent.awaitAdvance(phase);
 628      -        // Fall here even if parent waited, to reconcile and help release
 629      -        return untimedWait(phase);
      639 +        if (p == phase &&
      640 +            (p = (int)((rt = root).state >>> PHASE_SHIFT)) == phase)
      641 +            return rt.internalAwaitAdvance(phase, null);
      642 +        return p;
 630  643      }
 631  644  
 632  645      /**
 633      -     * Awaits the phase of the barrier to advance from the given phase
      646 +     * Awaits the phase of this phaser to advance from the given phase
 634  647       * value, throwing {@code InterruptedException} if interrupted
 635      -     * while waiting, or returning immediately if the current phase of
 636      -     * the barrier is not equal to the given phase value or this
 637      -     * barrier is terminated. It is an unenforced usage error for an
 638      -     * unregistered party to invoke this method.
      648 +     * while waiting, or returning immediately if the current phase is
      649 +     * not equal to the given phase value or this phaser is
      650 +     * terminated.
 639  651       *
 640  652       * @param phase an arrival phase number, or negative value if
 641  653       * terminated; this argument is normally the value returned by a
 642      -     * previous call to {@code arrive} or its variants
      654 +     * previous call to {@code arrive} or {@code arriveAndDeregister}.
 643  655       * @return the next arrival phase number, or a negative value
 644  656       * if terminated or argument is negative
 645  657       * @throws InterruptedException if thread interrupted while waiting
 646  658       */
 647  659      public int awaitAdvanceInterruptibly(int phase)
 648  660          throws InterruptedException {
      661 +        Phaser rt;
      662 +        int p = (int)(state >>> PHASE_SHIFT);
 649  663          if (phase < 0)
 650  664              return phase;
 651      -        long s = getReconciledState();
 652      -        int p = phaseOf(s);
 653      -        if (p != phase)
 654      -            return p;
 655      -        if (unarrivedOf(s) == 0 && parent != null)
 656      -            parent.awaitAdvanceInterruptibly(phase);
 657      -        return interruptibleWait(phase);
      665 +        if (p == phase &&
      666 +            (p = (int)((rt = root).state >>> PHASE_SHIFT)) == phase) {
      667 +            QNode node = new QNode(this, phase, true, false, 0L);
      668 +            p = rt.internalAwaitAdvance(phase, node);
      669 +            if (node.wasInterrupted)
      670 +                throw new InterruptedException();
      671 +        }
      672 +        return p;
 658  673      }
 659  674  
 660  675      /**
 661      -     * Awaits the phase of the barrier to advance from the given phase
      676 +     * Awaits the phase of this phaser to advance from the given phase
 662  677       * value or the given timeout to elapse, throwing {@code
 663  678       * InterruptedException} if interrupted while waiting, or
 664      -     * returning immediately if the current phase of the barrier is
 665      -     * not equal to the given phase value or this barrier is
 666      -     * terminated.  It is an unenforced usage error for an
 667      -     * unregistered party to invoke this method.
      679 +     * returning immediately if the current phase is not equal to the
      680 +     * given phase value or this phaser is terminated.
 668  681       *
 669  682       * @param phase an arrival phase number, or negative value if
 670  683       * terminated; this argument is normally the value returned by a
 671      -     * previous call to {@code arrive} or its variants
      684 +     * previous call to {@code arrive} or {@code arriveAndDeregister}.
 672  685       * @param timeout how long to wait before giving up, in units of
 673  686       *        {@code unit}
 674  687       * @param unit a {@code TimeUnit} determining how to interpret the
 675  688       *        {@code timeout} parameter
 676  689       * @return the next arrival phase number, or a negative value
 677  690       * if terminated or argument is negative
 678  691       * @throws InterruptedException if thread interrupted while waiting
 679  692       * @throws TimeoutException if timed out while waiting
 680  693       */
 681  694      public int awaitAdvanceInterruptibly(int phase,
 682  695                                           long timeout, TimeUnit unit)
 683  696          throws InterruptedException, TimeoutException {
      697 +        long nanos = unit.toNanos(timeout);
      698 +        Phaser rt;
      699 +        int p = (int)(state >>> PHASE_SHIFT);
 684  700          if (phase < 0)
 685  701              return phase;
 686      -        long s = getReconciledState();
 687      -        int p = phaseOf(s);
 688      -        if (p != phase)
 689      -            return p;
 690      -        if (unarrivedOf(s) == 0 && parent != null)
 691      -            parent.awaitAdvanceInterruptibly(phase, timeout, unit);
 692      -        return timedWait(phase, unit.toNanos(timeout));
      702 +        if (p == phase &&
      703 +            (p = (int)((rt = root).state >>> PHASE_SHIFT)) == phase) {
      704 +            QNode node = new QNode(this, phase, true, true, nanos);
      705 +            p = rt.internalAwaitAdvance(phase, node);
      706 +            if (node.wasInterrupted)
      707 +                throw new InterruptedException();
      708 +            else if (p == phase)
      709 +                throw new TimeoutException();
      710 +        }
      711 +        return p;
 693  712      }
 694  713  
 695  714      /**
 696      -     * Forces this barrier to enter termination state. Counts of
 697      -     * arrived and registered parties are unaffected. If this phaser
 698      -     * has a parent, it too is terminated. This method may be useful
 699      -     * for coordinating recovery after one or more tasks encounter
 700      -     * unexpected exceptions.
      715 +     * Forces this phaser to enter termination state.  Counts of
      716 +     * arrived and registered parties are unaffected.  If this phaser
      717 +     * is a member of a tiered set of phasers, then all of the phasers
      718 +     * in the set are terminated.  If this phaser is already
      719 +     * terminated, this method has no effect.  This method may be
      720 +     * useful for coordinating recovery after one or more tasks
      721 +     * encounter unexpected exceptions.
 701  722       */
 702  723      public void forceTermination() {
 703      -        for (;;) {
 704      -            long s = getReconciledState();
 705      -            int phase = phaseOf(s);
 706      -            int parties = partiesOf(s);
 707      -            int unarrived = unarrivedOf(s);
 708      -            if (phase < 0 ||
 709      -                casState(s, stateFor(-1, parties, unarrived))) {
 710      -                releaseWaiters(0);
      724 +        // Only need to change root state
      725 +        final Phaser root = this.root;
      726 +        long s;
      727 +        while ((s = root.state) >= 0) {
      728 +            if (UNSAFE.compareAndSwapLong(root, stateOffset,
      729 +                                          s, s | TERMINATION_BIT)) {
      730 +                releaseWaiters(0); // signal all threads
 711  731                  releaseWaiters(1);
 712      -                if (parent != null)
 713      -                    parent.forceTermination();
 714  732                  return;
 715  733              }
 716  734          }
 717  735      }
 718  736  
 719  737      /**
 720  738       * Returns the current phase number. The maximum phase number is
 721  739       * {@code Integer.MAX_VALUE}, after which it restarts at
 722      -     * zero. Upon termination, the phase number is negative.
      740 +     * zero. Upon termination, the phase number is negative,
      741 +     * in which case the prevailing phase prior to termination
      742 +     * may be obtained via {@code getPhase() + Integer.MIN_VALUE}.
 723  743       *
 724  744       * @return the phase number, or a negative value if terminated
 725  745       */
 726  746      public final int getPhase() {
 727      -        return phaseOf(getReconciledState());
      747 +        return (int)(root.state >>> PHASE_SHIFT);
 728  748      }
 729  749  
 730  750      /**
 731      -     * Returns the number of parties registered at this barrier.
      751 +     * Returns the number of parties registered at this phaser.
 732  752       *
 733  753       * @return the number of parties
 734  754       */
 735  755      public int getRegisteredParties() {
 736  756          return partiesOf(state);
 737  757      }
 738  758  
 739  759      /**
 740  760       * Returns the number of registered parties that have arrived at
 741      -     * the current phase of this barrier.
      761 +     * the current phase of this phaser.
 742  762       *
 743  763       * @return the number of arrived parties
 744  764       */
 745  765      public int getArrivedParties() {
 746      -        return arrivedOf(state);
      766 +        long s = state;
      767 +        int u = unarrivedOf(s); // only reconcile if possibly needed
      768 +        return (u != 0 || parent == null) ?
      769 +            partiesOf(s) - u :
      770 +            arrivedOf(reconcileState());
 747  771      }
 748  772  
 749  773      /**
 750  774       * Returns the number of registered parties that have not yet
 751      -     * arrived at the current phase of this barrier.
      775 +     * arrived at the current phase of this phaser.
 752  776       *
 753  777       * @return the number of unarrived parties
 754  778       */
 755  779      public int getUnarrivedParties() {
 756      -        return unarrivedOf(state);
      780 +        int u = unarrivedOf(state);
      781 +        return (u != 0 || parent == null) ? u : unarrivedOf(reconcileState());
 757  782      }
 758  783  
 759  784      /**
 760  785       * Returns the parent of this phaser, or {@code null} if none.
 761  786       *
 762  787       * @return the parent of this phaser, or {@code null} if none
 763  788       */
 764  789      public Phaser getParent() {
 765  790          return parent;
 766  791      }
↓ open down ↓ 2 lines elided ↑ open up ↑
 769  794       * Returns the root ancestor of this phaser, which is the same as
 770  795       * this phaser if it has no parent.
 771  796       *
 772  797       * @return the root ancestor of this phaser
 773  798       */
 774  799      public Phaser getRoot() {
 775  800          return root;
 776  801      }
 777  802  
 778  803      /**
 779      -     * Returns {@code true} if this barrier has been terminated.
      804 +     * Returns {@code true} if this phaser has been terminated.
 780  805       *
 781      -     * @return {@code true} if this barrier has been terminated
      806 +     * @return {@code true} if this phaser has been terminated
 782  807       */
 783  808      public boolean isTerminated() {
 784      -        return getPhase() < 0;
      809 +        return root.state < 0L;
 785  810      }
 786  811  
 787  812      /**
 788  813       * Overridable method to perform an action upon impending phase
 789  814       * advance, and to control termination. This method is invoked
 790      -     * upon arrival of the party tripping the barrier (when all other
      815 +     * upon arrival of the party advancing this phaser (when all other
 791  816       * waiting parties are dormant).  If this method returns {@code
 792      -     * true}, then, rather than advance the phase number, this barrier
      817 +     * true}, then, rather than advance the phase number, this phaser
 793  818       * will be set to a final termination state, and subsequent calls
 794  819       * to {@link #isTerminated} will return true. Any (unchecked)
 795  820       * Exception or Error thrown by an invocation of this method is
 796      -     * propagated to the party attempting to trip the barrier, in
      821 +     * propagated to the party attempting to advance this phaser, in
 797  822       * which case no advance occurs.
 798  823       *
 799  824       * <p>The arguments to this method provide the state of the phaser
 800      -     * prevailing for the current transition. (When called from within
 801      -     * an implementation of {@code onAdvance} the values returned by
 802      -     * methods such as {@code getPhase} may or may not reliably
 803      -     * indicate the state to which this transition applies.)
      825 +     * prevailing for the current transition.  The effects of invoking
      826 +     * arrival, registration, and waiting methods on this phaser from
      827 +     * within {@code onAdvance} are unspecified and should not be
      828 +     * relied on.
 804  829       *
 805      -     * <p>The default version returns {@code true} when the number of
 806      -     * registered parties is zero. Normally, overrides that arrange
 807      -     * termination for other reasons should also preserve this
 808      -     * property.
      830 +     * <p>If this phaser is a member of a tiered set of phasers, then
      831 +     * {@code onAdvance} is invoked only for its root phaser on each
      832 +     * advance.
 809  833       *
 810      -     * <p>You may override this method to perform an action with side
 811      -     * effects visible to participating tasks, but it is only sensible
 812      -     * to do so in designs where all parties register before any
 813      -     * arrive, and all {@link #awaitAdvance} at each phase.
 814      -     * Otherwise, you cannot ensure lack of interference from other
 815      -     * parties during the invocation of this method. Additionally,
 816      -     * method {@code onAdvance} may be invoked more than once per
 817      -     * transition if registrations are intermixed with arrivals.
      834 +     * <p>To support the most common use cases, the default
      835 +     * implementation of this method returns {@code true} when the
      836 +     * number of registered parties has become zero as the result of a
      837 +     * party invoking {@code arriveAndDeregister}.  You can disable
      838 +     * this behavior, thus enabling continuation upon future
      839 +     * registrations, by overriding this method to always return
      840 +     * {@code false}:
 818  841       *
 819      -     * @param phase the phase number on entering the barrier
      842 +     * <pre> {@code
      843 +     * Phaser phaser = new Phaser() {
      844 +     *   protected boolean onAdvance(int phase, int parties) { return false; }
      845 +     * };}</pre>
      846 +     *
      847 +     * @param phase the current phase number on entry to this method,
      848 +     * before this phaser is advanced
 820  849       * @param registeredParties the current number of registered parties
 821      -     * @return {@code true} if this barrier should terminate
      850 +     * @return {@code true} if this phaser should terminate
 822  851       */
 823  852      protected boolean onAdvance(int phase, int registeredParties) {
 824      -        return registeredParties <= 0;
      853 +        return registeredParties == 0;
 825  854      }
 826  855  
 827  856      /**
 828  857       * Returns a string identifying this phaser, as well as its
 829  858       * state.  The state, in brackets, includes the String {@code
 830  859       * "phase = "} followed by the phase number, {@code "parties = "}
 831  860       * followed by the number of registered parties, and {@code
 832  861       * "arrived = "} followed by the number of arrived parties.
 833  862       *
 834      -     * @return a string identifying this barrier, as well as its state
      863 +     * @return a string identifying this phaser, as well as its state
 835  864       */
 836  865      public String toString() {
 837      -        long s = getReconciledState();
      866 +        return stateToString(reconcileState());
      867 +    }
      868 +
      869 +    /**
      870 +     * Implementation of toString and string-based error messages
      871 +     */
      872 +    private String stateToString(long s) {
 838  873          return super.toString() +
 839  874              "[phase = " + phaseOf(s) +
 840  875              " parties = " + partiesOf(s) +
 841  876              " arrived = " + arrivedOf(s) + "]";
 842  877      }
 843  878  
 844      -    // methods for waiting
      879 +    // Waiting mechanics
 845  880  
 846  881      /**
 847      -     * Wait nodes for Treiber stack representing wait queue
      882 +     * Removes and signals threads from queue for phase.
 848  883       */
 849      -    static final class QNode implements ForkJoinPool.ManagedBlocker {
 850      -        final Phaser phaser;
 851      -        final int phase;
 852      -        final long startTime;
 853      -        final long nanos;
 854      -        final boolean timed;
 855      -        final boolean interruptible;
 856      -        volatile boolean wasInterrupted = false;
 857      -        volatile Thread thread; // nulled to cancel wait
 858      -        QNode next;
 859      -        QNode(Phaser phaser, int phase, boolean interruptible,
 860      -              boolean timed, long startTime, long nanos) {
 861      -            this.phaser = phaser;
 862      -            this.phase = phase;
 863      -            this.timed = timed;
 864      -            this.interruptible = interruptible;
 865      -            this.startTime = startTime;
 866      -            this.nanos = nanos;
 867      -            thread = Thread.currentThread();
 868      -        }
 869      -        public boolean isReleasable() {
 870      -            return (thread == null ||
 871      -                    phaser.getPhase() != phase ||
 872      -                    (interruptible && wasInterrupted) ||
 873      -                    (timed && (nanos - (System.nanoTime() - startTime)) <= 0));
 874      -        }
 875      -        public boolean block() {
 876      -            if (Thread.interrupted()) {
 877      -                wasInterrupted = true;
 878      -                if (interruptible)
 879      -                    return true;
 880      -            }
 881      -            if (!timed)
 882      -                LockSupport.park(this);
 883      -            else {
 884      -                long waitTime = nanos - (System.nanoTime() - startTime);
 885      -                if (waitTime <= 0)
 886      -                    return true;
 887      -                LockSupport.parkNanos(this, waitTime);
 888      -            }
 889      -            return isReleasable();
 890      -        }
 891      -        void signal() {
 892      -            Thread t = thread;
 893      -            if (t != null) {
 894      -                thread = null;
      884 +    private void releaseWaiters(int phase) {
      885 +        QNode q;   // first element of queue
      886 +        int p;     // its phase
      887 +        Thread t;  // its thread
      888 +        AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
      889 +        while ((q = head.get()) != null &&
      890 +               ((p = q.phase) == phase ||
      891 +                (int)(root.state >>> PHASE_SHIFT) != p)) {
      892 +            if (head.compareAndSet(q, q.next) &&
      893 +                (t = q.thread) != null) {
      894 +                q.thread = null;
 895  895                  LockSupport.unpark(t);
 896  896              }
 897  897          }
 898      -        boolean doWait() {
 899      -            if (thread != null) {
 900      -                try {
 901      -                    ForkJoinPool.managedBlock(this);
 902      -                } catch (InterruptedException ie) {
 903      -                }
 904      -            }
 905      -            return wasInterrupted;
 906      -        }
 907      -
 908  898      }
 909  899  
 910      -    /**
 911      -     * Removes and signals waiting threads from wait queue.
 912      -     */
 913      -    private void releaseWaiters(int phase) {
 914      -        AtomicReference<QNode> head = queueFor(phase);
 915      -        QNode q;
 916      -        while ((q = head.get()) != null) {
 917      -            if (head.compareAndSet(q, q.next))
 918      -                q.signal();
 919      -        }
 920      -    }
      900 +    /** The number of CPUs, for spin control */
      901 +    private static final int NCPU = Runtime.getRuntime().availableProcessors();
 921  902  
 922  903      /**
 923      -     * Tries to enqueue given node in the appropriate wait queue.
 924      -     *
 925      -     * @return true if successful
      904 +     * The number of times to spin before blocking while waiting for
      905 +     * advance, per arrival while waiting. On multiprocessors, fully
      906 +     * blocking and waking up a large number of threads all at once is
      907 +     * usually a very slow process, so we use rechargeable spins to
      908 +     * avoid it when threads regularly arrive: When a thread in
      909 +     * internalAwaitAdvance notices another arrival before blocking,
      910 +     * and there appear to be enough CPUs available, it spins
      911 +     * SPINS_PER_ARRIVAL more times before blocking. The value trades
      912 +     * off good-citizenship vs big unnecessary slowdowns.
 926  913       */
 927      -    private boolean tryEnqueue(QNode node) {
 928      -        AtomicReference<QNode> head = queueFor(node.phase);
 929      -        return head.compareAndSet(node.next = head.get(), node);
 930      -    }
      914 +    static final int SPINS_PER_ARRIVAL = (NCPU < 2) ? 1 : 1 << 8;
 931  915  
 932  916      /**
 933      -     * Enqueues node and waits unless aborted or signalled.
      917 +     * Possibly blocks and waits for phase to advance unless aborted.
      918 +     * Call only from root node.
 934  919       *
      920 +     * @param phase current phase
      921 +     * @param node if non-null, the wait node to track interrupt and timeout;
      922 +     * if null, denotes noninterruptible wait
 935  923       * @return current phase
 936  924       */
 937      -    private int untimedWait(int phase) {
 938      -        QNode node = null;
 939      -        boolean queued = false;
 940      -        boolean interrupted = false;
      925 +    private int internalAwaitAdvance(int phase, QNode node) {
      926 +        releaseWaiters(phase-1);          // ensure old queue clean
      927 +        boolean queued = false;           // true when node is enqueued
      928 +        int lastUnarrived = 0;            // to increase spins upon change
      929 +        int spins = SPINS_PER_ARRIVAL;
      930 +        long s;
 941  931          int p;
 942      -        while ((p = getPhase()) == phase) {
 943      -            if (Thread.interrupted())
 944      -                interrupted = true;
 945      -            else if (node == null)
 946      -                node = new QNode(this, phase, false, false, 0, 0);
 947      -            else if (!queued)
 948      -                queued = tryEnqueue(node);
 949      -            else
 950      -                interrupted = node.doWait();
      932 +        while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
      933 +            if (node == null) {           // spinning in noninterruptible mode
      934 +                int unarrived = (int)s & UNARRIVED_MASK;
      935 +                if (unarrived != lastUnarrived &&
      936 +                    (lastUnarrived = unarrived) < NCPU)
      937 +                    spins += SPINS_PER_ARRIVAL;
      938 +                boolean interrupted = Thread.interrupted();
      939 +                if (interrupted || --spins < 0) { // need node to record intr
      940 +                    node = new QNode(this, phase, false, false, 0L);
      941 +                    node.wasInterrupted = interrupted;
      942 +                }
      943 +            }
      944 +            else if (node.isReleasable()) // done or aborted
      945 +                break;
      946 +            else if (!queued) {           // push onto queue
      947 +                AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
      948 +                QNode q = node.next = head.get();
      949 +                if ((q == null || q.phase == phase) &&
      950 +                    (int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
      951 +                    queued = head.compareAndSet(q, node);
      952 +            }
      953 +            else {
      954 +                try {
      955 +                    ForkJoinPool.managedBlock(node);
      956 +                } catch (InterruptedException ie) {
      957 +                    node.wasInterrupted = true;
      958 +                }
      959 +            }
 951  960          }
 952      -        if (node != null)
 953      -            node.thread = null;
      961 +
      962 +        if (node != null) {
      963 +            if (node.thread != null)
      964 +                node.thread = null;       // avoid need for unpark()
      965 +            if (node.wasInterrupted && !node.interruptible)
      966 +                Thread.currentThread().interrupt();
      967 +            if ((p = (int)(state >>> PHASE_SHIFT)) == phase)
      968 +                return p;                 // recheck abort
      969 +        }
 954  970          releaseWaiters(phase);
 955      -        if (interrupted)
 956      -            Thread.currentThread().interrupt();
 957  971          return p;
 958  972      }
 959  973  
 960  974      /**
 961      -     * Interruptible version
 962      -     * @return current phase
      975 +     * Wait nodes for Treiber stack representing wait queue
 963  976       */
 964      -    private int interruptibleWait(int phase) throws InterruptedException {
 965      -        QNode node = null;
 966      -        boolean queued = false;
 967      -        boolean interrupted = false;
 968      -        int p;
 969      -        while ((p = getPhase()) == phase && !interrupted) {
 970      -            if (Thread.interrupted())
 971      -                interrupted = true;
 972      -            else if (node == null)
 973      -                node = new QNode(this, phase, true, false, 0, 0);
 974      -            else if (!queued)
 975      -                queued = tryEnqueue(node);
 976      -            else
 977      -                interrupted = node.doWait();
      977 +    static final class QNode implements ForkJoinPool.ManagedBlocker {
      978 +        final Phaser phaser;
      979 +        final int phase;
      980 +        final boolean interruptible;
      981 +        final boolean timed;
      982 +        boolean wasInterrupted;
      983 +        long nanos;
      984 +        long lastTime;
      985 +        volatile Thread thread; // nulled to cancel wait
      986 +        QNode next;
      987 +
      988 +        QNode(Phaser phaser, int phase, boolean interruptible,
      989 +              boolean timed, long nanos) {
      990 +            this.phaser = phaser;
      991 +            this.phase = phase;
      992 +            this.interruptible = interruptible;
      993 +            this.nanos = nanos;
      994 +            this.timed = timed;
      995 +            this.lastTime = timed ? System.nanoTime() : 0L;
      996 +            thread = Thread.currentThread();
 978  997          }
 979      -        if (node != null)
 980      -            node.thread = null;
 981      -        if (p != phase || (p = getPhase()) != phase)
 982      -            releaseWaiters(phase);
 983      -        if (interrupted)
 984      -            throw new InterruptedException();
 985      -        return p;
 986      -    }
 987  998  
 988      -    /**
 989      -     * Timeout version.
 990      -     * @return current phase
 991      -     */
 992      -    private int timedWait(int phase, long nanos)
 993      -        throws InterruptedException, TimeoutException {
 994      -        long startTime = System.nanoTime();
 995      -        QNode node = null;
 996      -        boolean queued = false;
 997      -        boolean interrupted = false;
 998      -        int p;
 999      -        while ((p = getPhase()) == phase && !interrupted) {
      999 +        public boolean isReleasable() {
     1000 +            if (thread == null)
     1001 +                return true;
     1002 +            if (phaser.getPhase() != phase) {
     1003 +                thread = null;
     1004 +                return true;
     1005 +            }
1000 1006              if (Thread.interrupted())
1001      -                interrupted = true;
1002      -            else if (nanos - (System.nanoTime() - startTime) <= 0)
1003      -                break;
1004      -            else if (node == null)
1005      -                node = new QNode(this, phase, true, true, startTime, nanos);
1006      -            else if (!queued)
1007      -                queued = tryEnqueue(node);
1008      -            else
1009      -                interrupted = node.doWait();
     1007 +                wasInterrupted = true;
     1008 +            if (wasInterrupted && interruptible) {
     1009 +                thread = null;
     1010 +                return true;
     1011 +            }
     1012 +            if (timed) {
     1013 +                if (nanos > 0L) {
     1014 +                    long now = System.nanoTime();
     1015 +                    nanos -= now - lastTime;
     1016 +                    lastTime = now;
     1017 +                }
     1018 +                if (nanos <= 0L) {
     1019 +                    thread = null;
     1020 +                    return true;
     1021 +                }
     1022 +            }
     1023 +            return false;
1010 1024          }
1011      -        if (node != null)
1012      -            node.thread = null;
1013      -        if (p != phase || (p = getPhase()) != phase)
1014      -            releaseWaiters(phase);
1015      -        if (interrupted)
1016      -            throw new InterruptedException();
1017      -        if (p == phase)
1018      -            throw new TimeoutException();
1019      -        return p;
     1025 +
     1026 +        public boolean block() {
     1027 +            if (isReleasable())
     1028 +                return true;
     1029 +            else if (!timed)
     1030 +                LockSupport.park(this);
     1031 +            else if (nanos > 0)
     1032 +                LockSupport.parkNanos(this, nanos);
     1033 +            return isReleasable();
     1034 +        }
1020 1035      }
1021 1036  
1022 1037      // Unsafe mechanics
1023 1038  
1024 1039      private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
1025 1040      private static final long stateOffset =
1026 1041          objectFieldOffset("state", Phaser.class);
1027 1042  
1028      -    private final boolean casState(long cmp, long val) {
1029      -        return UNSAFE.compareAndSwapLong(this, stateOffset, cmp, val);
1030      -    }
1031      -
1032 1043      private static long objectFieldOffset(String field, Class<?> klazz) {
1033 1044          try {
1034 1045              return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
1035 1046          } catch (NoSuchFieldException e) {
1036 1047              // Convert Exception to corresponding Error
1037 1048              NoSuchFieldError error = new NoSuchFieldError(field);
1038 1049              error.initCause(e);
1039 1050              throw error;
1040 1051          }
1041 1052      }
1042 1053  }
    
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX