1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 
28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/licenses/publicdomain 34 */ 35 36 package java.util.concurrent; 37 38 import java.util.concurrent.TimeUnit; 39 import java.util.concurrent.TimeoutException; 40 import java.util.concurrent.atomic.AtomicReference; 41 import java.util.concurrent.locks.LockSupport; 42 43 /** 44 * A reusable synchronization barrier, similar in functionality to 45 * {@link java.util.concurrent.CyclicBarrier CyclicBarrier} and 46 * {@link java.util.concurrent.CountDownLatch CountDownLatch} 47 * but supporting more flexible usage. 48 * 49 * <p> <b>Registration.</b> Unlike the case for other barriers, the 50 * number of parties <em>registered</em> to synchronize on a phaser 51 * may vary over time. Tasks may be registered at any time (using 52 * methods {@link #register}, {@link #bulkRegister}, or forms of 53 * constructors establishing initial numbers of parties), and 54 * optionally deregistered upon any arrival (using {@link 55 * #arriveAndDeregister}). As is the case with most basic 56 * synchronization constructs, registration and deregistration affect 57 * only internal counts; they do not establish any further internal 58 * bookkeeping, so tasks cannot query whether they are registered. 59 * (However, you can introduce such bookkeeping by subclassing this 60 * class.) 61 * 62 * <p> <b>Synchronization.</b> Like a {@code CyclicBarrier}, a {@code 63 * Phaser} may be repeatedly awaited. Method {@link 64 * #arriveAndAwaitAdvance} has effect analogous to {@link 65 * java.util.concurrent.CyclicBarrier#await CyclicBarrier.await}. Each 66 * generation of a phaser has an associated phase number. 
The phase 67 * number starts at zero, and advances when all parties arrive at the 68 * phaser, wrapping around to zero after reaching {@code 69 * Integer.MAX_VALUE}. The use of phase numbers enables independent 70 * control of actions upon arrival at a phaser and upon awaiting 71 * others, via two kinds of methods that may be invoked by any 72 * registered party: 73 * 74 * <ul> 75 * 76 * <li> <b>Arrival.</b> Methods {@link #arrive} and 77 * {@link #arriveAndDeregister} record arrival. These methods 78 * do not block, but return an associated <em>arrival phase 79 * number</em>; that is, the phase number of the phaser to which 80 * the arrival applied. When the final party for a given phase 81 * arrives, an optional action is performed and the phase 82 * advances. These actions are performed by the party 83 * triggering a phase advance, and are arranged by overriding 84 * method {@link #onAdvance(int, int)}, which also controls 85 * termination. Overriding this method is similar to, but more 86 * flexible than, providing a barrier action to a {@code 87 * CyclicBarrier}. 88 * 89 * <li> <b>Waiting.</b> Method {@link #awaitAdvance} requires an 90 * argument indicating an arrival phase number, and returns when 91 * the phaser advances to (or is already at) a different phase. 92 * Unlike similar constructions using {@code CyclicBarrier}, 93 * method {@code awaitAdvance} continues to wait even if the 94 * waiting thread is interrupted. Interruptible and timeout 95 * versions are also available, but exceptions encountered while 96 * tasks wait interruptibly or with timeout do not change the 97 * state of the phaser. If necessary, you can perform any 98 * associated recovery within handlers of those exceptions, 99 * often after invoking {@code forceTermination}. Phasers may 100 * also be used by tasks executing in a {@link ForkJoinPool}, 101 * which will ensure sufficient parallelism to execute tasks 102 * when others are blocked waiting for a phase to advance. 
103 * 104 * </ul> 105 * 106 * <p> <b>Termination.</b> A phaser may enter a <em>termination</em> 107 * state in which all synchronization methods immediately return 108 * without updating phaser state or waiting for advance, and 109 * indicating (via a negative phase value) that execution is complete. 110 * Termination is triggered when an invocation of {@code onAdvance} 111 * returns {@code true}. The default implementation returns {@code 112 * true} if a deregistration has caused the number of registered 113 * parties to become zero. As illustrated below, when phasers control 114 * actions with a fixed number of iterations, it is often convenient 115 * to override this method to cause termination when the current phase 116 * number reaches a threshold. Method {@link #forceTermination} is 117 * also available to abruptly release waiting threads and allow them 118 * to terminate. 119 * 120 * <p> <b>Tiering.</b> Phasers may be <em>tiered</em> (i.e., 121 * constructed in tree structures) to reduce contention. Phasers with 122 * large numbers of parties that would otherwise experience heavy 123 * synchronization contention costs may instead be set up so that 124 * groups of sub-phasers share a common parent. This may greatly 125 * increase throughput even though it incurs greater per-operation 126 * overhead. 127 * 128 * <p><b>Monitoring.</b> While synchronization methods may be invoked 129 * only by registered parties, the current state of a phaser may be 130 * monitored by any caller. At any given moment there are {@link 131 * #getRegisteredParties} parties in total, of which {@link 132 * #getArrivedParties} have arrived at the current phase ({@link 133 * #getPhase}). When the remaining ({@link #getUnarrivedParties}) 134 * parties arrive, the phase advances. The values returned by these 135 * methods may reflect transient states and so are not in general 136 * useful for synchronization control. 
Method {@link #toString} 137 * returns snapshots of these state queries in a form convenient for 138 * informal monitoring. 139 * 140 * <p><b>Sample usages:</b> 141 * 142 * <p>A {@code Phaser} may be used instead of a {@code CountDownLatch} 143 * to control a one-shot action serving a variable number of parties. 144 * The typical idiom is for the method setting this up to first 145 * register, then start the actions, then deregister, as in: 146 * 147 * <pre> {@code 148 * void runTasks(List<Runnable> tasks) { 149 * final Phaser phaser = new Phaser(1); // "1" to register self 150 * // create and start threads 151 * for (Runnable task : tasks) { 152 * phaser.register(); 153 * new Thread() { 154 * public void run() { 155 * phaser.arriveAndAwaitAdvance(); // await all creation 156 * task.run(); 157 * } 158 * }.start(); 159 * } 160 * 161 * // allow threads to start and deregister self 162 * phaser.arriveAndDeregister(); 163 * }}</pre> 164 * 165 * <p>One way to cause a set of threads to repeatedly perform actions 166 * for a given number of iterations is to override {@code onAdvance}: 167 * 168 * <pre> {@code 169 * void startTasks(List<Runnable> tasks, final int iterations) { 170 * final Phaser phaser = new Phaser() { 171 * protected boolean onAdvance(int phase, int registeredParties) { 172 * return phase >= iterations || registeredParties == 0; 173 * } 174 * }; 175 * phaser.register(); 176 * for (final Runnable task : tasks) { 177 * phaser.register(); 178 * new Thread() { 179 * public void run() { 180 * do { 181 * task.run(); 182 * phaser.arriveAndAwaitAdvance(); 183 * } while (!phaser.isTerminated()); 184 * } 185 * }.start(); 186 * } 187 * phaser.arriveAndDeregister(); // deregister self, don't wait 188 * }}</pre> 189 * 190 * If the main task must later await termination, it 191 * may re-register and then execute a similar loop: 192 * <pre> {@code 193 * // ... 
194 * phaser.register(); 195 * while (!phaser.isTerminated()) 196 * phaser.arriveAndAwaitAdvance();}</pre> 197 * 198 * <p>Related constructions may be used to await particular phase numbers 199 * in contexts where you are sure that the phase will never wrap around 200 * {@code Integer.MAX_VALUE}. For example: 201 * 202 * <pre> {@code 203 * void awaitPhase(Phaser phaser, int phase) { 204 * int p = phaser.register(); // assumes caller not already registered 205 * while (p < phase) { 206 * if (phaser.isTerminated()) 207 * // ... deal with unexpected termination 208 * else 209 * p = phaser.arriveAndAwaitAdvance(); 210 * } 211 * phaser.arriveAndDeregister(); 212 * }}</pre> 213 * 214 * 215 * <p>To create a set of tasks using a tree of phasers, 216 * you could use code of the following form, assuming a 217 * Task class with a constructor accepting a {@code Phaser} that 218 * it registers with upon construction: 219 * 220 * <pre> {@code 221 * void build(Task[] actions, int lo, int hi, Phaser ph) { 222 * if (hi - lo > TASKS_PER_PHASER) { 223 * for (int i = lo; i < hi; i += TASKS_PER_PHASER) { 224 * int j = Math.min(i + TASKS_PER_PHASER, hi); 225 * build(actions, i, j, new Phaser(ph)); 226 * } 227 * } else { 228 * for (int i = lo; i < hi; ++i) 229 * actions[i] = new Task(ph); 230 * // assumes new Task(ph) performs ph.register() 231 * } 232 * } 233 * // .. initially called, for n tasks via 234 * build(new Task[n], 0, n, new Phaser());}</pre> 235 * 236 * The best value of {@code TASKS_PER_PHASER} depends mainly on 237 * expected synchronization rates. A value as low as four may 238 * be appropriate for extremely small per-phase task bodies (thus 239 * high rates), or up to hundreds for extremely large ones. 240 * 241 * <p><b>Implementation notes</b>: This implementation restricts the 242 * maximum number of parties to 65535. Attempts to register additional 243 * parties result in {@code IllegalStateException}. 
However, you can and 244 * should create tiered phasers to accommodate arbitrarily large sets 245 * of participants. 246 * 247 * @since 1.7 248 * @author Doug Lea 249 */ 250 public class Phaser { 251 /* 252 * This class implements an extension of X10 "clocks". Thanks to 253 * Vijay Saraswat for the idea, and to Vivek Sarkar for 254 * enhancements to extend functionality. 255 */ 256 257 /** 258 * Primary state representation, holding four fields: 259 * 260 * * unarrived -- the number of parties yet to hit barrier (bits 0-15) 261 * * parties -- the number of parties to wait (bits 16-31) 262 * * phase -- the generation of the barrier (bits 32-62) 263 * * terminated -- set if barrier is terminated (bit 63 / sign) 264 * 265 * However, to efficiently maintain atomicity, these values are 266 * packed into a single (atomic) long. Termination uses the sign 267 * bit of 32 bit representation of phase, so phase is set to -1 on 268 * termination. Good performance relies on keeping state decoding 269 * and encoding simple, and keeping race windows short. 
270 */ 271 private volatile long state; 272 273 private static final int MAX_PARTIES = 0xffff; 274 private static final int MAX_PHASE = 0x7fffffff; 275 private static final int PARTIES_SHIFT = 16; 276 private static final int PHASE_SHIFT = 32; 277 private static final int UNARRIVED_MASK = 0xffff; // to mask ints 278 private static final long PARTIES_MASK = 0xffff0000L; // to mask longs 279 private static final long ONE_ARRIVAL = 1L; 280 private static final long ONE_PARTY = 1L << PARTIES_SHIFT; 281 private static final long TERMINATION_BIT = 1L << 63; 282 283 // The following unpacking methods are usually manually inlined 284 285 private static int unarrivedOf(long s) { 286 return (int)s & UNARRIVED_MASK; 287 } 288 289 private static int partiesOf(long s) { 290 return (int)s >>> PARTIES_SHIFT; 291 } 292 293 private static int phaseOf(long s) { 294 return (int) (s >>> PHASE_SHIFT); 295 } 296 297 private static int arrivedOf(long s) { 298 return partiesOf(s) - unarrivedOf(s); 299 } 300 301 /** 302 * The parent of this phaser, or null if none 303 */ 304 private final Phaser parent; 305 306 /** 307 * The root of phaser tree. Equals this if not in a tree. Used to 308 * support faster state push-down. 309 */ 310 private final Phaser root; 311 312 /** 313 * Heads of Treiber stacks for waiting threads. To eliminate 314 * contention when releasing some threads while adding others, we 315 * use two of them, alternating across even and odd phases. 316 * Subphasers share queues with root to speed up releases. 317 */ 318 private final AtomicReference<QNode> evenQ; 319 private final AtomicReference<QNode> oddQ; 320 321 private AtomicReference<QNode> queueFor(int phase) { 322 return ((phase & 1) == 0) ? evenQ : oddQ; 323 } 324 325 /** 326 * Returns message string for bounds exceptions on arrival. 
*
     * @param s the state snapshot to describe in the message
     * @return the message text, ending with {@code stateToString(s)}
     */
    private String badArrive(long s) {
        return "Attempted arrival of unregistered party for " +
            stateToString(s);
    }

    /**
     * Returns message string for bounds exceptions on registration.
     *
     * @param s the state snapshot to describe in the message
     * @return the message text, which names {@code MAX_PARTIES} and
     * ends with {@code stateToString(s)}
     */
    private String badRegister(long s) {
        return "Attempt to register more than " +
            MAX_PARTIES + " parties for " + stateToString(s);
    }

    /**
     * Main implementation for methods arrive and arriveAndDeregister.
     * Manually tuned to speed up and minimize race windows for the
     * common case of just decrementing unarrived field.
     *
     * <p>Loops until the state is seen as terminated, the arrival is
     * rejected, or a compare-and-swap subtracting {@code adj} succeeds.
     * The arrival that finds the unarrived count at one also builds
     * the next-phase state and either installs it (root) or passes the
     * arrival on to the parent (child).
     *
     * @param adj - adjustment to apply to state -- either
     * ONE_ARRIVAL (for arrive) or
     * ONE_ARRIVAL|ONE_PARTY (for arriveAndDeregister)
     * @return the phase read from the state snapshot the arrival was
     * applied to, or a negative value if that snapshot was terminated
     * @throws IllegalStateException if the unarrived count is zero and
     * the state is unchanged after reconciliation
     */
    private int doArrive(long adj) {
        for (;;) {
            long s = state;
            int unarrived = (int)s & UNARRIVED_MASK;
            int phase = (int)(s >>> PHASE_SHIFT);
            if (phase < 0)                     // sign bit set: terminated
                return phase;
            else if (unarrived == 0) {
                // Nobody is left to arrive in this snapshot. If
                // reconciling leaves the state unchanged the arrival is
                // an error; otherwise loop and retry on the new state.
                if (reconcileState() == s)     // recheck
                    throw new IllegalStateException(badArrive(s));
            }
            // "s-=adj" is evaluated as the new-value argument, so after a
            // successful CAS the local s holds the state just written.
            else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adj)) {
                if (unarrived == 1) {          // this arrival was the last one pending
                    long p = s & PARTIES_MASK; // unshifted parties field
                    long lu = p >>> PARTIES_SHIFT;
                    int u = (int)lu;           // party count after the adjustment
                    int nextPhase = (phase + 1) & MAX_PHASE; // wraps to 0 past MAX_PHASE
                    // next state: new phase, same parties, unarrived reset to parties
                    long next = ((long)nextPhase << PHASE_SHIFT) | p | lu;
                    final Phaser parent = this.parent;
                    if (parent == null) {
                        // Root: run the advance hook, which may request termination.
                        if (onAdvance(phase, u))
                            next |= TERMINATION_BIT;
                        // Result of this CAS is deliberately not checked here.
                        UNSAFE.compareAndSwapLong(this, stateOffset, s, next);
                        releaseWaiters(phase);
                    }
                    else {
                        // Child: pass the arrival up, also deregistering from
                        // the parent when this phaser has no parties left.
                        parent.doArrive((u == 0) ?
                                        ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL);
                        // Parent not at the expected next phase: resync from root.
                        if ((int)(parent.state >>> PHASE_SHIFT) != nextPhase)
                            reconcileState();
                        // Otherwise advance locally if nothing changed meanwhile.
                        else if (state == s)
                            UNSAFE.compareAndSwapLong(this, stateOffset, s,
                                                      next);
                    }
                }
                return phase;
            }
            // CAS failed: another thread changed state; re-read and retry.
        }
    }

    /**
     * Implementation of register, bulkRegister
     *
     * <p>Loops until the registration is added by compare-and-swap,
     * the phaser is seen as terminated, or the party limit would be
     * exceeded. A child phaser's first registration also registers
     * one party with its parent, under this phaser's monitor.
     *
     * @param registrations number to add to both parties and
     * unarrived fields. Must be greater than zero.
     * @return the phase read from the state the registration was
     * applied to, or a negative value if terminated
     * @throws IllegalStateException if the addition would exceed
     * {@code MAX_PARTIES}
     */
    private int doRegister(int registrations) {
        // adjustment to state
        long adj = ((long)registrations << PARTIES_SHIFT) | registrations;
        final Phaser parent = this.parent;
        for (;;) {
            // A child first syncs its phase with the root.
            long s = (parent == null) ? state : reconcileState();
            int parties = (int)s >>> PARTIES_SHIFT;
            int phase = (int)(s >>> PHASE_SHIFT);
            if (phase < 0)                     // terminated
                return phase;
            else if (registrations > MAX_PARTIES - parties)
                throw new IllegalStateException(badRegister(s));
            else if ((parties == 0 && parent == null) || // first reg of root
                     ((int)s & UNARRIVED_MASK) != 0) {   // not advancing
                if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s + adj))
                    return phase;
                // CAS lost a race: fall through and retry the outer loop.
            }
            else if (parties != 0)  // wait for onAdvance
                root.internalAwaitAdvance(phase, null);
            else {                  // 1st registration of child
                synchronized (this) {  // register parent first
                    if (reconcileState() == s) { // recheck under lock
                        parent.doRegister(1);    // OK if throws IllegalState
                        for (;;) {               // simpler form of outer loop
                            s = reconcileState();
                            phase = (int)(s >>> PHASE_SHIFT);
                            if (phase < 0 ||
                                UNSAFE.compareAndSwapLong(this, stateOffset,
                                                          s, s + adj))
                                return phase;
                        }
                    }
                    // State changed before the lock was taken: retry outer loop.
                }
            }
        }
    }

    /**
     * Recursively resolves lagged phase propagation from root if necessary.
*
     * <p>Does nothing beyond reading {@code state} when this phaser
     * has no parent. Otherwise loops while this phaser is not
     * terminated and its phase differs from the root's, trying to
     * install the root's phase locally.
     *
     * @return the value of {@code state} read last, after any
     * reconciliation
     */
    private long reconcileState() {
        Phaser par = parent;
        long s = state;
        if (par != null) {
            Phaser rt = root;
            int phase, rPhase;
            // Continue while not terminated locally and out of step with root.
            while ((phase = (int)(s >>> PHASE_SHIFT)) >= 0 &&
                   (rPhase = (int)(rt.state >>> PHASE_SHIFT)) != phase) {
                // An intermediate parent that lags the root is brought
                // up to date first.
                if (par != rt && (int)(par.state >>> PHASE_SHIFT) != rPhase)
                    par.reconcileState();
                // Adopt the root phase only if the root is terminated or
                // this phaser has no unarrived parties in the snapshot.
                else if (rPhase < 0 || ((int)s & UNARRIVED_MASK) == 0) {
                    long u = s & PARTIES_MASK; // reset unarrived to parties
                    long next = ((((long) rPhase) << PHASE_SHIFT) | u |
                                 (u >>> PARTIES_SHIFT));
                    UNSAFE.compareAndSwapLong(this, stateOffset, s, next);
                }
                s = state;  // re-read whether or not the CAS succeeded
            }
        }
        return s;
    }

    /**
     * Creates a new phaser with no initially registered parties, no
     * parent, and initial phase number 0. Any thread using this
     * phaser will need to first register for it.
     */
    public Phaser() {
        this(null, 0);
    }

    /**
     * Creates a new phaser with the given number of registered
     * unarrived parties, no parent, and initial phase number 0.
     *
     * @param parties the number of parties required to advance to the
     * next phase
     * @throws IllegalArgumentException if parties less than zero
     * or greater than the maximum number of parties supported
     */
    public Phaser(int parties) {
        this(null, parties);
    }

    /**
     * Equivalent to {@link #Phaser(Phaser, int) Phaser(parent, 0)}.
     *
     * @param parent the parent phaser
     */
    public Phaser(Phaser parent) {
        this(parent, 0);
    }

    /**
     * Creates a new phaser with the given parent and number of
     * registered unarrived parties. Registration and deregistration
     * of this child phaser with its parent are managed automatically.
493 * If the given parent is non-null, whenever this child phaser has 494 * any registered parties (as established in this constructor, 495 * {@link #register}, or {@link #bulkRegister}), this child phaser 496 * is registered with its parent. Whenever the number of 497 * registered parties becomes zero as the result of an invocation 498 * of {@link #arriveAndDeregister}, this child phaser is 499 * deregistered from its parent. 500 * 501 * @param parent the parent phaser 502 * @param parties the number of parties required to advance to the 503 * next phase 504 * @throws IllegalArgumentException if parties less than zero 505 * or greater than the maximum number of parties supported 506 */ 507 public Phaser(Phaser parent, int parties) { 508 if (parties >>> PARTIES_SHIFT != 0) 509 throw new IllegalArgumentException("Illegal number of parties"); 510 long s = ((long) parties) | (((long) parties) << PARTIES_SHIFT); 511 this.parent = parent; 512 if (parent != null) { 513 Phaser r = parent.root; 514 this.root = r; 515 this.evenQ = r.evenQ; 516 this.oddQ = r.oddQ; 517 if (parties != 0) 518 s |= ((long)(parent.doRegister(1))) << PHASE_SHIFT; 519 } 520 else { 521 this.root = this; 522 this.evenQ = new AtomicReference<QNode>(); 523 this.oddQ = new AtomicReference<QNode>(); 524 } 525 this.state = s; 526 } 527 528 /** 529 * Adds a new unarrived party to this phaser. If an ongoing 530 * invocation of {@link #onAdvance} is in progress, this method 531 * may await its completion before returning. If this phaser has 532 * a parent, and this phaser previously had no registered parties, 533 * this phaser is also registered with its parent. 534 * 535 * @return the arrival phase number to which this registration applied 536 * @throws IllegalStateException if attempting to register more 537 * than the maximum supported number of parties 538 */ 539 public int register() { 540 return doRegister(1); 541 } 542 543 /** 544 * Adds the given number of new unarrived parties to this phaser. 
545 * If an ongoing invocation of {@link #onAdvance} is in progress, 546 * this method may await its completion before returning. If this 547 * phaser has a parent, and the given number of parities is 548 * greater than zero, and this phaser previously had no registered 549 * parties, this phaser is also registered with its parent. 550 * 551 * @param parties the number of additional parties required to 552 * advance to the next phase 553 * @return the arrival phase number to which this registration applied 554 * @throws IllegalStateException if attempting to register more 555 * than the maximum supported number of parties 556 * @throws IllegalArgumentException if {@code parties < 0} 557 */ 558 public int bulkRegister(int parties) { 559 if (parties < 0) 560 throw new IllegalArgumentException(); 561 if (parties == 0) 562 return getPhase(); 563 return doRegister(parties); 564 } 565 566 /** 567 * Arrives at this phaser, without waiting for others to arrive. 568 * 569 * <p>It is a usage error for an unregistered party to invoke this 570 * method. However, this error may result in an {@code 571 * IllegalStateException} only upon some subsequent operation on 572 * this phaser, if ever. 573 * 574 * @return the arrival phase number, or a negative value if terminated 575 * @throws IllegalStateException if not terminated and the number 576 * of unarrived parties would become negative 577 */ 578 public int arrive() { 579 return doArrive(ONE_ARRIVAL); 580 } 581 582 /** 583 * Arrives at this phaser and deregisters from it without waiting 584 * for others to arrive. Deregistration reduces the number of 585 * parties required to advance in future phases. If this phaser 586 * has a parent, and deregistration causes this phaser to have 587 * zero parties, this phaser is also deregistered from its parent. 588 * 589 * <p>It is a usage error for an unregistered party to invoke this 590 * method. 
However, this error may result in an {@code 591 * IllegalStateException} only upon some subsequent operation on 592 * this phaser, if ever. 593 * 594 * @return the arrival phase number, or a negative value if terminated 595 * @throws IllegalStateException if not terminated and the number 596 * of registered or unarrived parties would become negative 597 */ 598 public int arriveAndDeregister() { 599 return doArrive(ONE_ARRIVAL|ONE_PARTY); 600 } 601 602 /** 603 * Arrives at this phaser and awaits others. Equivalent in effect 604 * to {@code awaitAdvance(arrive())}. If you need to await with 605 * interruption or timeout, you can arrange this with an analogous 606 * construction using one of the other forms of the {@code 607 * awaitAdvance} method. If instead you need to deregister upon 608 * arrival, use {@code awaitAdvance(arriveAndDeregister())}. 609 * 610 * <p>It is a usage error for an unregistered party to invoke this 611 * method. However, this error may result in an {@code 612 * IllegalStateException} only upon some subsequent operation on 613 * this phaser, if ever. 614 * 615 * @return the arrival phase number, or a negative number if terminated 616 * @throws IllegalStateException if not terminated and the number 617 * of unarrived parties would become negative 618 */ 619 public int arriveAndAwaitAdvance() { 620 return awaitAdvance(doArrive(ONE_ARRIVAL)); 621 } 622 623 /** 624 * Awaits the phase of this phaser to advance from the given phase 625 * value, returning immediately if the current phase is not equal 626 * to the given phase value or this phaser is terminated. 627 * 628 * @param phase an arrival phase number, or negative value if 629 * terminated; this argument is normally the value returned by a 630 * previous call to {@code arrive} or {@code arriveAndDeregister}. 
631 * @return the next arrival phase number, or a negative value 632 * if terminated or argument is negative 633 */ 634 public int awaitAdvance(int phase) { 635 Phaser rt; 636 int p = (int)(state >>> PHASE_SHIFT); 637 if (phase < 0) 638 return phase; 639 if (p == phase && 640 (p = (int)((rt = root).state >>> PHASE_SHIFT)) == phase) 641 return rt.internalAwaitAdvance(phase, null); 642 return p; 643 } 644 645 /** 646 * Awaits the phase of this phaser to advance from the given phase 647 * value, throwing {@code InterruptedException} if interrupted 648 * while waiting, or returning immediately if the current phase is 649 * not equal to the given phase value or this phaser is 650 * terminated. 651 * 652 * @param phase an arrival phase number, or negative value if 653 * terminated; this argument is normally the value returned by a 654 * previous call to {@code arrive} or {@code arriveAndDeregister}. 655 * @return the next arrival phase number, or a negative value 656 * if terminated or argument is negative 657 * @throws InterruptedException if thread interrupted while waiting 658 */ 659 public int awaitAdvanceInterruptibly(int phase) 660 throws InterruptedException { 661 Phaser rt; 662 int p = (int)(state >>> PHASE_SHIFT); 663 if (phase < 0) 664 return phase; 665 if (p == phase && 666 (p = (int)((rt = root).state >>> PHASE_SHIFT)) == phase) { 667 QNode node = new QNode(this, phase, true, false, 0L); 668 p = rt.internalAwaitAdvance(phase, node); 669 if (node.wasInterrupted) 670 throw new InterruptedException(); 671 } 672 return p; 673 } 674 675 /** 676 * Awaits the phase of this phaser to advance from the given phase 677 * value or the given timeout to elapse, throwing {@code 678 * InterruptedException} if interrupted while waiting, or 679 * returning immediately if the current phase is not equal to the 680 * given phase value or this phaser is terminated. 
681 * 682 * @param phase an arrival phase number, or negative value if 683 * terminated; this argument is normally the value returned by a 684 * previous call to {@code arrive} or {@code arriveAndDeregister}. 685 * @param timeout how long to wait before giving up, in units of 686 * {@code unit} 687 * @param unit a {@code TimeUnit} determining how to interpret the 688 * {@code timeout} parameter 689 * @return the next arrival phase number, or a negative value 690 * if terminated or argument is negative 691 * @throws InterruptedException if thread interrupted while waiting 692 * @throws TimeoutException if timed out while waiting 693 */ 694 public int awaitAdvanceInterruptibly(int phase, 695 long timeout, TimeUnit unit) 696 throws InterruptedException, TimeoutException { 697 long nanos = unit.toNanos(timeout); 698 Phaser rt; 699 int p = (int)(state >>> PHASE_SHIFT); 700 if (phase < 0) 701 return phase; 702 if (p == phase && 703 (p = (int)((rt = root).state >>> PHASE_SHIFT)) == phase) { 704 QNode node = new QNode(this, phase, true, true, nanos); 705 p = rt.internalAwaitAdvance(phase, node); 706 if (node.wasInterrupted) 707 throw new InterruptedException(); 708 else if (p == phase) 709 throw new TimeoutException(); 710 } 711 return p; 712 } 713 714 /** 715 * Forces this phaser to enter termination state. Counts of 716 * arrived and registered parties are unaffected. If this phaser 717 * is a member of a tiered set of phasers, then all of the phasers 718 * in the set are terminated. If this phaser is already 719 * terminated, this method has no effect. This method may be 720 * useful for coordinating recovery after one or more tasks 721 * encounter unexpected exceptions. 
722 */ 723 public void forceTermination() { 724 // Only need to change root state 725 final Phaser root = this.root; 726 long s; 727 while ((s = root.state) >= 0) { 728 if (UNSAFE.compareAndSwapLong(root, stateOffset, 729 s, s | TERMINATION_BIT)) { 730 releaseWaiters(0); // signal all threads 731 releaseWaiters(1); 732 return; 733 } 734 } 735 } 736 737 /** 738 * Returns the current phase number. The maximum phase number is 739 * {@code Integer.MAX_VALUE}, after which it restarts at 740 * zero. Upon termination, the phase number is negative, 741 * in which case the prevailing phase prior to termination 742 * may be obtained via {@code getPhase() + Integer.MIN_VALUE}. 743 * 744 * @return the phase number, or a negative value if terminated 745 */ 746 public final int getPhase() { 747 return (int)(root.state >>> PHASE_SHIFT); 748 } 749 750 /** 751 * Returns the number of parties registered at this phaser. 752 * 753 * @return the number of parties 754 */ 755 public int getRegisteredParties() { 756 return partiesOf(state); 757 } 758 759 /** 760 * Returns the number of registered parties that have arrived at 761 * the current phase of this phaser. 762 * 763 * @return the number of arrived parties 764 */ 765 public int getArrivedParties() { 766 long s = state; 767 int u = unarrivedOf(s); // only reconcile if possibly needed 768 return (u != 0 || parent == null) ? 769 partiesOf(s) - u : 770 arrivedOf(reconcileState()); 771 } 772 773 /** 774 * Returns the number of registered parties that have not yet 775 * arrived at the current phase of this phaser. 776 * 777 * @return the number of unarrived parties 778 */ 779 public int getUnarrivedParties() { 780 int u = unarrivedOf(state); 781 return (u != 0 || parent == null) ? u : unarrivedOf(reconcileState()); 782 } 783 784 /** 785 * Returns the parent of this phaser, or {@code null} if none. 
786 * 787 * @return the parent of this phaser, or {@code null} if none 788 */ 789 public Phaser getParent() { 790 return parent; 791 } 792 793 /** 794 * Returns the root ancestor of this phaser, which is the same as 795 * this phaser if it has no parent. 796 * 797 * @return the root ancestor of this phaser 798 */ 799 public Phaser getRoot() { 800 return root; 801 } 802 803 /** 804 * Returns {@code true} if this phaser has been terminated. 805 * 806 * @return {@code true} if this phaser has been terminated 807 */ 808 public boolean isTerminated() { 809 return root.state < 0L; 810 } 811 812 /** 813 * Overridable method to perform an action upon impending phase 814 * advance, and to control termination. This method is invoked 815 * upon arrival of the party advancing this phaser (when all other 816 * waiting parties are dormant). If this method returns {@code 817 * true}, then, rather than advance the phase number, this phaser 818 * will be set to a final termination state, and subsequent calls 819 * to {@link #isTerminated} will return true. Any (unchecked) 820 * Exception or Error thrown by an invocation of this method is 821 * propagated to the party attempting to advance this phaser, in 822 * which case no advance occurs. 823 * 824 * <p>The arguments to this method provide the state of the phaser 825 * prevailing for the current transition. The effects of invoking 826 * arrival, registration, and waiting methods on this phaser from 827 * within {@code onAdvance} are unspecified and should not be 828 * relied on. 829 * 830 * <p>If this phaser is a member of a tiered set of phasers, then 831 * {@code onAdvance} is invoked only for its root phaser on each 832 * advance. 833 * 834 * <p>To support the most common use cases, the default 835 * implementation of this method returns {@code true} when the 836 * number of registered parties has become zero as the result of a 837 * party invoking {@code arriveAndDeregister}. 
You can disable 838 * this behavior, thus enabling continuation upon future 839 * registrations, by overriding this method to always return 840 * {@code false}: 841 * 842 * <pre> {@code 843 * Phaser phaser = new Phaser() { 844 * protected boolean onAdvance(int phase, int parties) { return false; } 845 * }}</pre> 846 * 847 * @param phase the current phase number on entry to this method, 848 * before this phaser is advanced 849 * @param registeredParties the current number of registered parties 850 * @return {@code true} if this phaser should terminate 851 */ 852 protected boolean onAdvance(int phase, int registeredParties) { 853 return registeredParties == 0; 854 } 855 856 /** 857 * Returns a string identifying this phaser, as well as its 858 * state. The state, in brackets, includes the String {@code 859 * "phase = "} followed by the phase number, {@code "parties = "} 860 * followed by the number of registered parties, and {@code 861 * "arrived = "} followed by the number of arrived parties. 862 * 863 * @return a string identifying this phaser, as well as its state 864 */ 865 public String toString() { 866 return stateToString(reconcileState()); 867 } 868 869 /** 870 * Implementation of toString and string-based error messages 871 */ 872 private String stateToString(long s) { 873 return super.toString() + 874 "[phase = " + phaseOf(s) + 875 " parties = " + partiesOf(s) + 876 " arrived = " + arrivedOf(s) + "]"; 877 } 878 879 // Waiting mechanics 880 881 /** 882 * Removes and signals threads from queue for phase. 883 */ 884 private void releaseWaiters(int phase) { 885 QNode q; // first element of queue 886 int p; // its phase 887 Thread t; // its thread 888 AtomicReference<QNode> head = (phase & 1) == 0 ? 
evenQ : oddQ; 889 while ((q = head.get()) != null && 890 ((p = q.phase) == phase || 891 (int)(root.state >>> PHASE_SHIFT) != p)) { 892 if (head.compareAndSet(q, q.next) && 893 (t = q.thread) != null) { 894 q.thread = null; 895 LockSupport.unpark(t); 896 } 897 } 898 } 899 900 /** The number of CPUs, for spin control */ 901 private static final int NCPU = Runtime.getRuntime().availableProcessors(); 902 903 /** 904 * The number of times to spin before blocking while waiting for 905 * advance, per arrival while waiting. On multiprocessors, fully 906 * blocking and waking up a large number of threads all at once is 907 * usually a very slow process, so we use rechargeable spins to 908 * avoid it when threads regularly arrive: When a thread in 909 * internalAwaitAdvance notices another arrival before blocking, 910 * and there appear to be enough CPUs available, it spins 911 * SPINS_PER_ARRIVAL more times before blocking. The value trades 912 * off good-citizenship vs big unnecessary slowdowns. 913 */ 914 static final int SPINS_PER_ARRIVAL = (NCPU < 2) ? 1 : 1 << 8; 915 916 /** 917 * Possibly blocks and waits for phase to advance unless aborted. 918 * Call only from root node. 
*
     * <p>While the phase read from {@code state} still equals
     * {@code phase}, the caller moves through up to three stages: it
     * spins (only when no node was supplied), then pushes a wait node
     * onto the queue matching the parity of {@code phase}, then blocks
     * through {@code ForkJoinPool.managedBlock}.
     *
     * @param phase current phase
     * @param node if non-null, the wait node to track interrupt and timeout;
     * if null, denotes noninterruptible wait
     * @return the phase number read from {@code state} when the wait
     * ended; this equals {@code phase} when the node became releasable
     * (interrupt, timeout or cancellation) before any advance
     */
    private int internalAwaitAdvance(int phase, QNode node) {
        releaseWaiters(phase-1);          // ensure old queue clean
        boolean queued = false;           // true when node is enqueued
        int lastUnarrived = 0;            // to increase spins upon change
        int spins = SPINS_PER_ARRIVAL;
        long s;
        int p;
        // Loop for as long as the phase decoded from state is the awaited one
        while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
            if (node == null) {           // spinning in noninterruptible mode
                int unarrived = (int)s & UNARRIVED_MASK;
                // Recharge the spin budget when the unarrived count has
                // changed and the new count is below the CPU count
                if (unarrived != lastUnarrived &&
                    (lastUnarrived = unarrived) < NCPU)
                    spins += SPINS_PER_ARRIVAL;
                // Thread.interrupted() clears the flag; it is remembered
                // in the node and re-asserted after the loop
                boolean interrupted = Thread.interrupted();
                if (interrupted || --spins < 0) { // need node to record intr
                    node = new QNode(this, phase, false, false, 0L);
                    node.wasInterrupted = interrupted;
                }
            }
            else if (node.isReleasable()) // done or aborted
                break;
            else if (!queued) {           // push onto queue
                AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
                QNode q = node.next = head.get();
                // Enqueue only behind nodes of the same phase, and only if
                // a fresh read of state still shows the awaited phase
                if ((q == null || q.phase == phase) &&
                    (int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
                    queued = head.compareAndSet(q, node);
            }
            else {
                try {
                    // blocks via node.block() until node.isReleasable()
                    ForkJoinPool.managedBlock(node);
                } catch (InterruptedException ie) {
                    // record rather than propagate; handled below
                    node.wasInterrupted = true;
                }
            }
        }

        if (node != null) {
            if (node.thread != null)
                node.thread = null;       // avoid need for unpark()
            // A noninterruptible waiter swallowed the interrupt while
            // waiting, so restore the thread's interrupt status now
            if (node.wasInterrupted && !node.interruptible)
                Thread.currentThread().interrupt();
            if ((p = (int)(state >>> PHASE_SHIFT)) == phase)
                return p;                 // recheck abort
        }
        // Phase has changed: release remaining waiters queued for it
        releaseWaiters(phase);
        return p;
    }

    /**
     * Wait nodes for Treiber stack representing wait queue
     */
    static final class QNode implements ForkJoinPool.ManagedBlocker {
        final Phaser phaser;          // phaser whose getPhase() is polled
        final int phase;              // phase this node is waiting on
        final boolean interruptible;  // if true, an interrupt ends the wait
        final boolean timed;          // if true, the wait is bounded by nanos
        boolean wasInterrupted;       // set once an interrupt has been observed
        long nanos;                   // remaining wait time, in nanoseconds
        long lastTime;                // System.nanoTime() at the last nanos update
        volatile Thread thread; // nulled to cancel wait
        QNode next;                   // next node in the queue

        /**
         * Creates a node for the calling thread.
         *
         * @param phaser the phaser whose phase is polled by isReleasable
         * @param phase the phase being waited on
         * @param interruptible whether an interrupt should end the wait
         * @param timed whether the wait is limited to {@code nanos}
         * @param nanos the wait budget in nanoseconds (consulted only
         *        when {@code timed} is true)
         */
        QNode(Phaser phaser, int phase, boolean interruptible,
              boolean timed, long nanos) {
            this.phaser = phaser;
            this.phase = phase;
            this.interruptible = interruptible;
            this.nanos = nanos;
            this.timed = timed;
            // the clock is only sampled for timed waits
            this.lastTime = timed ? System.nanoTime() : 0L;
            thread = Thread.currentThread();
        }

        /**
         * Reports whether waiting should stop: the node was cancelled
         * (thread is null), the phaser's phase differs from this
         * node's phase, an interrupt was seen on an interruptible
         * node, or a timed wait has run out. In each of the last three
         * cases the thread field is nulled before returning true.
         */
        public boolean isReleasable() {
            if (thread == null)
                return true;
            if (phaser.getPhase() != phase) {
                thread = null;
                return true;
            }
            // Thread.interrupted() clears the flag, so latch it here
            if (Thread.interrupted())
                wasInterrupted = true;
            if (wasInterrupted && interruptible) {
                thread = null;
                return true;
            }
            if (timed) {
                if (nanos > 0L) {
                    // charge the time elapsed since the last check
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                }
                if (nanos <= 0L) {
                    thread = null;
                    return true;
                }
            }
            return false;
        }

        /**
         * Parks the calling thread unless the node is already
         * releasable: indefinitely for an untimed wait, otherwise for
         * at most the remaining nanos. Returns isReleasable() as
         * re-evaluated after parking.
         */
        public boolean block() {
            if (isReleasable())
                return true;
            else if (!timed)
                LockSupport.park(this);
            else if (nanos > 0)
                LockSupport.parkNanos(this, nanos);
            return isReleasable();
        }
    }

    // Unsafe mechanics

    private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
    // offset of the "state" field, as used by compareAndSwapLong
    private static final long stateOffset =
        objectFieldOffset("state", Phaser.class);

    /**
     * Looks up the Unsafe field offset of the named declared field.
     *
     * @param field name of the field to locate
     * @param klazz class declaring the field
     * @return the offset reported by {@code UNSAFE.objectFieldOffset}
     * @throws NoSuchFieldError if {@code klazz} declares no such field
     */
    private static long objectFieldOffset(String field, Class<?> klazz) {
        try {
            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
        } catch (NoSuchFieldException e) {
            // Convert Exception to corresponding Error
            NoSuchFieldError error = new NoSuchFieldError(field);
            error.initCause(e);
            throw error;
        }
    }
}