/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */

package java.util.concurrent;

import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

/**
 * A reusable synchronization barrier, similar in functionality to
 * {@link java.util.concurrent.CyclicBarrier CyclicBarrier} and
 * {@link java.util.concurrent.CountDownLatch CountDownLatch}
 * but supporting more flexible usage.
 *
 * <p> <b>Registration.</b> Unlike the case for other barriers, the
 * number of parties <em>registered</em> to synchronize on a phaser
 * may vary over time. Tasks may be registered at any time (using
 * methods {@link #register}, {@link #bulkRegister}, or forms of
 * constructors establishing initial numbers of parties), and
 * optionally deregistered upon any arrival (using {@link
 * #arriveAndDeregister}). As is the case with most basic
 * synchronization constructs, registration and deregistration affect
 * only internal counts; they do not establish any further internal
 * bookkeeping, so tasks cannot query whether they are registered.
 * (However, you can introduce such bookkeeping by subclassing this
 * class.)
 *
 * <p> <b>Synchronization.</b> Like a {@code CyclicBarrier}, a {@code
 * Phaser} may be repeatedly awaited. Method {@link
 * #arriveAndAwaitAdvance} has effect analogous to {@link
 * java.util.concurrent.CyclicBarrier#await CyclicBarrier.await}. Each
 * generation of a {@code Phaser} has an associated phase number. The
 * phase number starts at zero, and advances when all parties arrive
 * at the barrier, wrapping around to zero after reaching {@code
 * Integer.MAX_VALUE}. The use of phase numbers enables independent
 * control of actions upon arrival at a barrier and upon awaiting
 * others, via two kinds of methods that may be invoked by any
 * registered party:
 *
 * <ul>
 *
 *   <li> <b>Arrival.</b> Methods {@link #arrive} and
 *       {@link #arriveAndDeregister} record arrival at a
 *       barrier. These methods do not block, but return an associated
 *       <em>arrival phase number</em>; that is, the phase number of
 *       the barrier to which the arrival applied. When the final
 *       party for a given phase arrives, an optional barrier action
 *       is performed and the phase advances. Barrier actions,
 *       performed by the party triggering a phase advance, are
 *       arranged by overriding method {@link #onAdvance(int, int)},
 *       which also controls termination. Overriding this method is
 *       similar to, but more flexible than, providing a barrier
 *       action to a {@code CyclicBarrier}.
 *
 *   <li> <b>Waiting.</b> Method {@link #awaitAdvance} requires an
 *       argument indicating an arrival phase number, and returns when
 *       the barrier advances to (or is already at) a different phase.
 *       Unlike similar constructions using {@code CyclicBarrier},
 *       method {@code awaitAdvance} continues to wait even if the
 *       waiting thread is interrupted. Interruptible and timeout
 *       versions are also available, but exceptions encountered while
 *       tasks wait interruptibly or with timeout do not change the
 *       state of the barrier. If necessary, you can perform any
 *       associated recovery within handlers of those exceptions,
 *       often after invoking {@code forceTermination}. Phasers may
 *       also be used by tasks executing in a {@link ForkJoinPool},
 *       which will ensure sufficient parallelism to execute tasks
 *       when others are blocked waiting for a phase to advance.
 *
 * </ul>
 *
 * <p> <b>Termination.</b> A {@code Phaser} may enter a
 * <em>termination</em> state in which all synchronization methods
 * immediately return without updating phaser state or waiting for
 * advance, and indicating (via a negative phase value) that execution
 * is complete. Termination is triggered when an invocation of {@code
 * onAdvance} returns {@code true}. As illustrated below, when
 * phasers control actions with a fixed number of iterations, it is
 * often convenient to override this method to cause termination when
 * the current phase number reaches a threshold. Method {@link
 * #forceTermination} is also available to abruptly release waiting
 * threads and allow them to terminate.
 *
 * <p> <b>Tiering.</b> Phasers may be <em>tiered</em> (i.e., arranged
 * in tree structures) to reduce contention. Phasers with large
 * numbers of parties that would otherwise experience heavy
 * synchronization contention costs may instead be set up so that
 * groups of sub-phasers share a common parent. This may greatly
 * increase throughput even though it incurs greater per-operation
 * overhead.
 *
 * <p><b>Monitoring.</b> While synchronization methods may be invoked
 * only by registered parties, the current state of a phaser may be
 * monitored by any caller. At any given moment there are {@link
 * #getRegisteredParties} parties in total, of which {@link
 * #getArrivedParties} have arrived at the current phase ({@link
 * #getPhase}). When the remaining ({@link #getUnarrivedParties})
 * parties arrive, the phase advances. The values returned by these
 * methods may reflect transient states and so are not in general
 * useful for synchronization control. Method {@link #toString}
 * returns snapshots of these state queries in a form convenient for
 * informal monitoring.
 *
 * <p><b>Sample usages:</b>
 *
 * <p>A {@code Phaser} may be used instead of a {@code CountDownLatch}
 * to control a one-shot action serving a variable number of
 * parties. The typical idiom is for the method setting this up to
 * first register, then start the actions, then deregister, as in:
 *
 * <pre> {@code
 * void runTasks(List<Runnable> tasks) {
 *   final Phaser phaser = new Phaser(1); // "1" to register self
 *   // create and start threads
 *   for (final Runnable task : tasks) {
 *     phaser.register();
 *     new Thread() {
 *       public void run() {
 *         phaser.arriveAndAwaitAdvance(); // await all creation
 *         task.run();
 *       }
 *     }.start();
 *   }
 *
 *   // allow threads to start and deregister self
 *   phaser.arriveAndDeregister();
 * }}</pre>
 *
 * <p>One way to cause a set of threads to repeatedly perform actions
 * for a given number of iterations is to override {@code onAdvance}:
 *
 * <pre> {@code
 * void startTasks(List<Runnable> tasks, final int iterations) {
 *   final Phaser phaser = new Phaser() {
 *     protected boolean onAdvance(int phase, int registeredParties) {
 *       return phase >= iterations || registeredParties == 0;
 *     }
 *   };
 *   phaser.register();
 *   for (final Runnable task : tasks) {
 *     phaser.register();
 *     new Thread() {
 *       public void run() {
 *         do {
 *           task.run();
 *           phaser.arriveAndAwaitAdvance();
 *         } while (!phaser.isTerminated());
 *       }
 *     }.start();
 *   }
 *   phaser.arriveAndDeregister(); // deregister self, don't wait
 * }}</pre>
 *
 * If the main task must later await termination, it
 * may re-register and then execute a similar loop:
 * <pre> {@code
 *   // ...
 *   phaser.register();
 *   while (!phaser.isTerminated())
 *     phaser.arriveAndAwaitAdvance();}</pre>
 *
 * <p>Related constructions may be used to await particular phase numbers
 * in contexts where you are sure that the phase will never wrap around
 * {@code Integer.MAX_VALUE}. For example:
 *
 * <pre> {@code
 * void awaitPhase(Phaser phaser, int phase) {
 *   int p = phaser.register(); // assumes caller not already registered
 *   while (p < phase) {
 *     if (phaser.isTerminated())
 *       // ... deal with unexpected termination
 *     else
 *       p = phaser.arriveAndAwaitAdvance();
 *   }
 *   phaser.arriveAndDeregister();
 * }}</pre>
 *
 *
 * <p>To create a set of tasks using a tree of phasers,
 * you could use code of the following form, assuming a
 * Task class with a constructor accepting a phaser that
 * it registers for upon construction:
 *
 * <pre> {@code
 * void build(Task[] actions, int lo, int hi, Phaser ph) {
 *   if (hi - lo > TASKS_PER_PHASER) {
 *     for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
 *       int j = Math.min(i + TASKS_PER_PHASER, hi);
 *       build(actions, i, j, new Phaser(ph));
 *     }
 *   } else {
 *     for (int i = lo; i < hi; ++i)
 *       actions[i] = new Task(ph);
 *       // assumes new Task(ph) performs ph.register()
 *   }
 * }
 * // .. initially called, for n tasks via
 * build(new Task[n], 0, n, new Phaser());}</pre>
 *
 * The best value of {@code TASKS_PER_PHASER} depends mainly on
 * expected barrier synchronization rates. A value as low as four may
 * be appropriate for extremely small per-barrier task bodies (thus
 * high rates), or up to hundreds for extremely large ones.
 *
 * <p><b>Implementation notes</b>: This implementation restricts the
 * maximum number of parties to 65535. Attempts to register additional
 * parties result in {@code IllegalStateException}. However, you can and
 * should create tiered phasers to accommodate arbitrarily large sets
 * of participants.
 *
 * @since 1.7
 * @author Doug Lea
 */
