 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */

package java.util.concurrent;

import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

/**
 * A reusable synchronization barrier, similar in functionality to
 * {@link java.util.concurrent.CyclicBarrier CyclicBarrier} and
 * {@link java.util.concurrent.CountDownLatch CountDownLatch}
 * but supporting more flexible usage.
 *
 * <p> <b>Registration.</b> Unlike the case for other barriers, the
 * number of parties <em>registered</em> to synchronize on a phaser
 * may vary over time.  Tasks may be registered at any time (using
 * methods {@link #register}, {@link #bulkRegister}, or forms of
 * constructors establishing initial numbers of parties), and
 * optionally deregistered upon any arrival (using {@link
 * #arriveAndDeregister}).  As is the case with most basic
 * synchronization constructs, registration and deregistration affect
 * only internal counts; they do not establish any further internal
 * bookkeeping, so tasks cannot query whether they are registered.
 * (However, you can introduce such bookkeeping by subclassing this
 * class.)
 *
 * <p> <b>Synchronization.</b> Like a {@code CyclicBarrier}, a {@code
 * Phaser} may be repeatedly awaited.
 * Method {@link #arriveAndAwaitAdvance} has effect analogous to {@link
 * java.util.concurrent.CyclicBarrier#await CyclicBarrier.await}.  Each
 * generation of a {@code Phaser} has an associated phase number.  The
 * phase number starts at zero, and advances when all parties arrive
 * at the barrier, wrapping around to zero after reaching {@code
 * Integer.MAX_VALUE}.  The use of phase numbers enables independent
 * control of actions upon arrival at a barrier and upon awaiting
 * others, via two kinds of methods that may be invoked by any
 * registered party:
 *
 * <ul>
 *
 *   <li> <b>Arrival.</b> Methods {@link #arrive} and
 *       {@link #arriveAndDeregister} record arrival at a
 *       barrier.  These methods do not block, but return an associated
 *       <em>arrival phase number</em>; that is, the phase number of
 *       the barrier to which the arrival applied.  When the final
 *       party for a given phase arrives, an optional barrier action
 *       is performed and the phase advances.  Barrier actions,
 *       performed by the party triggering a phase advance, are
 *       arranged by overriding method {@link #onAdvance(int, int)},
 *       which also controls termination.  Overriding this method is
 *       similar to, but more flexible than, providing a barrier
 *       action to a {@code CyclicBarrier}.
 *
 *   <li> <b>Waiting.</b> Method {@link #awaitAdvance} requires an
 *       argument indicating an arrival phase number, and returns when
 *       the barrier advances to (or is already at) a different phase.
 *       Unlike similar constructions using {@code CyclicBarrier},
 *       method {@code awaitAdvance} continues to wait even if the
 *       waiting thread is interrupted.  Interruptible and timeout
 *       versions are also available, but exceptions encountered while
 *       tasks wait interruptibly or with timeout do not change the
 *       state of the barrier.
 *       If necessary, you can perform any
 *       associated recovery within handlers of those exceptions,
 *       often after invoking {@code forceTermination}.  Phasers may
 *       also be used by tasks executing in a {@link ForkJoinPool},
 *       which will ensure sufficient parallelism to execute tasks
 *       when others are blocked waiting for a phase to advance.
 *
 * </ul>
 *
 * <p> <b>Termination.</b> A {@code Phaser} may enter a
 * <em>termination</em> state in which all synchronization methods
 * immediately return without updating phaser state or waiting for
 * advance, and indicating (via a negative phase value) that execution
 * is complete.  Termination is triggered when an invocation of {@code
 * onAdvance} returns {@code true}.  As illustrated below, when
 * phasers control actions with a fixed number of iterations, it is
 * often convenient to override this method to cause termination when
 * the current phase number reaches a threshold.  Method {@link
 * #forceTermination} is also available to abruptly release waiting
 * threads and allow them to terminate.
 *
 * <p> <b>Tiering.</b> Phasers may be <em>tiered</em> (i.e., arranged
 * in tree structures) to reduce contention.  Phasers with large
 * numbers of parties that would otherwise experience heavy
 * synchronization contention costs may instead be set up so that
 * groups of sub-phasers share a common parent.  This may greatly
 * increase throughput even though it incurs greater per-operation
 * overhead.
 *
 * <p><b>Monitoring.</b> While synchronization methods may be invoked
 * only by registered parties, the current state of a phaser may be
 * monitored by any caller.  At any given moment there are {@link
 * #getRegisteredParties} parties in total, of which {@link
 * #getArrivedParties} have arrived at the current phase ({@link
 * #getPhase}).
 * When the remaining ({@link #getUnarrivedParties})
 * parties arrive, the phase advances.  The values returned by these
 * methods may reflect transient states and so are not in general
 * useful for synchronization control.  Method {@link #toString}
 * returns snapshots of these state queries in a form convenient for
 * informal monitoring.
 *
 * <p><b>Sample usages:</b>
 *
 * <p>A {@code Phaser} may be used instead of a {@code CountDownLatch}
 * to control a one-shot action serving a variable number of
 * parties.  The typical idiom is for the method setting this up to
 * first register, then start the actions, then deregister, as in:
 *
 *  <pre> {@code
 * void runTasks(List<Runnable> tasks) {
 *   final Phaser phaser = new Phaser(1); // "1" to register self
 *   // create and start threads
 *   for (final Runnable task : tasks) { // final: captured by the inner class
 *     phaser.register();
 *     new Thread() {
 *       public void run() {
 *         phaser.arriveAndAwaitAdvance(); // await all creation
 *         task.run();
 *       }
 *     }.start();
 *   }
 *
 *   // allow threads to start and deregister self
 *   phaser.arriveAndDeregister();
 * }}</pre>
 *
 * <p>One way to cause a set of threads to repeatedly perform actions
 *
 * <p>Related constructions may be used to await particular phase numbers
 * in contexts where you are sure that the phase will never wrap around
 * {@code Integer.MAX_VALUE}.  For example:
 *
 *  <pre> {@code
 * void awaitPhase(Phaser phaser, int phase) {
 *   int p = phaser.register(); // assumes caller not already registered
 *   while (p < phase) {
 *     if (phaser.isTerminated())
 *       // ...
 *       // deal with unexpected termination
 *     else
 *       p = phaser.arriveAndAwaitAdvance();
 *   }
 *   phaser.arriveAndDeregister();
 * }}</pre>
 *
 *
 * <p>To create a set of tasks using a tree of phasers,
 * you could use code of the following form, assuming a
 * Task class with a constructor accepting a phaser that
 * it registers for upon construction:
 *
 *  <pre> {@code
 * void build(Task[] actions, int lo, int hi, Phaser ph) {
 *   if (hi - lo > TASKS_PER_PHASER) {
 *     for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
 *       int j = Math.min(i + TASKS_PER_PHASER, hi);
 *       build(actions, i, j, new Phaser(ph));
 *     }
 *   } else {
 *     for (int i = lo; i < hi; ++i)
 *       actions[i] = new Task(ph);
 *       // assumes new Task(ph) performs ph.register()
 *   }
 * }
 * // .. initially called, for n tasks via
 * build(new Task[n], 0, n, new Phaser());}</pre>
 *
 * The best value of {@code TASKS_PER_PHASER} depends mainly on
 * expected barrier synchronization rates.  A value as low as four may
 * be appropriate for extremely small per-barrier task bodies (thus
 * high rates), or up to hundreds for extremely large ones.
 *
 * </pre>
 *
 * <p><b>Implementation notes</b>: This implementation restricts the
 * maximum number of parties to 65535.  Attempts to register additional
 * parties result in {@code IllegalStateException}.  However, you can and
 * should create tiered phasers to accommodate arbitrarily large sets
 * of participants.
 *
 * @since 1.7
 * @author Doug Lea
 */
public class Phaser {
    /*
     * This class implements an extension of X10 "clocks".  Thanks to
     * Vijay Saraswat for the idea, and to Vivek Sarkar for
     * enhancements to extend functionality.
     */

    /**
     * Barrier state representation.
     * Conceptually, a barrier contains
     * four values:
     *
     * * parties -- the number of parties to wait (16 bits)
     * * unarrived -- the number of parties yet to hit barrier (16 bits)
     * * phase -- the generation of the barrier (31 bits)
     * * terminated -- set if barrier is terminated (1 bit)
     *
     * However, to efficiently maintain atomicity, these values are
     * packed into a single (atomic) long.  Termination uses the sign
     * bit of 32 bit representation of phase, so phase is set to -1 on
     * termination.  Good performance relies on keeping state decoding
     * and encoding simple, and keeping race windows short.
     *
     * Note: there are some cheats in arrive() that rely on unarrived
     * count being lowest 16 bits.
     */
    private volatile long state;

    private static final int ushortMask = 0xffff;
    private static final int phaseMask  = 0x7fffffff;

    private static int unarrivedOf(long s) {
        return (int) (s & ushortMask);
    }

    private static int partiesOf(long s) {
        return ((int) s) >>> 16;
    }

    private static int phaseOf(long s) {
        return (int) (s >>> 32);
    }

    private static int arrivedOf(long s) {
        return partiesOf(s) - unarrivedOf(s);
    }

    private static long stateFor(int phase, int parties, int unarrived) {
        return ((((long) phase) << 32) | (((long) parties) << 16) |
                (long) unarrived);
    }

    private static long trippedStateFor(int phase, int parties) {
        long lp = (long) parties;
        return (((long) phase) << 32) | (lp << 16) | lp;
    }

    /**
     * Returns message string for bad bounds exceptions.
     */
    private static String badBounds(int parties, int unarrived) {
        return ("Attempt to set " + unarrived +
                " unarrived of " + parties + " parties");
    }

    /**
     * The parent of this phaser, or null if none
     */
    private final Phaser parent;

    /**
     * The root of phaser tree. Equals this if not in a tree.
     * Used to
     * support faster state push-down.
     */
    private final Phaser root;

    // Wait queues

    /**
     * Heads of Treiber stacks for waiting threads. To eliminate
     * contention while releasing some threads while adding others, we
     * use two of them, alternating across even and odd phases.
     */
    private final AtomicReference<QNode> evenQ = new AtomicReference<QNode>();
    private final AtomicReference<QNode> oddQ  = new AtomicReference<QNode>();

    private AtomicReference<QNode> queueFor(int phase) {
        return ((phase & 1) == 0) ? evenQ : oddQ;
    }

    /**
     * Returns current state, first resolving lagged propagation from
     * root if necessary.
     */
    private long getReconciledState() {
        return (parent == null) ? state : reconcileState();
    }

    /**
     * Recursively resolves state.
     */
    private long reconcileState() {
        Phaser p = parent;
        long s = state;
        if (p != null) {
            while (unarrivedOf(s) == 0 && phaseOf(s) != phaseOf(root.state)) {
                long parentState = p.getReconciledState();
                int parentPhase = phaseOf(parentState);
                int phase = phaseOf(s = state);
                if (phase != parentPhase) {
                    long next = trippedStateFor(parentPhase, partiesOf(s));
                    if (casState(s, next)) {
                        releaseWaiters(phase);
                        s = next;
                    }
                }
            }
        }
        return s;
    }

    /**
     * Creates a new phaser without any initially registered parties,
     * initial phase number 0, and no parent. Any thread using this
     * phaser will need to first register for it.
     */
    public Phaser() {
        this(null);
    }

    /**
     * Creates a new phaser with the given numbers of registered
     * unarrived parties, initial phase number 0, and no parent.
     *
     * @param parties the number of parties required to trip barrier
     * @throws IllegalArgumentException if parties less than zero
     * or greater than the maximum number of parties supported
     */
    public Phaser(int parties) {
        this(null, parties);
    }

    /**
     * Creates a new phaser with the given parent, without any
     * initially registered parties. If parent is non-null this phaser
     * is registered with the parent and its initial phase number is
     * the same as that of parent phaser.
     *
     * @param parent the parent phaser
     */
    public Phaser(Phaser parent) {
        int phase = 0;
        this.parent = parent;
        if (parent != null) {
            this.root = parent.root;
            phase = parent.register();
        }
        else
            this.root = this;
        this.state = trippedStateFor(phase, 0);
    }

    /**
     * Creates a new phaser with the given parent and numbers of
     * registered unarrived parties. If parent is non-null, this phaser
     * is registered with the parent and its initial phase number is
     * the same as that of parent phaser.
     *
     * @param parent the parent phaser
     * @param parties the number of parties required to trip barrier
     * @throws IllegalArgumentException if parties less than zero
     * or greater than the maximum number of parties supported
     */
    public Phaser(Phaser parent, int parties) {
        if (parties < 0 || parties > ushortMask)
            throw new IllegalArgumentException("Illegal number of parties");
        int phase = 0;
        this.parent = parent;
        if (parent != null) {
            this.root = parent.root;
            phase = parent.register();
        }
        else
            this.root = this;
        this.state = trippedStateFor(phase, parties);
    }

    /**
     * Adds a new unarrived party to this phaser.
     *
     * @return the arrival phase number to which this registration applied
     * @throws IllegalStateException if attempting to register more
     * than the maximum supported number of parties
     */
    public int register() {
        return doRegister(1);
    }

    /**
     * Adds the given number of new unarrived parties to this phaser.
     *
     * @param parties the number of additional parties required to trip barrier
     * @return the arrival phase number to which this registration applied
     * @throws IllegalStateException if attempting to register more
     * than the maximum supported number of parties
     * @throws IllegalArgumentException if {@code parties < 0}
     */
    public int bulkRegister(int parties) {
        if (parties < 0)
            throw new IllegalArgumentException();
        if (parties == 0)
            return getPhase();
        return doRegister(parties);
    }

    /**
     * Shared code for register, bulkRegister
     */
    private int doRegister(int registrations) {
        int phase;
        for (;;) {
            long s = getReconciledState();
            phase = phaseOf(s);
            int unarrived = unarrivedOf(s) + registrations;
            int parties = partiesOf(s) + registrations;
            if (phase < 0)
                break;
            if (parties > ushortMask || unarrived > ushortMask)
                throw new IllegalStateException(badBounds(parties, unarrived));
            if (phase == phaseOf(root.state) &&
                casState(s, stateFor(phase, parties, unarrived)))
                break;
        }
        return phase;
    }

    /**
     * Arrives at the barrier, but does not wait for others.  (You can
     * in turn wait for others via {@link #awaitAdvance}).  It is an
     * unenforced usage error for an unregistered party to invoke this
     * method.
     *
     * @return the arrival phase number, or a negative value if terminated
     * @throws IllegalStateException if not terminated and the number
     * of unarrived parties would become negative
     */
    public int arrive() {
        int phase;
        for (;;) {
            long s = state;
            phase = phaseOf(s);
            if (phase < 0)
                break;
            int parties = partiesOf(s);
            int unarrived = unarrivedOf(s) - 1;
            if (unarrived > 0) {        // Not the last arrival
                if (casState(s, s - 1)) // s-1 adds one arrival
                    break;
            }
            else if (unarrived == 0) {  // the last arrival
                Phaser par = parent;
                if (par == null) {      // directly trip
                    if (casState
                        (s,
                         trippedStateFor(onAdvance(phase, parties) ? -1 :
                                         ((phase + 1) & phaseMask), parties))) {
                        releaseWaiters(phase);
                        break;
                    }
                }
                else {                  // cascade to parent
                    if (casState(s, s - 1)) { // zeroes unarrived
                        par.arrive();
                        reconcileState();
                        break;
                    }
                }
            }
            else if (phase != phaseOf(root.state)) // or if unreconciled
                reconcileState();
            else
                throw new IllegalStateException(badBounds(parties, unarrived));
        }
        return phase;
    }

    /**
     * Arrives at the barrier and deregisters from it without waiting
     * for others. Deregistration reduces the number of parties
     * required to trip the barrier in future phases.  If this phaser
     * has a parent, and deregistration causes this phaser to have
     * zero parties, this phaser also arrives at and is deregistered
     * from its parent.  It is an unenforced usage error for an
     * unregistered party to invoke this method.
     *
     * @return the arrival phase number, or a negative value if terminated
     * @throws IllegalStateException if not terminated and the number
     * of registered or unarrived parties would become negative
     */
    public int arriveAndDeregister() {
        // similar code to arrive, but too different to merge
        Phaser par = parent;
        int phase;
        for (;;) {
            long s = state;
            phase = phaseOf(s);
            if (phase < 0)
                break;
            int parties = partiesOf(s) - 1;
            int unarrived = unarrivedOf(s) - 1;
            if (parties >= 0) {
                if (unarrived > 0 || (unarrived == 0 && par != null)) {
                    if (casState
                        (s,
                         stateFor(phase, parties, unarrived))) {
                        if (unarrived == 0) {
                            par.arriveAndDeregister();
                            reconcileState();
                        }
                        break;
                    }
                    continue;
                }
                if (unarrived == 0) {
                    if (casState
                        (s,
                         trippedStateFor(onAdvance(phase, parties) ? -1 :
                                         ((phase + 1) & phaseMask), parties))) {
                        releaseWaiters(phase);
                        break;
                    }
                    continue;
                }
                if (par != null && phase != phaseOf(root.state)) {
                    reconcileState();
                    continue;
                }
            }
            throw new IllegalStateException(badBounds(parties, unarrived));
        }
        return phase;
    }

    /**
     * Arrives at the barrier and awaits others. Equivalent in effect
     * to {@code awaitAdvance(arrive())}.  If you need to await with
     * interruption or timeout, you can arrange this with an analogous
     * construction using one of the other forms of the awaitAdvance
     * method.  If instead you need to deregister upon arrival use
     * {@code arriveAndDeregister}. It is an unenforced usage error
     * for an unregistered party to invoke this method.
     *
     * @return the arrival phase number, or a negative number if terminated
     * @throws IllegalStateException if not terminated and the number
     * of unarrived parties would become negative
     */
    public int arriveAndAwaitAdvance() {
        return awaitAdvance(arrive());
    }

    /**
     * Awaits the phase of the barrier to advance from the given phase
     * value, returning immediately if the current phase of the
     * barrier is not equal to the given phase value or this barrier
     * is terminated.  It is an unenforced usage error for an
     * unregistered party to invoke this method.
     *
     * @param phase an arrival phase number, or negative value if
     * terminated; this argument is normally the value returned by a
     * previous call to {@code arrive} or its variants
     * @return the next arrival phase number, or a negative value
     * if terminated or argument is negative
     */
    public int awaitAdvance(int phase) {
        if (phase < 0)
            return phase;
        long s = getReconciledState();
        int p = phaseOf(s);
        if (p != phase)
            return p;
        if (unarrivedOf(s) == 0 && parent != null)
            parent.awaitAdvance(phase);
        // Fall here even if parent waited, to reconcile and help release
        return untimedWait(phase);
    }

    /**
     * Awaits the phase of the barrier to advance from the given phase
     * value, throwing {@code InterruptedException} if interrupted
     * while waiting, or returning immediately if the current phase of
     * the barrier is not equal to the given phase value or this
     * barrier is terminated. It is an unenforced usage error for an
     * unregistered party to invoke this method.
     *
     * @param phase an arrival phase number, or negative value if
     * terminated; this argument is normally the value returned by a
     * previous call to {@code arrive} or its variants
     * @return the next arrival phase number, or a negative value
     * if terminated or argument is negative
     * @throws InterruptedException if thread interrupted while waiting
     */
    public int awaitAdvanceInterruptibly(int phase)
        throws InterruptedException {
        if (phase < 0)
            return phase;
        long s = getReconciledState();
        int p = phaseOf(s);
        if (p != phase)
            return p;
        if (unarrivedOf(s) == 0 && parent != null)
            parent.awaitAdvanceInterruptibly(phase);
        return interruptibleWait(phase);
    }

    /**
     * Awaits the phase of the barrier to advance from the given phase
     * value or the given timeout to elapse, throwing {@code
     * InterruptedException} if interrupted while waiting, or
     * returning immediately if the current phase of the barrier is
     * not equal to the given phase value or this barrier is
     * terminated.  It is an unenforced usage error for an
     * unregistered party to invoke this method.
     *
     * @param phase an arrival phase number, or negative value if
     * terminated; this argument is normally the value returned by a
     * previous call to {@code arrive} or its variants
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit a {@code TimeUnit} determining how to interpret the
     *        {@code timeout} parameter
     * @return the next arrival phase number, or a negative value
     * if terminated or argument is negative
     * @throws InterruptedException if thread interrupted while waiting
     * @throws TimeoutException if timed out while waiting
     */
    public int awaitAdvanceInterruptibly(int phase,
                                         long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        if (phase < 0)
            return phase;
        long s = getReconciledState();
        int p = phaseOf(s);
        if (p != phase)
            return p;
        if (unarrivedOf(s) == 0 && parent != null)
            parent.awaitAdvanceInterruptibly(phase, timeout, unit);
        return timedWait(phase, unit.toNanos(timeout));
    }

    /**
     * Forces this barrier to enter termination state. Counts of
     * arrived and registered parties are unaffected. If this phaser
     * has a parent, it too is terminated. This method may be useful
     * for coordinating recovery after one or more tasks encounter
     * unexpected exceptions.
     */
    public void forceTermination() {
        for (;;) {
            long s = getReconciledState();
            int phase = phaseOf(s);
            int parties = partiesOf(s);
            int unarrived = unarrivedOf(s);
            if (phase < 0 ||
                casState(s, stateFor(-1, parties, unarrived))) {
                releaseWaiters(0);
                releaseWaiters(1);
                if (parent != null)
                    parent.forceTermination();
                return;
            }
        }
    }

    /**
     * Returns the current phase number. The maximum phase number is
     * {@code Integer.MAX_VALUE}, after which it restarts at
     * zero. Upon termination, the phase number is negative.
     *
     * @return the phase number, or a negative value if terminated
     */
    public final int getPhase() {
        return phaseOf(getReconciledState());
    }

    /**
     * Returns the number of parties registered at this barrier.
     *
     * @return the number of parties
     */
    public int getRegisteredParties() {
        return partiesOf(state);
    }

    /**
     * Returns the number of registered parties that have arrived at
     * the current phase of this barrier.
     *
     * @return the number of arrived parties
     */
    public int getArrivedParties() {
        return arrivedOf(state);
    }

    /**
     * Returns the number of registered parties that have not yet
     * arrived at the current phase of this barrier.
     *
     * @return the number of unarrived parties
     */
    public int getUnarrivedParties() {
        return unarrivedOf(state);
    }

    /**
     * Returns the parent of this phaser, or {@code null} if none.
     *
     * @return the parent of this phaser, or {@code null} if none
     */
    public Phaser getParent() {
        return parent;
    }

    /**
     * Returns the root ancestor of this phaser, which is the same as
     * this phaser if it has no parent.
     *
     * @return the root ancestor of this phaser
     */
    public Phaser getRoot() {
        return root;
    }

    /**
     * Returns {@code true} if this barrier has been terminated.
     *
     * @return {@code true} if this barrier has been terminated
     */
    public boolean isTerminated() {
        return getPhase() < 0;
    }

    /**
     * Overridable method to perform an action upon impending phase
     * advance, and to control termination. This method is invoked
     * upon arrival of the party tripping the barrier (when all other
     * waiting parties are dormant).
     * If this method returns {@code
     * true}, then, rather than advance the phase number, this barrier
     * will be set to a final termination state, and subsequent calls
     * to {@link #isTerminated} will return true. Any (unchecked)
     * Exception or Error thrown by an invocation of this method is
     * propagated to the party attempting to trip the barrier, in
     * which case no advance occurs.
     *
     * <p>The arguments to this method provide the state of the phaser
     * prevailing for the current transition. (When called from within
     * an implementation of {@code onAdvance} the values returned by
     * methods such as {@code getPhase} may or may not reliably
     * indicate the state to which this transition applies.)
     *
     * <p>The default version returns {@code true} when the number of
     * registered parties is zero. Normally, overrides that arrange
     * termination for other reasons should also preserve this
     * property.
     *
     * <p>You may override this method to perform an action with side
     * effects visible to participating tasks, but it is only sensible
     * to do so in designs where all parties register before any
     * arrive, and all {@link #awaitAdvance} at each phase.
     * Otherwise, you cannot ensure lack of interference from other
     * parties during the invocation of this method. Additionally,
     * method {@code onAdvance} may be invoked more than once per
     * transition if registrations are intermixed with arrivals.
     *
     * @param phase the phase number on entering the barrier
     * @param registeredParties the current number of registered parties
     * @return {@code true} if this barrier should terminate
     */
    protected boolean onAdvance(int phase, int registeredParties) {
        return registeredParties <= 0;
    }

    /**
     * Returns a string identifying this phaser, as well as its
     * state.
     * The state, in brackets, includes the String {@code
     * "phase = "} followed by the phase number, {@code "parties = "}
     * followed by the number of registered parties, and {@code
     * "arrived = "} followed by the number of arrived parties.
     *
     * @return a string identifying this barrier, as well as its state
     */
    public String toString() {
        long s = getReconciledState();
        return super.toString() +
            "[phase = " + phaseOf(s) +
            " parties = " + partiesOf(s) +
            " arrived = " + arrivedOf(s) + "]";
    }

    // methods for waiting

    /**
     * Wait nodes for Treiber stack representing wait queue
     */
    static final class QNode implements ForkJoinPool.ManagedBlocker {
        final Phaser phaser;
        final int phase;
        final long startTime;
        final long nanos;
        final boolean timed;
        final boolean interruptible;
        volatile boolean wasInterrupted = false;
        volatile Thread thread; // nulled to cancel wait
        QNode next;
        QNode(Phaser phaser, int phase, boolean interruptible,
              boolean timed, long startTime, long nanos) {
            this.phaser = phaser;
            this.phase = phase;
            this.timed = timed;
            this.interruptible = interruptible;
            this.startTime = startTime;
            this.nanos = nanos;
            thread = Thread.currentThread();
        }
        public boolean isReleasable() {
            return (thread == null ||
                    phaser.getPhase() != phase ||
                    (interruptible && wasInterrupted) ||
                    (timed && (nanos - (System.nanoTime() - startTime)) <= 0));
        }
        public boolean block() {
            if (Thread.interrupted()) {
                wasInterrupted = true;
                if (interruptible)
                    return true;
            }
            if (!timed)
                LockSupport.park(this);
            else {
                long waitTime = nanos - (System.nanoTime() - startTime);
                if (waitTime <= 0)
                    return true;
                LockSupport.parkNanos(this, waitTime);
            }
            return isReleasable();
        }
        void signal() {
            Thread t = thread;
            if (t != null) {
                thread = null;
                LockSupport.unpark(t);
            }
        }
        boolean doWait() {
            if (thread != null) {
                try {
                    ForkJoinPool.managedBlock(this);
                } catch (InterruptedException ie) {
                }
            }
            return wasInterrupted;
        }

    }

    /**
     * Removes and signals waiting threads from wait queue.
     */
    private void releaseWaiters(int phase) {
        AtomicReference<QNode> head = queueFor(phase);
        QNode q;
        while ((q = head.get()) != null) {
            if (head.compareAndSet(q, q.next))
                q.signal();
        }
    }

    /**
     * Tries to enqueue given node in the appropriate wait queue.
     *
     * @return true if successful
     */
    private boolean tryEnqueue(QNode node) {
        AtomicReference<QNode> head = queueFor(node.phase);
        return head.compareAndSet(node.next = head.get(), node);
    }

    /**
     * Enqueues node and waits unless aborted or signalled.
     *
     * @return current phase
     */
    private int untimedWait(int phase) {
        QNode node = null;
        boolean queued = false;
        boolean interrupted = false;
        int p;
        while ((p = getPhase()) == phase) {
            if (Thread.interrupted())
                interrupted = true;
            else if (node == null)
                node = new QNode(this, phase, false, false, 0, 0);
            else if (!queued)
                queued = tryEnqueue(node);
            else
                interrupted = node.doWait();
        }
        if (node != null)
            node.thread = null;
        releaseWaiters(phase);
        if (interrupted)
            Thread.currentThread().interrupt();
        return p;
    }

    /**
     * Interruptible version
     * @return current phase
     */
    private int interruptibleWait(int phase) throws InterruptedException {
        QNode node = null;
        boolean queued = false;
        boolean interrupted = false;
        int p;
        while ((p = getPhase()) == phase && !interrupted) {
            if (Thread.interrupted())
                interrupted = true;
            else if (node == null)
                node = new QNode(this, phase, true, false, 0, 0);
            else if (!queued)
                queued = tryEnqueue(node);
            else
                interrupted = node.doWait();
        }
979 if (node != null) 980 node.thread = null; 981 if (p != phase || (p = getPhase()) != phase) 982 releaseWaiters(phase); 983 if (interrupted) 984 throw new InterruptedException(); 985 return p; 986 } 987 988 /** 989 * Timeout version. 990 * @return current phase 991 */ 992 private int timedWait(int phase, long nanos) 993 throws InterruptedException, TimeoutException { 994 long startTime = System.nanoTime(); 995 QNode node = null; 996 boolean queued = false; 997 boolean interrupted = false; 998 int p; 999 while ((p = getPhase()) == phase && !interrupted) { 1000 if (Thread.interrupted()) 1001 interrupted = true; 1002 else if (nanos - (System.nanoTime() - startTime) <= 0) 1003 break; 1004 else if (node == null) 1005 node = new QNode(this, phase, true, true, startTime, nanos); 1006 else if (!queued) 1007 queued = tryEnqueue(node); 1008 else 1009 interrupted = node.doWait(); 1010 } 1011 if (node != null) 1012 node.thread = null; 1013 if (p != phase || (p = getPhase()) != phase) 1014 releaseWaiters(phase); 1015 if (interrupted) 1016 throw new InterruptedException(); 1017 if (p == phase) 1018 throw new TimeoutException(); 1019 return p; 1020 } 1021 1022 // Unsafe mechanics 1023 1024 private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe(); 1025 private static final long stateOffset = 1026 objectFieldOffset("state", Phaser.class); 1027 1028 private final boolean casState(long cmp, long val) { 1029 return UNSAFE.compareAndSwapLong(this, stateOffset, cmp, val); 1030 } 1031 1032 private static long objectFieldOffset(String field, Class<?> klazz) { 1033 try { 1034 return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field)); 1035 } catch (NoSuchFieldException e) { 1036 // Convert Exception to corresponding Error 1037 NoSuchFieldError error = new NoSuchFieldError(field); 1038 error.initCause(e); 1039 throw error; 1040 } 1041 } 1042 } | 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 
19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/licenses/publicdomain 34 */ 35 36 package java.util.concurrent; 37 38 import java.util.concurrent.TimeUnit; 39 import java.util.concurrent.TimeoutException; 40 import java.util.concurrent.atomic.AtomicReference; 41 import java.util.concurrent.locks.LockSupport; 42 43 /** 44 * A reusable synchronization barrier, similar in functionality to 45 * {@link java.util.concurrent.CyclicBarrier CyclicBarrier} and 46 * {@link java.util.concurrent.CountDownLatch CountDownLatch} 47 * but supporting more flexible usage. 48 * 49 * <p> <b>Registration.</b> Unlike the case for other barriers, the 50 * number of parties <em>registered</em> to synchronize on a phaser 51 * may vary over time. Tasks may be registered at any time (using 52 * methods {@link #register}, {@link #bulkRegister}, or forms of 53 * constructors establishing initial numbers of parties), and 54 * optionally deregistered upon any arrival (using {@link 55 * #arriveAndDeregister}). As is the case with most basic 56 * synchronization constructs, registration and deregistration affect 57 * only internal counts; they do not establish any further internal 58 * bookkeeping, so tasks cannot query whether they are registered. 59 * (However, you can introduce such bookkeeping by subclassing this 60 * class.) 61 * 62 * <p> <b>Synchronization.</b> Like a {@code CyclicBarrier}, a {@code 63 * Phaser} may be repeatedly awaited. 
Method {@link 64 * #arriveAndAwaitAdvance} has effect analogous to {@link 65 * java.util.concurrent.CyclicBarrier#await CyclicBarrier.await}. Each 66 * generation of a phaser has an associated phase number. The phase 67 * number starts at zero, and advances when all parties arrive at the 68 * phaser, wrapping around to zero after reaching {@code 69 * Integer.MAX_VALUE}. The use of phase numbers enables independent 70 * control of actions upon arrival at a phaser and upon awaiting 71 * others, via two kinds of methods that may be invoked by any 72 * registered party: 73 * 74 * <ul> 75 * 76 * <li> <b>Arrival.</b> Methods {@link #arrive} and 77 * {@link #arriveAndDeregister} record arrival. These methods 78 * do not block, but return an associated <em>arrival phase 79 * number</em>; that is, the phase number of the phaser to which 80 * the arrival applied. When the final party for a given phase 81 * arrives, an optional action is performed and the phase 82 * advances. These actions are performed by the party 83 * triggering a phase advance, and are arranged by overriding 84 * method {@link #onAdvance(int, int)}, which also controls 85 * termination. Overriding this method is similar to, but more 86 * flexible than, providing a barrier action to a {@code 87 * CyclicBarrier}. 88 * 89 * <li> <b>Waiting.</b> Method {@link #awaitAdvance} requires an 90 * argument indicating an arrival phase number, and returns when 91 * the phaser advances to (or is already at) a different phase. 92 * Unlike similar constructions using {@code CyclicBarrier}, 93 * method {@code awaitAdvance} continues to wait even if the 94 * waiting thread is interrupted. Interruptible and timeout 95 * versions are also available, but exceptions encountered while 96 * tasks wait interruptibly or with timeout do not change the 97 * state of the phaser. If necessary, you can perform any 98 * associated recovery within handlers of those exceptions, 99 * often after invoking {@code forceTermination}. 
Phasers may 100 * also be used by tasks executing in a {@link ForkJoinPool}, 101 * which will ensure sufficient parallelism to execute tasks 102 * when others are blocked waiting for a phase to advance. 103 * 104 * </ul> 105 * 106 * <p> <b>Termination.</b> A phaser may enter a <em>termination</em> 107 * state in which all synchronization methods immediately return 108 * without updating phaser state or waiting for advance, and 109 * indicating (via a negative phase value) that execution is complete. 110 * Termination is triggered when an invocation of {@code onAdvance} 111 * returns {@code true}. The default implementation returns {@code 112 * true} if a deregistration has caused the number of registered 113 * parties to become zero. As illustrated below, when phasers control 114 * actions with a fixed number of iterations, it is often convenient 115 * to override this method to cause termination when the current phase 116 * number reaches a threshold. Method {@link #forceTermination} is 117 * also available to abruptly release waiting threads and allow them 118 * to terminate. 119 * 120 * <p> <b>Tiering.</b> Phasers may be <em>tiered</em> (i.e., 121 * constructed in tree structures) to reduce contention. Phasers with 122 * large numbers of parties that would otherwise experience heavy 123 * synchronization contention costs may instead be set up so that 124 * groups of sub-phasers share a common parent. This may greatly 125 * increase throughput even though it incurs greater per-operation 126 * overhead. 127 * 128 * <p><b>Monitoring.</b> While synchronization methods may be invoked 129 * only by registered parties, the current state of a phaser may be 130 * monitored by any caller. At any given moment there are {@link 131 * #getRegisteredParties} parties in total, of which {@link 132 * #getArrivedParties} have arrived at the current phase ({@link 133 * #getPhase}). When the remaining ({@link #getUnarrivedParties}) 134 * parties arrive, the phase advances. 
The values returned by these 135 * methods may reflect transient states and so are not in general 136 * useful for synchronization control. Method {@link #toString} 137 * returns snapshots of these state queries in a form convenient for 138 * informal monitoring. 139 * 140 * <p><b>Sample usages:</b> 141 * 142 * <p>A {@code Phaser} may be used instead of a {@code CountDownLatch} 143 * to control a one-shot action serving a variable number of parties. 144 * The typical idiom is for the method setting this up to first 145 * register, then start the actions, then deregister, as in: 146 * 147 * <pre> {@code 148 * void runTasks(List<Runnable> tasks) { 149 * final Phaser phaser = new Phaser(1); // "1" to register self 150 * // create and start threads 151 * for (final Runnable task : tasks) { 152 * phaser.register(); 153 * new Thread() { 154 * public void run() { 155 * phaser.arriveAndAwaitAdvance(); // await all creation 156 * task.run(); 157 * } 158 * }.start(); 159 * } 160 * 161 * // allow threads to start and deregister self 162 * phaser.arriveAndDeregister(); 163 * }}</pre> 164 * 165 * <p>One way to cause a set of threads to repeatedly perform actions 197 * 198 * <p>Related constructions may be used to await particular phase numbers 199 * in contexts where you are sure that the phase will never wrap around 200 * {@code Integer.MAX_VALUE}. For example: 201 * 202 * <pre> {@code 203 * void awaitPhase(Phaser phaser, int phase) { 204 * int p = phaser.register(); // assumes caller not already registered 205 * while (p < phase) { 206 * if (phaser.isTerminated()) 207 * // ...
deal with unexpected termination 208 * else 209 * p = phaser.arriveAndAwaitAdvance(); 210 * } 211 * phaser.arriveAndDeregister(); 212 * }}</pre> 213 * 214 * 215 * <p>To create a set of tasks using a tree of phasers, 216 * you could use code of the following form, assuming a 217 * Task class with a constructor accepting a {@code Phaser} that 218 * it registers with upon construction: 219 * 220 * <pre> {@code 221 * void build(Task[] actions, int lo, int hi, Phaser ph) { 222 * if (hi - lo > TASKS_PER_PHASER) { 223 * for (int i = lo; i < hi; i += TASKS_PER_PHASER) { 224 * int j = Math.min(i + TASKS_PER_PHASER, hi); 225 * build(actions, i, j, new Phaser(ph)); 226 * } 227 * } else { 228 * for (int i = lo; i < hi; ++i) 229 * actions[i] = new Task(ph); 230 * // assumes new Task(ph) performs ph.register() 231 * } 232 * } 233 * // .. initially called, for n tasks via 234 * build(new Task[n], 0, n, new Phaser());}</pre> 235 * 236 * The best value of {@code TASKS_PER_PHASER} depends mainly on 237 * expected synchronization rates. A value as low as four may 238 * be appropriate for extremely small per-phase task bodies (thus 239 * high rates), or up to hundreds for extremely large ones. 240 * 241 * <p><b>Implementation notes</b>: This implementation restricts the 242 * maximum number of parties to 65535. Attempts to register additional 243 * parties result in {@code IllegalStateException}. However, you can and 244 * should create tiered phasers to accommodate arbitrarily large sets 245 * of participants. 246 * 247 * @since 1.7 248 * @author Doug Lea 249 */ 250 public class Phaser { 251 /* 252 * This class implements an extension of X10 "clocks". Thanks to 253 * Vijay Saraswat for the idea, and to Vivek Sarkar for 254 * enhancements to extend functionality. 
255 */ 256 257 /** 258 * Primary state representation, holding four fields: 259 * 260 * * unarrived -- the number of parties yet to hit barrier (bits 0-15) 261 * * parties -- the number of parties to wait (bits 16-31) 262 * * phase -- the generation of the barrier (bits 32-62) 263 * * terminated -- set if barrier is terminated (bit 63 / sign) 264 * 265 * To efficiently maintain atomicity, these values are 266 * packed into a single (atomic) long. Termination uses the sign 267 * bit of the 32-bit representation of phase, so phase is negative 268 * upon termination. Good performance relies on keeping state decoding 269 * and encoding simple, and keeping race windows short. 270 */ 271 private volatile long state; 272 273 private static final int MAX_PARTIES = 0xffff; 274 private static final int MAX_PHASE = 0x7fffffff; 275 private static final int PARTIES_SHIFT = 16; 276 private static final int PHASE_SHIFT = 32; 277 private static final int UNARRIVED_MASK = 0xffff; // to mask ints 278 private static final long PARTIES_MASK = 0xffff0000L; // to mask longs 279 private static final long ONE_ARRIVAL = 1L; 280 private static final long ONE_PARTY = 1L << PARTIES_SHIFT; 281 private static final long TERMINATION_BIT = 1L << 63; 282 283 // The following unpacking methods are usually manually inlined 284 285 private static int unarrivedOf(long s) { 286 return (int)s & UNARRIVED_MASK; 287 } 288 289 private static int partiesOf(long s) { 290 return (int)s >>> PARTIES_SHIFT; 291 } 292 293 private static int phaseOf(long s) { 294 return (int) (s >>> PHASE_SHIFT); 295 } 296 297 private static int arrivedOf(long s) { 298 return partiesOf(s) - unarrivedOf(s); 299 } 300 301 /** 302 * The parent of this phaser, or null if none 303 */ 304 private final Phaser parent; 305 306 /** 307 * The root of phaser tree. Equals this if not in a tree. Used to 308 * support faster state push-down. 309 */ 310 private final Phaser root; 311 312 /** 313 * Heads of Treiber stacks for waiting threads.
To eliminate 314 * contention when releasing some threads while adding others, we 315 * use two of them, alternating across even and odd phases. 316 * Subphasers share queues with root to speed up releases. 317 */ 318 private final AtomicReference<QNode> evenQ; 319 private final AtomicReference<QNode> oddQ; 320 321 private AtomicReference<QNode> queueFor(int phase) { 322 return ((phase & 1) == 0) ? evenQ : oddQ; 323 } 324 325 /** 326 * Returns message string for bounds exceptions on arrival. 327 */ 328 private String badArrive(long s) { 329 return "Attempted arrival of unregistered party for " + 330 stateToString(s); 331 } 332 333 /** 334 * Returns message string for bounds exceptions on registration. 335 */ 336 private String badRegister(long s) { 337 return "Attempt to register more than " + 338 MAX_PARTIES + " parties for " + stateToString(s); 339 } 340 341 /** 342 * Main implementation for methods arrive and arriveAndDeregister. 343 * Manually tuned to speed up and minimize race windows for the 344 * common case of just decrementing unarrived field. 
345 * 346 * @param adj - adjustment to apply to state -- either 347 * ONE_ARRIVAL (for arrive) or 348 * ONE_ARRIVAL|ONE_PARTY (for arriveAndDeregister) 349 */ 350 private int doArrive(long adj) { 351 for (;;) { 352 long s = state; 353 int unarrived = (int)s & UNARRIVED_MASK; 354 int phase = (int)(s >>> PHASE_SHIFT); 355 if (phase < 0) 356 return phase; 357 else if (unarrived == 0) { 358 if (reconcileState() == s) // recheck 359 throw new IllegalStateException(badArrive(s)); 360 } 361 else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adj)) { 362 if (unarrived == 1) { 363 long p = s & PARTIES_MASK; // unshifted parties field 364 long lu = p >>> PARTIES_SHIFT; 365 int u = (int)lu; 366 int nextPhase = (phase + 1) & MAX_PHASE; 367 long next = ((long)nextPhase << PHASE_SHIFT) | p | lu; 368 final Phaser parent = this.parent; 369 if (parent == null) { 370 if (onAdvance(phase, u)) 371 next |= TERMINATION_BIT; 372 UNSAFE.compareAndSwapLong(this, stateOffset, s, next); 373 releaseWaiters(phase); 374 } 375 else { 376 parent.doArrive((u == 0) ? 377 ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL); 378 if ((int)(parent.state >>> PHASE_SHIFT) != nextPhase) 379 reconcileState(); 380 else if (state == s) 381 UNSAFE.compareAndSwapLong(this, stateOffset, s, 382 next); 383 } 384 } 385 return phase; 386 } 387 } 388 } 389 390 /** 391 * Implementation of register, bulkRegister 392 * 393 * @param registrations number to add to both parties and 394 * unarrived fields. Must be greater than zero. 395 */ 396 private int doRegister(int registrations) { 397 // adjustment to state 398 long adj = ((long)registrations << PARTIES_SHIFT) | registrations; 399 final Phaser parent = this.parent; 400 for (;;) { 401 long s = (parent == null) ? 
state : reconcileState(); 402 int parties = (int)s >>> PARTIES_SHIFT; 403 int phase = (int)(s >>> PHASE_SHIFT); 404 if (phase < 0) 405 return phase; 406 else if (registrations > MAX_PARTIES - parties) 407 throw new IllegalStateException(badRegister(s)); 408 else if ((parties == 0 && parent == null) || // first reg of root 409 ((int)s & UNARRIVED_MASK) != 0) { // not advancing 410 if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s + adj)) 411 return phase; 412 } 413 else if (parties != 0) // wait for onAdvance 414 root.internalAwaitAdvance(phase, null); 415 else { // 1st registration of child 416 synchronized (this) { // register parent first 417 if (reconcileState() == s) { // recheck under lock 418 parent.doRegister(1); // OK if throws IllegalState 419 for (;;) { // simpler form of outer loop 420 s = reconcileState(); 421 phase = (int)(s >>> PHASE_SHIFT); 422 if (phase < 0 || 423 UNSAFE.compareAndSwapLong(this, stateOffset, 424 s, s + adj)) 425 return phase; 426 } 427 } 428 } 429 } 430 } 431 } 432 433 /** 434 * Recursively resolves lagged phase propagation from root if necessary. 435 */ 436 private long reconcileState() { 437 Phaser par = parent; 438 long s = state; 439 if (par != null) { 440 Phaser rt = root; 441 int phase, rPhase; 442 while ((phase = (int)(s >>> PHASE_SHIFT)) >= 0 && 443 (rPhase = (int)(rt.state >>> PHASE_SHIFT)) != phase) { 444 if (par != rt && (int)(par.state >>> PHASE_SHIFT) != rPhase) 445 par.reconcileState(); 446 else if (rPhase < 0 || ((int)s & UNARRIVED_MASK) == 0) { 447 long u = s & PARTIES_MASK; // reset unarrived to parties 448 long next = ((((long) rPhase) << PHASE_SHIFT) | u | 449 (u >>> PARTIES_SHIFT)); 450 UNSAFE.compareAndSwapLong(this, stateOffset, s, next); 451 } 452 s = state; 453 } 454 } 455 return s; 456 } 457 458 /** 459 * Creates a new phaser with no initially registered parties, no 460 * parent, and initial phase number 0. Any thread using this 461 * phaser will need to first register for it. 
462 */ 463 public Phaser() { 464 this(null, 0); 465 } 466 467 /** 468 * Creates a new phaser with the given number of registered 469 * unarrived parties, no parent, and initial phase number 0. 470 * 471 * @param parties the number of parties required to advance to the 472 * next phase 473 * @throws IllegalArgumentException if parties less than zero 474 * or greater than the maximum number of parties supported 475 */ 476 public Phaser(int parties) { 477 this(null, parties); 478 } 479 480 /** 481 * Equivalent to {@link #Phaser(Phaser, int) Phaser(parent, 0)}. 482 * 483 * @param parent the parent phaser 484 */ 485 public Phaser(Phaser parent) { 486 this(parent, 0); 487 } 488 489 /** 490 * Creates a new phaser with the given parent and number of 491 * registered unarrived parties. Registration and deregistration 492 * of this child phaser with its parent are managed automatically. 493 * If the given parent is non-null, whenever this child phaser has 494 * any registered parties (as established in this constructor, 495 * {@link #register}, or {@link #bulkRegister}), this child phaser 496 * is registered with its parent. Whenever the number of 497 * registered parties becomes zero as the result of an invocation 498 * of {@link #arriveAndDeregister}, this child phaser is 499 * deregistered from its parent. 
500 * 501 * @param parent the parent phaser 502 * @param parties the number of parties required to advance to the 503 * next phase 504 * @throws IllegalArgumentException if parties less than zero 505 * or greater than the maximum number of parties supported 506 */ 507 public Phaser(Phaser parent, int parties) { 508 if (parties >>> PARTIES_SHIFT != 0) 509 throw new IllegalArgumentException("Illegal number of parties"); 510 long s = ((long) parties) | (((long) parties) << PARTIES_SHIFT); 511 this.parent = parent; 512 if (parent != null) { 513 Phaser r = parent.root; 514 this.root = r; 515 this.evenQ = r.evenQ; 516 this.oddQ = r.oddQ; 517 if (parties != 0) 518 s |= ((long)(parent.doRegister(1))) << PHASE_SHIFT; 519 } 520 else { 521 this.root = this; 522 this.evenQ = new AtomicReference<QNode>(); 523 this.oddQ = new AtomicReference<QNode>(); 524 } 525 this.state = s; 526 } 527 528 /** 529 * Adds a new unarrived party to this phaser. If an ongoing 530 * invocation of {@link #onAdvance} is in progress, this method 531 * may await its completion before returning. If this phaser has 532 * a parent, and this phaser previously had no registered parties, 533 * this phaser is also registered with its parent. 534 * 535 * @return the arrival phase number to which this registration applied 536 * @throws IllegalStateException if attempting to register more 537 * than the maximum supported number of parties 538 */ 539 public int register() { 540 return doRegister(1); 541 } 542 543 /** 544 * Adds the given number of new unarrived parties to this phaser. 545 * If an ongoing invocation of {@link #onAdvance} is in progress, 546 * this method may await its completion before returning. If this 547 * phaser has a parent, and the given number of parties is 548 * greater than zero, and this phaser previously had no registered 549 * parties, this phaser is also registered with its parent.
550 * 551 * @param parties the number of additional parties required to 552 * advance to the next phase 553 * @return the arrival phase number to which this registration applied 554 * @throws IllegalStateException if attempting to register more 555 * than the maximum supported number of parties 556 * @throws IllegalArgumentException if {@code parties < 0} 557 */ 558 public int bulkRegister(int parties) { 559 if (parties < 0) 560 throw new IllegalArgumentException(); 561 if (parties == 0) 562 return getPhase(); 563 return doRegister(parties); 564 } 565 566 /** 567 * Arrives at this phaser, without waiting for others to arrive. 568 * 569 * <p>It is a usage error for an unregistered party to invoke this 570 * method. However, this error may result in an {@code 571 * IllegalStateException} only upon some subsequent operation on 572 * this phaser, if ever. 573 * 574 * @return the arrival phase number, or a negative value if terminated 575 * @throws IllegalStateException if not terminated and the number 576 * of unarrived parties would become negative 577 */ 578 public int arrive() { 579 return doArrive(ONE_ARRIVAL); 580 } 581 582 /** 583 * Arrives at this phaser and deregisters from it without waiting 584 * for others to arrive. Deregistration reduces the number of 585 * parties required to advance in future phases. If this phaser 586 * has a parent, and deregistration causes this phaser to have 587 * zero parties, this phaser is also deregistered from its parent. 588 * 589 * <p>It is a usage error for an unregistered party to invoke this 590 * method. However, this error may result in an {@code 591 * IllegalStateException} only upon some subsequent operation on 592 * this phaser, if ever. 
593 * 594 * @return the arrival phase number, or a negative value if terminated 595 * @throws IllegalStateException if not terminated and the number 596 * of registered or unarrived parties would become negative 597 */ 598 public int arriveAndDeregister() { 599 return doArrive(ONE_ARRIVAL|ONE_PARTY); 600 } 601 602 /** 603 * Arrives at this phaser and awaits others. Equivalent in effect 604 * to {@code awaitAdvance(arrive())}. If you need to await with 605 * interruption or timeout, you can arrange this with an analogous 606 * construction using one of the other forms of the {@code 607 * awaitAdvance} method. If instead you need to deregister upon 608 * arrival, use {@code awaitAdvance(arriveAndDeregister())}. 609 * 610 * <p>It is a usage error for an unregistered party to invoke this 611 * method. However, this error may result in an {@code 612 * IllegalStateException} only upon some subsequent operation on 613 * this phaser, if ever. 614 * 615 * @return the arrival phase number, or a negative number if terminated 616 * @throws IllegalStateException if not terminated and the number 617 * of unarrived parties would become negative 618 */ 619 public int arriveAndAwaitAdvance() { 620 return awaitAdvance(doArrive(ONE_ARRIVAL)); 621 } 622 623 /** 624 * Awaits the phase of this phaser to advance from the given phase 625 * value, returning immediately if the current phase is not equal 626 * to the given phase value or this phaser is terminated. 627 * 628 * @param phase an arrival phase number, or negative value if 629 * terminated; this argument is normally the value returned by a 630 * previous call to {@code arrive} or {@code arriveAndDeregister}. 
631 * @return the next arrival phase number, or a negative value 632 * if terminated or argument is negative 633 */ 634 public int awaitAdvance(int phase) { 635 Phaser rt; 636 int p = (int)(state >>> PHASE_SHIFT); 637 if (phase < 0) 638 return phase; 639 if (p == phase && 640 (p = (int)((rt = root).state >>> PHASE_SHIFT)) == phase) 641 return rt.internalAwaitAdvance(phase, null); 642 return p; 643 } 644 645 /** 646 * Awaits the phase of this phaser to advance from the given phase 647 * value, throwing {@code InterruptedException} if interrupted 648 * while waiting, or returning immediately if the current phase is 649 * not equal to the given phase value or this phaser is 650 * terminated. 651 * 652 * @param phase an arrival phase number, or negative value if 653 * terminated; this argument is normally the value returned by a 654 * previous call to {@code arrive} or {@code arriveAndDeregister}. 655 * @return the next arrival phase number, or a negative value 656 * if terminated or argument is negative 657 * @throws InterruptedException if thread interrupted while waiting 658 */ 659 public int awaitAdvanceInterruptibly(int phase) 660 throws InterruptedException { 661 Phaser rt; 662 int p = (int)(state >>> PHASE_SHIFT); 663 if (phase < 0) 664 return phase; 665 if (p == phase && 666 (p = (int)((rt = root).state >>> PHASE_SHIFT)) == phase) { 667 QNode node = new QNode(this, phase, true, false, 0L); 668 p = rt.internalAwaitAdvance(phase, node); 669 if (node.wasInterrupted) 670 throw new InterruptedException(); 671 } 672 return p; 673 } 674 675 /** 676 * Awaits the phase of this phaser to advance from the given phase 677 * value or the given timeout to elapse, throwing {@code 678 * InterruptedException} if interrupted while waiting, or 679 * returning immediately if the current phase is not equal to the 680 * given phase value or this phaser is terminated. 
681 * 682 * @param phase an arrival phase number, or negative value if 683 * terminated; this argument is normally the value returned by a 684 * previous call to {@code arrive} or {@code arriveAndDeregister}. 685 * @param timeout how long to wait before giving up, in units of 686 * {@code unit} 687 * @param unit a {@code TimeUnit} determining how to interpret the 688 * {@code timeout} parameter 689 * @return the next arrival phase number, or a negative value 690 * if terminated or argument is negative 691 * @throws InterruptedException if thread interrupted while waiting 692 * @throws TimeoutException if timed out while waiting 693 */ 694 public int awaitAdvanceInterruptibly(int phase, 695 long timeout, TimeUnit unit) 696 throws InterruptedException, TimeoutException { 697 long nanos = unit.toNanos(timeout); 698 Phaser rt; 699 int p = (int)(state >>> PHASE_SHIFT); 700 if (phase < 0) 701 return phase; 702 if (p == phase && 703 (p = (int)((rt = root).state >>> PHASE_SHIFT)) == phase) { 704 QNode node = new QNode(this, phase, true, true, nanos); 705 p = rt.internalAwaitAdvance(phase, node); 706 if (node.wasInterrupted) 707 throw new InterruptedException(); 708 else if (p == phase) 709 throw new TimeoutException(); 710 } 711 return p; 712 } 713 714 /** 715 * Forces this phaser to enter termination state. Counts of 716 * arrived and registered parties are unaffected. If this phaser 717 * is a member of a tiered set of phasers, then all of the phasers 718 * in the set are terminated. If this phaser is already 719 * terminated, this method has no effect. This method may be 720 * useful for coordinating recovery after one or more tasks 721 * encounter unexpected exceptions. 
722 */ 723 public void forceTermination() { 724 // Only need to change root state 725 final Phaser root = this.root; 726 long s; 727 while ((s = root.state) >= 0) { 728 if (UNSAFE.compareAndSwapLong(root, stateOffset, 729 s, s | TERMINATION_BIT)) { 730 releaseWaiters(0); // signal all threads 731 releaseWaiters(1); 732 return; 733 } 734 } 735 } 736 737 /** 738 * Returns the current phase number. The maximum phase number is 739 * {@code Integer.MAX_VALUE}, after which it restarts at 740 * zero. Upon termination, the phase number is negative, 741 * in which case the prevailing phase prior to termination 742 * may be obtained via {@code getPhase() + Integer.MIN_VALUE}. 743 * 744 * @return the phase number, or a negative value if terminated 745 */ 746 public final int getPhase() { 747 return (int)(root.state >>> PHASE_SHIFT); 748 } 749 750 /** 751 * Returns the number of parties registered at this phaser. 752 * 753 * @return the number of parties 754 */ 755 public int getRegisteredParties() { 756 return partiesOf(state); 757 } 758 759 /** 760 * Returns the number of registered parties that have arrived at 761 * the current phase of this phaser. 762 * 763 * @return the number of arrived parties 764 */ 765 public int getArrivedParties() { 766 long s = state; 767 int u = unarrivedOf(s); // only reconcile if possibly needed 768 return (u != 0 || parent == null) ? 769 partiesOf(s) - u : 770 arrivedOf(reconcileState()); 771 } 772 773 /** 774 * Returns the number of registered parties that have not yet 775 * arrived at the current phase of this phaser. 776 * 777 * @return the number of unarrived parties 778 */ 779 public int getUnarrivedParties() { 780 int u = unarrivedOf(state); 781 return (u != 0 || parent == null) ? u : unarrivedOf(reconcileState()); 782 } 783 784 /** 785 * Returns the parent of this phaser, or {@code null} if none. 
786 * 787 * @return the parent of this phaser, or {@code null} if none 788 */ 789 public Phaser getParent() { 790 return parent; 791 } 792 793 /** 794 * Returns the root ancestor of this phaser, which is the same as 795 * this phaser if it has no parent. 796 * 797 * @return the root ancestor of this phaser 798 */ 799 public Phaser getRoot() { 800 return root; 801 } 802 803 /** 804 * Returns {@code true} if this phaser has been terminated. 805 * 806 * @return {@code true} if this phaser has been terminated 807 */ 808 public boolean isTerminated() { 809 return root.state < 0L; 810 } 811 812 /** 813 * Overridable method to perform an action upon impending phase 814 * advance, and to control termination. This method is invoked 815 * upon arrival of the party advancing this phaser (when all other 816 * waiting parties are dormant). If this method returns {@code 817 * true}, then, rather than advance the phase number, this phaser 818 * will be set to a final termination state, and subsequent calls 819 * to {@link #isTerminated} will return true. Any (unchecked) 820 * Exception or Error thrown by an invocation of this method is 821 * propagated to the party attempting to advance this phaser, in 822 * which case no advance occurs. 823 * 824 * <p>The arguments to this method provide the state of the phaser 825 * prevailing for the current transition. The effects of invoking 826 * arrival, registration, and waiting methods on this phaser from 827 * within {@code onAdvance} are unspecified and should not be 828 * relied on. 829 * 830 * <p>If this phaser is a member of a tiered set of phasers, then 831 * {@code onAdvance} is invoked only for its root phaser on each 832 * advance. 833 * 834 * <p>To support the most common use cases, the default 835 * implementation of this method returns {@code true} when the 836 * number of registered parties has become zero as the result of a 837 * party invoking {@code arriveAndDeregister}. 
     * You can disable
     * this behavior, thus enabling continuation upon future
     * registrations, by overriding this method to always return
     * {@code false}:
     *
     * <pre> {@code
     * Phaser phaser = new Phaser() {
     *   protected boolean onAdvance(int phase, int parties) { return false; }
     * }}</pre>
     *
     * @param phase the current phase number on entry to this method,
     * before this phaser is advanced
     * @param registeredParties the current number of registered parties
     * @return {@code true} if this phaser should terminate
     */
    protected boolean onAdvance(int phase, int registeredParties) {
        return registeredParties == 0;
    }

    /**
     * Returns a string identifying this phaser, as well as its
     * state. The state, in brackets, includes the String {@code
     * "phase = "} followed by the phase number, {@code "parties = "}
     * followed by the number of registered parties, and {@code
     * "arrived = "} followed by the number of arrived parties.
     *
     * @return a string identifying this phaser, as well as its state
     */
    public String toString() {
        return stateToString(reconcileState());
    }

    /**
     * Implementation of toString and string-based error messages
     */
    private String stateToString(long s) {
        return super.toString() +
            "[phase = " + phaseOf(s) +
            " parties = " + partiesOf(s) +
            " arrived = " + arrivedOf(s) + "]";
    }

    // Waiting mechanics

    /**
     * Removes and signals threads from queue for phase.
     */
    private void releaseWaiters(int phase) {
        QNode q;   // first element of queue
        int p;     // its phase
        Thread t;  // its thread
        AtomicReference<QNode> head = (phase & 1) == 0 ?
            evenQ : oddQ;
        while ((q = head.get()) != null &&
               ((p = q.phase) == phase ||
                (int)(root.state >>> PHASE_SHIFT) != p)) {
            if (head.compareAndSet(q, q.next) &&
                (t = q.thread) != null) {
                q.thread = null;
                LockSupport.unpark(t);
            }
        }
    }

    /** The number of CPUs, for spin control */
    private static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * The number of times to spin before blocking while waiting for
     * advance, per arrival while waiting. On multiprocessors, fully
     * blocking and waking up a large number of threads all at once is
     * usually a very slow process, so we use rechargeable spins to
     * avoid it when threads regularly arrive: When a thread in
     * internalAwaitAdvance notices another arrival before blocking,
     * and there appear to be enough CPUs available, it spins
     * SPINS_PER_ARRIVAL more times before blocking. The value trades
     * off good-citizenship vs big unnecessary slowdowns.
     */
    static final int SPINS_PER_ARRIVAL = (NCPU < 2) ? 1 : 1 << 8;

    /**
     * Possibly blocks and waits for phase to advance unless aborted.
     * Call only from root node.
     *
     * @param phase current phase
     * @param node if non-null, the wait node to track interrupt and timeout;
     * if null, denotes noninterruptible wait
     * @return current phase
     */
    private int internalAwaitAdvance(int phase, QNode node) {
        releaseWaiters(phase-1);          // ensure old queue clean
        boolean queued = false;           // true when node is enqueued
        int lastUnarrived = 0;            // to increase spins upon change
        int spins = SPINS_PER_ARRIVAL;
        long s;
        int p;
        while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
            if (node == null) {           // spinning in noninterruptible mode
                int unarrived = (int)s & UNARRIVED_MASK;
                if (unarrived != lastUnarrived &&
                    (lastUnarrived = unarrived) < NCPU)
                    spins += SPINS_PER_ARRIVAL;
                boolean interrupted = Thread.interrupted();
                if (interrupted || --spins < 0) { // need node to record intr
                    node = new QNode(this, phase, false, false, 0L);
                    node.wasInterrupted = interrupted;
                }
            }
            else if (node.isReleasable()) // done or aborted
                break;
            else if (!queued) {           // push onto queue
                AtomicReference<QNode> head = (phase & 1) == 0 ?
                    evenQ : oddQ;
                QNode q = node.next = head.get();
                if ((q == null || q.phase == phase) &&
                    (int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
                    queued = head.compareAndSet(q, node);
            }
            else {
                try {
                    ForkJoinPool.managedBlock(node);
                } catch (InterruptedException ie) {
                    node.wasInterrupted = true;
                }
            }
        }

        if (node != null) {
            if (node.thread != null)
                node.thread = null;       // avoid need for unpark()
            if (node.wasInterrupted && !node.interruptible)
                Thread.currentThread().interrupt();
            if ((p = (int)(state >>> PHASE_SHIFT)) == phase)
                return p;                 // recheck abort
        }
        releaseWaiters(phase);
        return p;
    }

    /**
     * Wait nodes for Treiber stack representing wait queue
     */
    static final class QNode implements ForkJoinPool.ManagedBlocker {
        final Phaser phaser;
        final int phase;
        final boolean interruptible;
        final boolean timed;
        boolean wasInterrupted;
        long nanos;
        long lastTime;
        volatile Thread thread; // nulled to cancel wait
        QNode next;

        QNode(Phaser phaser, int phase, boolean interruptible,
              boolean timed, long nanos) {
            this.phaser = phaser;
            this.phase = phase;
            this.interruptible = interruptible;
            this.nanos = nanos;
            this.timed = timed;
            this.lastTime = timed ?
                System.nanoTime() : 0L;
            thread = Thread.currentThread();
        }

        public boolean isReleasable() {
            if (thread == null)
                return true;
            if (phaser.getPhase() != phase) {
                thread = null;
                return true;
            }
            if (Thread.interrupted())
                wasInterrupted = true;
            if (wasInterrupted && interruptible) {
                thread = null;
                return true;
            }
            if (timed) {
                if (nanos > 0L) {
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                }
                if (nanos <= 0L) {
                    thread = null;
                    return true;
                }
            }
            return false;
        }

        public boolean block() {
            if (isReleasable())
                return true;
            else if (!timed)
                LockSupport.park(this);
            else if (nanos > 0)
                LockSupport.parkNanos(this, nanos);
            return isReleasable();
        }
    }

    // Unsafe mechanics

    private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
    private static final long stateOffset =
        objectFieldOffset("state", Phaser.class);

    private static long objectFieldOffset(String field, Class<?> klazz) {
        try {
            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
        } catch (NoSuchFieldException e) {
            // Convert Exception to corresponding Error
            NoSuchFieldError error = new NoSuchFieldError(field);
            error.initCause(e);
            throw error;
        }
    }
}
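
// Usage sketch, not part of the original source. The class name
// PhaserOnAdvanceSketch and its method countAdvances are hypothetical,
// introduced only for illustration. It shows one way the onAdvance hook
// documented above might be overridden to bound the number of phases, and
// how termination then becomes visible through isTerminated() and a
// negative getPhase(). Only public Phaser methods are used.
final class PhaserOnAdvanceSketch {

    /**
     * Drives a single-party phaser until onAdvance requests termination
     * and returns how many times onAdvance was invoked.
     */
    static int countAdvances(final int maxPhases) {
        final int[] calls = new int[1]; // counter updated from the override
        java.util.concurrent.Phaser phaser = new java.util.concurrent.Phaser(1) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                calls[0]++;
                // Terminate after the last wanted phase, or when no
                // parties remain (the default behavior).
                return phase >= maxPhases - 1 || registeredParties == 0;
            }
        };
        // The sole registered party is also the last arriver, so each call
        // triggers onAdvance and returns without blocking.
        while (!phaser.isTerminated())
            phaser.arriveAndAwaitAdvance();
        if (phaser.getPhase() >= 0)
            throw new AssertionError("expected a negative phase after termination");
        return calls[0];
    }

    public static void main(String[] args) {
        System.out.println("onAdvance calls: " + countAdvances(3));
    }
}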