public class Phaser {
    /*
     * This class implements an extension of X10 "clocks".  Thanks to
     * Vijay Saraswat for the idea, and to Vivek Sarkar for
     * enhancements to extend functionality.
     */

    /**
     * Barrier state representation. Conceptually, a barrier contains
     * four values:
     *
     * * parties -- the number of parties to wait (16 bits)
     * * unarrived -- the number of parties yet to hit barrier (16 bits)
     * * phase -- the generation of the barrier (31 bits)
     * * terminated -- set if barrier is terminated (1 bit)
     *
     * However, to efficiently maintain atomicity, these values are
     * packed into a single (atomic) long. Termination uses the sign
     * bit of 32 bit representation of phase, so phase is set to -1 on
     * termination. Good performance relies on keeping state decoding
     * and encoding simple, and keeping race windows short.
     *
     * Note: there are some cheats in arrive() that rely on unarrived
     * count being lowest 16 bits.
     */
    private volatile long state;

    private static final int ushortMask = 0xffff;
    private static final int phaseMask = 0x7fffffff;

    /** Extracts the unarrived count (lowest 16 bits) from a state word. */
    private static int unarrivedOf(long s) {
        return (int) (s & ushortMask);
    }

    /** Extracts the registered-party count (bits 16..31) from a state word. */
    private static int partiesOf(long s) {
        return ((int) s) >>> 16;
    }

    /** Extracts the phase (upper 32 bits; negative if terminated). */
    private static int phaseOf(long s) {
        return (int) (s >>> 32);
    }

    /** Returns parties minus unarrived for the given state word. */
    private static int arrivedOf(long s) {
        return partiesOf(s) - unarrivedOf(s);
    }

    /** Packs phase, parties and unarrived into a state word. */
    private static long stateFor(int phase, int parties, int unarrived) {
        return ((((long) phase) << 32) | (((long) parties) << 16) |
                (long) unarrived);
    }

    /**
     * Packs a state word for a freshly tripped barrier, in which every
     * registered party is unarrived again.
     */
    private static long trippedStateFor(int phase, int parties) {
        long lp = (long) parties;
        return (((long) phase) << 32) | (lp << 16) | lp;
    }

    /**
     * Returns message string for bad bounds exceptions.
     */
    private static String badBounds(int parties, int unarrived) {
        return ("Attempt to set " + unarrived +
                " unarrived of " + parties + " parties");
    }

    /**
     * The parent of this phaser, or null if none
     */
    private final Phaser parent;

    /**
     * The root of phaser tree. Equals this if not in a tree.  Used to
     * support faster state push-down.
     */
    private final Phaser root;

    // Wait queues

    /**
     * Heads of Treiber stacks for waiting threads. To eliminate
     * contention while releasing some threads while adding others, we
     * use two of them, alternating across even and odd phases.
     */
    private final AtomicReference<QNode> evenQ = new AtomicReference<QNode>();
    private final AtomicReference<QNode> oddQ = new AtomicReference<QNode>();

    /** Selects the wait queue for the given phase by its parity. */
    private AtomicReference<QNode> queueFor(int phase) {
        return ((phase & 1) == 0) ? evenQ : oddQ;
    }

    /**
     * Returns current state, first resolving lagged propagation from
     * root if necessary.
     */
    private long getReconciledState() {
        return (parent == null) ? state : reconcileState();
    }

    /**
     * Recursively resolves state.
     */
    private long reconcileState() {
        Phaser p = parent;
        long s = state;
        if (p != null) {
            // Only a sub-phaser whose parties have all arrived can lag
            // behind the root; pull the parent's phase down in that case.
            while (unarrivedOf(s) == 0 && phaseOf(s) != phaseOf(root.state)) {
                long parentState = p.getReconciledState();
                int parentPhase = phaseOf(parentState);
                int phase = phaseOf(s = state);
                if (phase != parentPhase) {
                    long next = trippedStateFor(parentPhase, partiesOf(s));
                    if (casState(s, next)) {
                        releaseWaiters(phase);
                        s = next;
                    }
                }
            }
        }
        return s;
    }

    /**
     * Creates a new phaser without any initially registered parties,
     * initial phase number 0, and no parent. Any thread using this
     * phaser will need to first register for it.
     */
    public Phaser() {
        this(null);
    }

    /**
     * Creates a new phaser with the given numbers of registered
     * unarrived parties, initial phase number 0, and no parent.
     *
     * @param parties the number of parties required to trip barrier
     * @throws IllegalArgumentException if parties less than zero
     * or greater than the maximum number of parties supported
     */
    public Phaser(int parties) {
        this(null, parties);
    }

    /**
     * Creates a new phaser with the given parent, without any
     * initially registered parties. If parent is non-null this phaser
     * is registered with the parent and its initial phase number is
     * the same as that of parent phaser.
     *
     * @param parent the parent phaser
     */
    public Phaser(Phaser parent) {
        // Same as the two-argument form with zero initial parties.
        this(parent, 0);
    }

    /**
     * Creates a new phaser with the given parent and numbers of
     * registered unarrived parties. If parent is non-null, this phaser
     * is registered with the parent and its initial phase number is
     * the same as that of parent phaser.
     *
     * @param parent the parent phaser
     * @param parties the number of parties required to trip barrier
     * @throws IllegalArgumentException if parties less than zero
     * or greater than the maximum number of parties supported
     */
    public Phaser(Phaser parent, int parties) {
        if (parties < 0 || parties > ushortMask)
            throw new IllegalArgumentException("Illegal number of parties");
        int phase = 0;
        this.parent = parent;
        if (parent != null) {
            this.root = parent.root;
            phase = parent.register();
        }
        else
            this.root = this;
        this.state = trippedStateFor(phase, parties);
    }

    /**
     * Adds a new unarrived party to this phaser.
     *
     * @return the arrival phase number to which this registration applied
     * @throws IllegalStateException if attempting to register more
     * than the maximum supported number of parties
     */
    public int register() {
        return doRegister(1);
    }

    /**
     * Adds the given number of new unarrived parties to this phaser.
     *
     * @param parties the number of additional parties to register
     * @return the arrival phase number to which this registration applied
     * @throws IllegalStateException if attempting to register more
     * than the maximum supported number of parties
     * @throws IllegalArgumentException if {@code parties < 0}
     */
    public int bulkRegister(int parties) {
        if (parties < 0)
            throw new IllegalArgumentException();
        if (parties == 0)
            return getPhase();
        return doRegister(parties);
    }

    /**
     * Shared code for register, bulkRegister
     */
    private int doRegister(int registrations) {
        int phase;
        for (;;) {
            long s = getReconciledState();
            phase = phaseOf(s);
            int unarrived = unarrivedOf(s) + registrations;
            int parties = partiesOf(s) + registrations;
            if (phase < 0)
                break;
            if (parties > ushortMask || unarrived > ushortMask)
                throw new IllegalStateException(badBounds(parties, unarrived));
            // Retry unless this phaser is in step with the root and the
            // CAS installs the enlarged counts.
            if (phase == phaseOf(root.state) &&
                casState(s, stateFor(phase, parties, unarrived)))
                break;
        }
        return phase;
    }

    /**
     * Arrives at the barrier, but does not wait for others.  (You can
     * in turn wait for others via {@link #awaitAdvance}).  It is an
     * unenforced usage error for an unregistered party to invoke this
     * method.
     *
     * @return the arrival phase number, or a negative value if terminated
     * @throws IllegalStateException if not terminated and the number
     * of unarrived parties would become negative
     */
    public int arrive() {
        int phase;
        for (;;) {
            long s = state;
            phase = phaseOf(s);
            if (phase < 0)
                break;
            int parties = partiesOf(s);
            int unarrived = unarrivedOf(s) - 1;
            if (unarrived > 0) {        // Not the last arrival
                if (casState(s, s - 1)) // s-1 adds one arrival
                    break;
            }
            else if (unarrived == 0) {  // the last arrival
                Phaser par = parent;
                if (par == null) {      // directly trip
                    if (casState
                        (s,
                         trippedStateFor(onAdvance(phase, parties) ? -1 :
                                         ((phase + 1) & phaseMask), parties))) {
                        releaseWaiters(phase);
                        break;
                    }
                }
                else {                  // cascade to parent
                    if (casState(s, s - 1)) { // zeroes unarrived
                        par.arrive();
                        reconcileState();
                        break;
                    }
                }
            }
            else if (phase != phaseOf(root.state)) // or if unreconciled
                reconcileState();
            else
                throw new IllegalStateException(badBounds(parties, unarrived));
        }
        return phase;
    }

    /**
     * Arrives at the barrier and deregisters from it without waiting
     * for others. Deregistration reduces the number of parties
     * required to trip the barrier in future phases.  If this phaser
     * has a parent, and deregistration causes this phaser to have
     * zero parties, this phaser also arrives at and is deregistered
     * from its parent.  It is an unenforced usage error for an
     * unregistered party to invoke this method.
     *
     * @return the arrival phase number, or a negative value if terminated
     * @throws IllegalStateException if not terminated and the number
     * of registered or unarrived parties would become negative
     */
    public int arriveAndDeregister() {
        // similar code to arrive, but too different to merge
        Phaser par = parent;
        int phase;
        for (;;) {
            long s = state;
            phase = phaseOf(s);
            if (phase < 0)
                break;
            int parties = partiesOf(s) - 1;
            int unarrived = unarrivedOf(s) - 1;
            if (parties >= 0) {
                if (unarrived > 0 || (unarrived == 0 && par != null)) {
                    if (casState
                        (s,
                         stateFor(phase, parties, unarrived))) {
                        if (unarrived == 0) {
                            // Last arrival of a sub-phaser: cascade to the
                            // parent. Deregister from the parent only when
                            // this phaser has no parties left; otherwise it
                            // must stay registered for future phases.
                            if (parties == 0)
                                par.arriveAndDeregister();
                            else
                                par.arrive();
                            reconcileState();
                        }
                        break;
                    }
                    continue;
                }
                if (unarrived == 0) {   // last arrival at a root: trip
                    if (casState
                        (s,
                         trippedStateFor(onAdvance(phase, parties) ? -1 :
                                         ((phase + 1) & phaseMask), parties))) {
                        releaseWaiters(phase);
                        break;
                    }
                    continue;
                }
                if (par != null && phase != phaseOf(root.state)) {
                    reconcileState();
                    continue;
                }
            }
            throw new IllegalStateException(badBounds(parties, unarrived));
        }
        return phase;
    }

    /**
     * Arrives at the barrier and awaits others. Equivalent in effect
     * to {@code awaitAdvance(arrive())}.  If you need to await with
     * interruption or timeout, you can arrange this with an analogous
     * construction using one of the other forms of the awaitAdvance
     * method.  If instead you need to deregister upon arrival use
     * {@code arriveAndDeregister}. It is an unenforced usage error
     * for an unregistered party to invoke this method.
     *
     * @return the arrival phase number, or a negative number if terminated
     * @throws IllegalStateException if not terminated and the number
     * of unarrived parties would become negative
     */
    public int arriveAndAwaitAdvance() {
        return awaitAdvance(arrive());
    }

    /**
     * Awaits the phase of the barrier to advance from the given phase
     * value, returning immediately if the current phase of the
     * barrier is not equal to the given phase value or this barrier
     * is terminated.  It is an unenforced usage error for an
     * unregistered party to invoke this method.
     *
     * @param phase an arrival phase number, or negative value if
     * terminated; this argument is normally the value returned by a
     * previous call to {@code arrive} or its variants
     * @return the next arrival phase number, or a negative value
     * if terminated or argument is negative
     */
    public int awaitAdvance(int phase) {
        if (phase < 0)
            return phase;
        long s = getReconciledState();
        int p = phaseOf(s);
        if (p != phase)
            return p;
        if (unarrivedOf(s) == 0 && parent != null)
            parent.awaitAdvance(phase);
        // Fall here even if parent waited, to reconcile and help release
        return untimedWait(phase);
    }

    /**
     * Awaits the phase of the barrier to advance from the given phase
     * value, throwing {@code InterruptedException} if interrupted
     * while waiting, or returning immediately if the current phase of
     * the barrier is not equal to the given phase value or this
     * barrier is terminated. It is an unenforced usage error for an
     * unregistered party to invoke this method.
     *
     * @param phase an arrival phase number, or negative value if
     * terminated; this argument is normally the value returned by a
     * previous call to {@code arrive} or its variants
     * @return the next arrival phase number, or a negative value
     * if terminated or argument is negative
     * @throws InterruptedException if thread interrupted while waiting
     */
    public int awaitAdvanceInterruptibly(int phase)
        throws InterruptedException {
        if (phase < 0)
            return phase;
        long s = getReconciledState();
        int p = phaseOf(s);
        if (p != phase)
            return p;
        if (unarrivedOf(s) == 0 && parent != null)
            parent.awaitAdvanceInterruptibly(phase);
        return interruptibleWait(phase);
    }

    /**
     * Awaits the phase of the barrier to advance from the given phase
     * value or the given timeout to elapse, throwing {@code
     * InterruptedException} if interrupted while waiting, or
     * returning immediately if the current phase of the barrier is
     * not equal to the given phase value or this barrier is
     * terminated.  It is an unenforced usage error for an
     * unregistered party to invoke this method.
     *
     * @param phase an arrival phase number, or negative value if
     * terminated; this argument is normally the value returned by a
     * previous call to {@code arrive} or its variants
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit a {@code TimeUnit} determining how to interpret the
     *        {@code timeout} parameter
     * @return the next arrival phase number, or a negative value
     * if terminated or argument is negative
     * @throws InterruptedException if thread interrupted while waiting
     * @throws TimeoutException if timed out while waiting
     */
    public int awaitAdvanceInterruptibly(int phase,
                                         long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        if (phase < 0)
            return phase;
        long s = getReconciledState();
        int p = phaseOf(s);
        if (p != phase)
            return p;
        long nanos = unit.toNanos(timeout);
        if (unarrivedOf(s) == 0 && parent != null) {
            // Charge the time spent waiting on the parent against the
            // caller's budget so the total wait cannot exceed the timeout.
            long start = System.nanoTime();
            parent.awaitAdvanceInterruptibly(phase, nanos,
                                             TimeUnit.NANOSECONDS);
            nanos -= System.nanoTime() - start;
        }
        return timedWait(phase, nanos);
    }

    /**
     * Forces this barrier to enter termination state. Counts of
     * arrived and registered parties are unaffected. If this phaser
     * has a parent, it too is terminated. This method may be useful
     * for coordinating recovery after one or more tasks encounter
     * unexpected exceptions.
     */
    public void forceTermination() {
        for (;;) {
            long s = getReconciledState();
            int phase = phaseOf(s);
            int parties = partiesOf(s);
            int unarrived = unarrivedOf(s);
            if (phase < 0 ||
                casState(s, stateFor(-1, parties, unarrived))) {
                // Wake waiters on both the even and the odd queue.
                releaseWaiters(0);
                releaseWaiters(1);
                if (parent != null)
                    parent.forceTermination();
                return;
            }
        }
    }

    /**
     * Returns the current phase number. The maximum phase number is
     * {@code Integer.MAX_VALUE}, after which it restarts at
     * zero. Upon termination, the phase number is negative.
     *
     * @return the phase number, or a negative value if terminated
     */
    public final int getPhase() {
        return phaseOf(getReconciledState());
    }

    /**
     * Returns the number of parties registered at this barrier.
     *
     * @return the number of parties
     */
    public int getRegisteredParties() {
        return partiesOf(state);
    }

    /**
     * Returns the number of registered parties that have arrived at
     * the current phase of this barrier.
     *
     * @return the number of arrived parties
     */
    public int getArrivedParties() {
        return arrivedOf(state);
    }

    /**
     * Returns the number of registered parties that have not yet
     * arrived at the current phase of this barrier.
     *
     * @return the number of unarrived parties
     */
    public int getUnarrivedParties() {
        return unarrivedOf(state);
    }

    /**
     * Returns the parent of this phaser, or {@code null} if none.
     *
     * @return the parent of this phaser, or {@code null} if none
     */
    public Phaser getParent() {
        return parent;
    }

    /**
     * Returns the root ancestor of this phaser, which is the same as
     * this phaser if it has no parent.
     *
     * @return the root ancestor of this phaser
     */
    public Phaser getRoot() {
        return root;
    }

    /**
     * Returns {@code true} if this barrier has been terminated.
     *
     * @return {@code true} if this barrier has been terminated
     */
    public boolean isTerminated() {
        return getPhase() < 0;
    }

    /**
     * Overridable method to perform an action upon impending phase
     * advance, and to control termination. This method is invoked
     * upon arrival of the party tripping the barrier (when all other
     * waiting parties are dormant).  If this method returns {@code
     * true}, then, rather than advance the phase number, this barrier
     * will be set to a final termination state, and subsequent calls
     * to {@link #isTerminated} will return true. Any (unchecked)
     * Exception or Error thrown by an invocation of this method is
     * propagated to the party attempting to trip the barrier, in
     * which case no advance occurs.
     *
     * <p>The arguments to this method provide the state of the phaser
     * prevailing for the current transition. (When called from within
     * an implementation of {@code onAdvance} the values returned by
     * methods such as {@code getPhase} may or may not reliably
     * indicate the state to which this transition applies.)
     *
     * <p>The default version returns {@code true} when the number of
     * registered parties is zero. Normally, overrides that arrange
     * termination for other reasons should also preserve this
     * property.
     *
     * <p>You may override this method to perform an action with side
     * effects visible to participating tasks, but it is only sensible
     * to do so in designs where all parties register before any
     * arrive, and all {@link #awaitAdvance} at each phase.
     * Otherwise, you cannot ensure lack of interference from other
     * parties during the invocation of this method. Additionally,
     * method {@code onAdvance} may be invoked more than once per
     * transition if registrations are intermixed with arrivals.
     *
     * @param phase the phase number on entering the barrier
     * @param registeredParties the current number of registered parties
     * @return {@code true} if this barrier should terminate
     */
    protected boolean onAdvance(int phase, int registeredParties) {
        return registeredParties <= 0;
    }

    /**
     * Returns a string identifying this phaser, as well as its
     * state.  The state, in brackets, includes the String {@code
     * "phase = "} followed by the phase number, {@code "parties = "}
     * followed by the number of registered parties, and {@code
     * "arrived = "} followed by the number of arrived parties.
     *
     * @return a string identifying this barrier, as well as its state
     */
    @Override
    public String toString() {
        long s = getReconciledState();
        return super.toString() +
            "[phase = " + phaseOf(s) +
            " parties = " + partiesOf(s) +
            " arrived = " + arrivedOf(s) + "]";
    }

    // methods for waiting

    /**
     * Wait nodes for Treiber stack representing wait queue
     */
    static final class QNode implements ForkJoinPool.ManagedBlocker {
        final Phaser phaser;
        final int phase;
        final long startTime;
        final long nanos;
        final boolean timed;
        final boolean interruptible;
        volatile boolean wasInterrupted = false;
        volatile Thread thread; // nulled to cancel wait
        QNode next;

        QNode(Phaser phaser, int phase, boolean interruptible,
              boolean timed, long startTime, long nanos) {
            this.phaser = phaser;
            this.phase = phase;
            this.timed = timed;
            this.interruptible = interruptible;
            this.startTime = startTime;
            this.nanos = nanos;
            thread = Thread.currentThread();
        }

        /**
         * Returns true once the wait is over: the node was cancelled or
         * signalled, the phase moved on, an interruptible wait was
         * interrupted, or a timed wait ran out of time.
         */
        @Override
        public boolean isReleasable() {
            return (thread == null ||
                    phaser.getPhase() != phase ||
                    (interruptible && wasInterrupted) ||
                    (timed && (nanos - (System.nanoTime() - startTime)) <= 0));
        }

        /**
         * Parks the current thread (with the remaining time, if timed)
         * and reports whether the wait is over.
         */
        @Override
        public boolean block() {
            if (Thread.interrupted()) {
                wasInterrupted = true;
                if (interruptible)
                    return true;
            }
            if (!timed)
                LockSupport.park(this);
            else {
                long waitTime = nanos - (System.nanoTime() - startTime);
                if (waitTime <= 0)
                    return true;
                LockSupport.parkNanos(this, waitTime);
            }
            return isReleasable();
        }

        /** Clears the thread field and unparks the waiter, if still set. */
        void signal() {
            Thread t = thread;
            if (t != null) {
                thread = null;
                LockSupport.unpark(t);
            }
        }

        /**
         * Blocks via {@code ForkJoinPool.managedBlock} unless already
         * cancelled or signalled.
         *
         * @return true if an interrupt was observed while waiting
         */
        boolean doWait() {
            if (thread != null) {
                try {
                    ForkJoinPool.managedBlock(this);
                } catch (InterruptedException ie) {
                    // block() clears and records interrupts itself, so this
                    // is not expected; record it rather than lose it.
                    wasInterrupted = true;
                }
            }
            return wasInterrupted;
        }

    }

    /**
     * Removes and signals waiting threads from wait queue.
     */
    private void releaseWaiters(int phase) {
        AtomicReference<QNode> head = queueFor(phase);
        QNode q;
        while ((q = head.get()) != null) {
            if (head.compareAndSet(q, q.next))
                q.signal();
        }
    }

    /**
     * Tries to enqueue given node in the appropriate wait queue.
     *
     * @return true if successful
     */
    private boolean tryEnqueue(QNode node) {
        AtomicReference<QNode> head = queueFor(node.phase);
        return head.compareAndSet(node.next = head.get(), node);
    }

    /**
     * Enqueues node and waits unless aborted or signalled.
     *
     * @return current phase
     */
    private int untimedWait(int phase) {
        QNode node = null;
        boolean queued = false;
        boolean interrupted = false;
        int p;
        while ((p = getPhase()) == phase) {
            if (Thread.interrupted())
                interrupted = true;
            else if (node == null)
                node = new QNode(this, phase, false, false, 0, 0);
            else if (!queued)
                queued = tryEnqueue(node);
            else
                interrupted = node.doWait();
        }
        if (node != null)
            node.thread = null;
        releaseWaiters(phase);
        // Restore the interrupt status consumed while waiting.
        if (interrupted)
            Thread.currentThread().interrupt();
        return p;
    }

    /**
     * Interruptible version
     * @return current phase
     */
    private int interruptibleWait(int phase) throws InterruptedException {
        QNode node = null;
        boolean queued = false;
        boolean interrupted = false;
        int p;
        while ((p = getPhase()) == phase && !interrupted) {
            if (Thread.interrupted())
                interrupted = true;
            else if (node == null)
                node = new QNode(this, phase, true, false, 0, 0);
            else if (!queued)
                queued = tryEnqueue(node);
            else
                interrupted = node.doWait();
        }
        if (node != null)
            node.thread = null;
        // Help release others only if the phase has actually advanced.
        if (p != phase || (p = getPhase()) != phase)
            releaseWaiters(phase);
        if (interrupted)
            throw new InterruptedException();
        return p;
    }

    /**
     * Timeout version.
     * @return current phase
     */
    private int timedWait(int phase, long nanos)
        throws InterruptedException, TimeoutException {
        long startTime = System.nanoTime();
        QNode node = null;
        boolean queued = false;
        boolean interrupted = false;
        int p;
        while ((p = getPhase()) == phase && !interrupted) {
            if (Thread.interrupted())
                interrupted = true;
            else if (nanos - (System.nanoTime() - startTime) <= 0)
                break;
            else if (node == null)
                node = new QNode(this, phase, true, true, startTime, nanos);
            else if (!queued)
                queued = tryEnqueue(node);
            else
                interrupted = node.doWait();
        }
        if (node != null)
            node.thread = null;
        // Help release others only if the phase has actually advanced.
        if (p != phase || (p = getPhase()) != phase)
            releaseWaiters(phase);
        if (interrupted)
            throw new InterruptedException();
        if (p == phase)
            throw new TimeoutException();
        return p;
    }

    // Unsafe mechanics

    private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
    private static final long stateOffset =
        objectFieldOffset("state", Phaser.class);

    /** Atomically replaces {@code state} with val if it equals cmp. */
    private final boolean casState(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, stateOffset, cmp, val);
    }

    /** Looks up the Unsafe offset of the named field declared in klazz. */
    private static long objectFieldOffset(String field, Class<?> klazz) {
        try {
            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
        } catch (NoSuchFieldException e) {
            // Convert Exception to corresponding Error
            NoSuchFieldError error = new NoSuchFieldError(field);
            error.initCause(e);
            throw error;
        }
    }
}