Print this page
Split |
Close |
Expand all |
Collapse all |
--- old/src/share/classes/java/util/concurrent/Phaser.java
+++ new/src/share/classes/java/util/concurrent/Phaser.java
1 1 /*
2 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 3 *
4 4 * This code is free software; you can redistribute it and/or modify it
5 5 * under the terms of the GNU General Public License version 2 only, as
6 6 * published by the Free Software Foundation. Oracle designates this
7 7 * particular file as subject to the "Classpath" exception as provided
8 8 * by Oracle in the LICENSE file that accompanied this code.
9 9 *
10 10 * This code is distributed in the hope that it will be useful, but WITHOUT
11 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 13 * version 2 for more details (a copy is included in the LICENSE file that
14 14 * accompanied this code).
15 15 *
16 16 * You should have received a copy of the GNU General Public License version
17 17 * 2 along with this work; if not, write to the Free Software Foundation,
18 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 19 *
20 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 21 * or visit www.oracle.com if you need additional information or have any
22 22 * questions.
23 23 */
24 24
25 25 /*
26 26 * This file is available under and governed by the GNU General Public
27 27 * License version 2 only, as published by the Free Software Foundation.
↓ open down ↓ |
27 lines elided |
↑ open up ↑ |
28 28 * However, the following notice accompanied the original version of this
29 29 * file:
30 30 *
31 31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 32 * Expert Group and released to the public domain, as explained at
33 33 * http://creativecommons.org/licenses/publicdomain
34 34 */
35 35
36 36 package java.util.concurrent;
37 37
38 +import java.util.concurrent.TimeUnit;
39 +import java.util.concurrent.TimeoutException;
38 40 import java.util.concurrent.atomic.AtomicReference;
39 41 import java.util.concurrent.locks.LockSupport;
40 42
41 43 /**
42 44 * A reusable synchronization barrier, similar in functionality to
43 45 * {@link java.util.concurrent.CyclicBarrier CyclicBarrier} and
44 46 * {@link java.util.concurrent.CountDownLatch CountDownLatch}
45 47 * but supporting more flexible usage.
46 48 *
47 49 * <p> <b>Registration.</b> Unlike the case for other barriers, the
48 50 * number of parties <em>registered</em> to synchronize on a phaser
49 51 * may vary over time. Tasks may be registered at any time (using
50 52 * methods {@link #register}, {@link #bulkRegister}, or forms of
51 53 * constructors establishing initial numbers of parties), and
52 54 * optionally deregistered upon any arrival (using {@link
53 55 * #arriveAndDeregister}). As is the case with most basic
↓ open down ↓ |
6 lines elided |
↑ open up ↑ |
54 56 * synchronization constructs, registration and deregistration affect
55 57 * only internal counts; they do not establish any further internal
56 58 * bookkeeping, so tasks cannot query whether they are registered.
57 59 * (However, you can introduce such bookkeeping by subclassing this
58 60 * class.)
59 61 *
60 62 * <p> <b>Synchronization.</b> Like a {@code CyclicBarrier}, a {@code
61 63 * Phaser} may be repeatedly awaited. Method {@link
62 64 * #arriveAndAwaitAdvance} has effect analogous to {@link
63 65 * java.util.concurrent.CyclicBarrier#await CyclicBarrier.await}. Each
64 - * generation of a {@code Phaser} has an associated phase number. The
65 - * phase number starts at zero, and advances when all parties arrive
66 - * at the barrier, wrapping around to zero after reaching {@code
66 + * generation of a phaser has an associated phase number. The phase
67 + * number starts at zero, and advances when all parties arrive at the
68 + * phaser, wrapping around to zero after reaching {@code
67 69 * Integer.MAX_VALUE}. The use of phase numbers enables independent
68 - * control of actions upon arrival at a barrier and upon awaiting
70 + * control of actions upon arrival at a phaser and upon awaiting
69 71 * others, via two kinds of methods that may be invoked by any
70 72 * registered party:
71 73 *
72 74 * <ul>
73 75 *
74 76 * <li> <b>Arrival.</b> Methods {@link #arrive} and
75 - * {@link #arriveAndDeregister} record arrival at a
76 - * barrier. These methods do not block, but return an associated
77 - * <em>arrival phase number</em>; that is, the phase number of
78 - * the barrier to which the arrival applied. When the final
79 - * party for a given phase arrives, an optional barrier action
80 - * is performed and the phase advances. Barrier actions,
81 - * performed by the party triggering a phase advance, are
82 - * arranged by overriding method {@link #onAdvance(int, int)},
83 - * which also controls termination. Overriding this method is
84 - * similar to, but more flexible than, providing a barrier
85 - * action to a {@code CyclicBarrier}.
77 + * {@link #arriveAndDeregister} record arrival. These methods
78 + * do not block, but return an associated <em>arrival phase
79 + * number</em>; that is, the phase number of the phaser to which
80 + * the arrival applied. When the final party for a given phase
81 + * arrives, an optional action is performed and the phase
82 + * advances. These actions are performed by the party
83 + * triggering a phase advance, and are arranged by overriding
84 + * method {@link #onAdvance(int, int)}, which also controls
85 + * termination. Overriding this method is similar to, but more
86 + * flexible than, providing a barrier action to a {@code
87 + * CyclicBarrier}.
86 88 *
87 89 * <li> <b>Waiting.</b> Method {@link #awaitAdvance} requires an
88 90 * argument indicating an arrival phase number, and returns when
89 - * the barrier advances to (or is already at) a different phase.
91 + * the phaser advances to (or is already at) a different phase.
90 92 * Unlike similar constructions using {@code CyclicBarrier},
91 93 * method {@code awaitAdvance} continues to wait even if the
92 94 * waiting thread is interrupted. Interruptible and timeout
93 95 * versions are also available, but exceptions encountered while
94 96 * tasks wait interruptibly or with timeout do not change the
95 - * state of the barrier. If necessary, you can perform any
97 + * state of the phaser. If necessary, you can perform any
96 98 * associated recovery within handlers of those exceptions,
97 99 * often after invoking {@code forceTermination}. Phasers may
98 100 * also be used by tasks executing in a {@link ForkJoinPool},
99 101 * which will ensure sufficient parallelism to execute tasks
100 102 * when others are blocked waiting for a phase to advance.
101 103 *
102 104 * </ul>
103 105 *
104 - * <p> <b>Termination.</b> A {@code Phaser} may enter a
105 - * <em>termination</em> state in which all synchronization methods
106 - * immediately return without updating phaser state or waiting for
107 - * advance, and indicating (via a negative phase value) that execution
108 - * is complete. Termination is triggered when an invocation of {@code
109 - * onAdvance} returns {@code true}. As illustrated below, when
110 - * phasers control actions with a fixed number of iterations, it is
111 - * often convenient to override this method to cause termination when
112 - * the current phase number reaches a threshold. Method {@link
113 - * #forceTermination} is also available to abruptly release waiting
114 - * threads and allow them to terminate.
106 + * <p> <b>Termination.</b> A phaser may enter a <em>termination</em>
107 + * state in which all synchronization methods immediately return
108 + * without updating phaser state or waiting for advance, and
109 + * indicating (via a negative phase value) that execution is complete.
110 + * Termination is triggered when an invocation of {@code onAdvance}
111 + * returns {@code true}. The default implementation returns {@code
112 + * true} if a deregistration has caused the number of registered
113 + * parties to become zero. As illustrated below, when phasers control
114 + * actions with a fixed number of iterations, it is often convenient
115 + * to override this method to cause termination when the current phase
116 + * number reaches a threshold. Method {@link #forceTermination} is
117 + * also available to abruptly release waiting threads and allow them
118 + * to terminate.
115 119 *
116 - * <p> <b>Tiering.</b> Phasers may be <em>tiered</em> (i.e., arranged
117 - * in tree structures) to reduce contention. Phasers with large
118 - * numbers of parties that would otherwise experience heavy
120 + * <p> <b>Tiering.</b> Phasers may be <em>tiered</em> (i.e.,
121 + * constructed in tree structures) to reduce contention. Phasers with
122 + * large numbers of parties that would otherwise experience heavy
119 123 * synchronization contention costs may instead be set up so that
120 124 * groups of sub-phasers share a common parent. This may greatly
121 125 * increase throughput even though it incurs greater per-operation
122 126 * overhead.
123 127 *
124 128 * <p><b>Monitoring.</b> While synchronization methods may be invoked
125 129 * only by registered parties, the current state of a phaser may be
126 130 * monitored by any caller. At any given moment there are {@link
127 131 * #getRegisteredParties} parties in total, of which {@link
128 132 * #getArrivedParties} have arrived at the current phase ({@link
129 133 * #getPhase}). When the remaining ({@link #getUnarrivedParties})
130 134 * parties arrive, the phase advances. The values returned by these
131 135 * methods may reflect transient states and so are not in general
132 136 * useful for synchronization control. Method {@link #toString}
133 137 * returns snapshots of these state queries in a form convenient for
134 138 * informal monitoring.
135 139 *
136 140 * <p><b>Sample usages:</b>
137 141 *
138 142 * <p>A {@code Phaser} may be used instead of a {@code CountDownLatch}
139 - * to control a one-shot action serving a variable number of
140 - * parties. The typical idiom is for the method setting this up to
141 - * first register, then start the actions, then deregister, as in:
143 + * to control a one-shot action serving a variable number of parties.
144 + * The typical idiom is for the method setting this up to first
145 + * register, then start the actions, then deregister, as in:
142 146 *
143 147 * <pre> {@code
144 148 * void runTasks(List<Runnable> tasks) {
145 149 * final Phaser phaser = new Phaser(1); // "1" to register self
146 150 * // create and start threads
147 151 * for (Runnable task : tasks) {
148 152 * phaser.register();
149 153 * new Thread() {
150 154 * public void run() {
151 155 * phaser.arriveAndAwaitAdvance(); // await all creation
152 156 * task.run();
153 157 * }
154 158 * }.start();
155 159 * }
156 160 *
157 161 * // allow threads to start and deregister self
158 162 * phaser.arriveAndDeregister();
159 163 * }}</pre>
160 164 *
161 165 * <p>One way to cause a set of threads to repeatedly perform actions
162 166 * for a given number of iterations is to override {@code onAdvance}:
163 167 *
164 168 * <pre> {@code
165 169 * void startTasks(List<Runnable> tasks, final int iterations) {
166 170 * final Phaser phaser = new Phaser() {
167 171 * protected boolean onAdvance(int phase, int registeredParties) {
168 172 * return phase >= iterations || registeredParties == 0;
169 173 * }
170 174 * };
171 175 * phaser.register();
172 176 * for (final Runnable task : tasks) {
173 177 * phaser.register();
174 178 * new Thread() {
175 179 * public void run() {
176 180 * do {
177 181 * task.run();
178 182 * phaser.arriveAndAwaitAdvance();
179 183 * } while (!phaser.isTerminated());
180 184 * }
181 185 * }.start();
182 186 * }
183 187 * phaser.arriveAndDeregister(); // deregister self, don't wait
184 188 * }}</pre>
185 189 *
186 190 * If the main task must later await termination, it
187 191 * may re-register and then execute a similar loop:
188 192 * <pre> {@code
189 193 * // ...
190 194 * phaser.register();
191 195 * while (!phaser.isTerminated())
192 196 * phaser.arriveAndAwaitAdvance();}</pre>
193 197 *
194 198 * <p>Related constructions may be used to await particular phase numbers
195 199 * in contexts where you are sure that the phase will never wrap around
196 200 * {@code Integer.MAX_VALUE}. For example:
197 201 *
198 202 * <pre> {@code
199 203 * void awaitPhase(Phaser phaser, int phase) {
200 204 * int p = phaser.register(); // assumes caller not already registered
201 205 * while (p < phase) {
202 206 * if (phaser.isTerminated())
↓ open down ↓ |
51 lines elided |
↑ open up ↑ |
203 207 * // ... deal with unexpected termination
204 208 * else
205 209 * p = phaser.arriveAndAwaitAdvance();
206 210 * }
207 211 * phaser.arriveAndDeregister();
208 212 * }}</pre>
209 213 *
210 214 *
211 215 * <p>To create a set of tasks using a tree of phasers,
212 216 * you could use code of the following form, assuming a
213 - * Task class with a constructor accepting a phaser that
214 - * it registers for upon construction:
217 + * Task class with a constructor accepting a {@code Phaser} that
218 + * it registers with upon construction:
215 219 *
216 220 * <pre> {@code
217 221 * void build(Task[] actions, int lo, int hi, Phaser ph) {
218 222 * if (hi - lo > TASKS_PER_PHASER) {
219 223 * for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
220 224 * int j = Math.min(i + TASKS_PER_PHASER, hi);
221 225 * build(actions, i, j, new Phaser(ph));
222 226 * }
223 227 * } else {
224 228 * for (int i = lo; i < hi; ++i)
225 229 * actions[i] = new Task(ph);
226 230 * // assumes new Task(ph) performs ph.register()
227 231 * }
228 232 * }
229 233 * // .. initially called, for n tasks via
230 234 * build(new Task[n], 0, n, new Phaser());}</pre>
231 235 *
232 236 * The best value of {@code TASKS_PER_PHASER} depends mainly on
233 - * expected barrier synchronization rates. A value as low as four may
234 - * be appropriate for extremely small per-barrier task bodies (thus
237 + * expected synchronization rates. A value as low as four may
238 + * be appropriate for extremely small per-phase task bodies (thus
235 239 * high rates), or up to hundreds for extremely large ones.
236 240 *
237 - * </pre>
238 - *
239 241 * <p><b>Implementation notes</b>: This implementation restricts the
240 242 * maximum number of parties to 65535. Attempts to register additional
241 243 * parties result in {@code IllegalStateException}. However, you can and
242 244 * should create tiered phasers to accommodate arbitrarily large sets
243 245 * of participants.
244 246 *
245 247 * @since 1.7
246 248 * @author Doug Lea
247 249 */
248 250 public class Phaser {
249 251 /*
250 252 * This class implements an extension of X10 "clocks". Thanks to
251 253 * Vijay Saraswat for the idea, and to Vivek Sarkar for
252 254 * enhancements to extend functionality.
253 255 */
254 256
255 257 /**
256 - * Barrier state representation. Conceptually, a barrier contains
257 - * four values:
258 + * Primary state representation, holding four fields:
258 259 *
259 - * * parties -- the number of parties to wait (16 bits)
260 - * * unarrived -- the number of parties yet to hit barrier (16 bits)
261 - * * phase -- the generation of the barrier (31 bits)
262 - * * terminated -- set if barrier is terminated (1 bit)
260 + * * unarrived -- the number of parties yet to hit barrier (bits 0-15)
261 + * * parties -- the number of parties to wait (bits 16-31)
262 + * * phase -- the generation of the barrier (bits 32-62)
263 + * * terminated -- set if barrier is terminated (bit 63 / sign)
263 264 *
264 265 * However, to efficiently maintain atomicity, these values are
265 266 * packed into a single (atomic) long. Termination uses the sign
266 267 * bit of 32 bit representation of phase, so phase is set to -1 on
267 268 * termination. Good performance relies on keeping state decoding
268 269 * and encoding simple, and keeping race windows short.
269 - *
270 - * Note: there are some cheats in arrive() that rely on unarrived
271 - * count being lowest 16 bits.
272 270 */
273 271 private volatile long state;
274 272
275 - private static final int ushortMask = 0xffff;
276 - private static final int phaseMask = 0x7fffffff;
273 + private static final int MAX_PARTIES = 0xffff;
274 + private static final int MAX_PHASE = 0x7fffffff;
275 + private static final int PARTIES_SHIFT = 16;
276 + private static final int PHASE_SHIFT = 32;
277 + private static final int UNARRIVED_MASK = 0xffff; // to mask ints
278 + private static final long PARTIES_MASK = 0xffff0000L; // to mask longs
279 + private static final long ONE_ARRIVAL = 1L;
280 + private static final long ONE_PARTY = 1L << PARTIES_SHIFT;
281 + private static final long TERMINATION_BIT = 1L << 63;
277 282
283 + // The following unpacking methods are usually manually inlined
284 +
278 285 private static int unarrivedOf(long s) {
279 - return (int) (s & ushortMask);
286 + return (int)s & UNARRIVED_MASK;
280 287 }
281 288
282 289 private static int partiesOf(long s) {
283 - return ((int) s) >>> 16;
290 + return (int)s >>> PARTIES_SHIFT;
284 291 }
285 292
286 293 private static int phaseOf(long s) {
287 - return (int) (s >>> 32);
294 + return (int) (s >>> PHASE_SHIFT);
288 295 }
289 296
290 297 private static int arrivedOf(long s) {
291 298 return partiesOf(s) - unarrivedOf(s);
292 299 }
293 300
294 - private static long stateFor(int phase, int parties, int unarrived) {
295 - return ((((long) phase) << 32) | (((long) parties) << 16) |
296 - (long) unarrived);
297 - }
298 -
299 - private static long trippedStateFor(int phase, int parties) {
300 - long lp = (long) parties;
301 - return (((long) phase) << 32) | (lp << 16) | lp;
302 - }
303 -
304 301 /**
305 - * Returns message string for bad bounds exceptions.
306 - */
307 - private static String badBounds(int parties, int unarrived) {
308 - return ("Attempt to set " + unarrived +
309 - " unarrived of " + parties + " parties");
310 - }
311 -
312 - /**
313 302 * The parent of this phaser, or null if none
314 303 */
315 304 private final Phaser parent;
316 305
317 306 /**
318 307 * The root of phaser tree. Equals this if not in a tree. Used to
319 308 * support faster state push-down.
320 309 */
321 310 private final Phaser root;
322 311
323 - // Wait queues
324 -
325 312 /**
326 313 * Heads of Treiber stacks for waiting threads. To eliminate
327 - * contention while releasing some threads while adding others, we
314 + * contention when releasing some threads while adding others, we
328 315 * use two of them, alternating across even and odd phases.
316 + * Subphasers share queues with root to speed up releases.
329 317 */
330 - private final AtomicReference<QNode> evenQ = new AtomicReference<QNode>();
331 - private final AtomicReference<QNode> oddQ = new AtomicReference<QNode>();
318 + private final AtomicReference<QNode> evenQ;
319 + private final AtomicReference<QNode> oddQ;
332 320
333 321 private AtomicReference<QNode> queueFor(int phase) {
334 322 return ((phase & 1) == 0) ? evenQ : oddQ;
335 323 }
336 324
337 325 /**
338 - * Returns current state, first resolving lagged propagation from
339 - * root if necessary.
326 + * Returns message string for bounds exceptions on arrival.
340 327 */
341 - private long getReconciledState() {
342 - return (parent == null) ? state : reconcileState();
328 + private String badArrive(long s) {
329 + return "Attempted arrival of unregistered party for " +
330 + stateToString(s);
343 331 }
344 332
345 333 /**
346 - * Recursively resolves state.
334 + * Returns message string for bounds exceptions on registration.
347 335 */
348 - private long reconcileState() {
349 - Phaser p = parent;
350 - long s = state;
351 - if (p != null) {
352 - while (unarrivedOf(s) == 0 && phaseOf(s) != phaseOf(root.state)) {
353 - long parentState = p.getReconciledState();
354 - int parentPhase = phaseOf(parentState);
355 - int phase = phaseOf(s = state);
356 - if (phase != parentPhase) {
357 - long next = trippedStateFor(parentPhase, partiesOf(s));
358 - if (casState(s, next)) {
336 + private String badRegister(long s) {
337 + return "Attempt to register more than " +
338 + MAX_PARTIES + " parties for " + stateToString(s);
339 + }
340 +
341 + /**
342 + * Main implementation for methods arrive and arriveAndDeregister.
343 + * Manually tuned to speed up and minimize race windows for the
344 + * common case of just decrementing unarrived field.
345 + *
346 + * @param adj - adjustment to apply to state -- either
347 + * ONE_ARRIVAL (for arrive) or
348 + * ONE_ARRIVAL|ONE_PARTY (for arriveAndDeregister)
349 + */
350 + private int doArrive(long adj) {
351 + for (;;) {
352 + long s = state;
353 + int unarrived = (int)s & UNARRIVED_MASK;
354 + int phase = (int)(s >>> PHASE_SHIFT);
355 + if (phase < 0)
356 + return phase;
357 + else if (unarrived == 0) {
358 + if (reconcileState() == s) // recheck
359 + throw new IllegalStateException(badArrive(s));
360 + }
361 + else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adj)) {
362 + if (unarrived == 1) {
363 + long p = s & PARTIES_MASK; // unshifted parties field
364 + long lu = p >>> PARTIES_SHIFT;
365 + int u = (int)lu;
366 + int nextPhase = (phase + 1) & MAX_PHASE;
367 + long next = ((long)nextPhase << PHASE_SHIFT) | p | lu;
368 + final Phaser parent = this.parent;
369 + if (parent == null) {
370 + if (onAdvance(phase, u))
371 + next |= TERMINATION_BIT;
372 + UNSAFE.compareAndSwapLong(this, stateOffset, s, next);
359 373 releaseWaiters(phase);
360 - s = next;
361 374 }
375 + else {
376 + parent.doArrive((u == 0) ?
377 + ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL);
378 + if ((int)(parent.state >>> PHASE_SHIFT) != nextPhase)
379 + reconcileState();
380 + else if (state == s)
381 + UNSAFE.compareAndSwapLong(this, stateOffset, s,
382 + next);
383 + }
362 384 }
385 + return phase;
363 386 }
364 387 }
388 + }
389 +
390 + /**
391 + * Implementation of register, bulkRegister
392 + *
393 + * @param registrations number to add to both parties and
394 + * unarrived fields. Must be greater than zero.
395 + */
396 + private int doRegister(int registrations) {
397 + // adjustment to state
398 + long adj = ((long)registrations << PARTIES_SHIFT) | registrations;
399 + final Phaser parent = this.parent;
400 + for (;;) {
401 + long s = (parent == null) ? state : reconcileState();
402 + int parties = (int)s >>> PARTIES_SHIFT;
403 + int phase = (int)(s >>> PHASE_SHIFT);
404 + if (phase < 0)
405 + return phase;
406 + else if (registrations > MAX_PARTIES - parties)
407 + throw new IllegalStateException(badRegister(s));
408 + else if ((parties == 0 && parent == null) || // first reg of root
409 + ((int)s & UNARRIVED_MASK) != 0) { // not advancing
410 + if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s + adj))
411 + return phase;
412 + }
413 + else if (parties != 0) // wait for onAdvance
414 + root.internalAwaitAdvance(phase, null);
415 + else { // 1st registration of child
416 + synchronized (this) { // register parent first
417 + if (reconcileState() == s) { // recheck under lock
418 + parent.doRegister(1); // OK if throws IllegalState
419 + for (;;) { // simpler form of outer loop
420 + s = reconcileState();
421 + phase = (int)(s >>> PHASE_SHIFT);
422 + if (phase < 0 ||
423 + UNSAFE.compareAndSwapLong(this, stateOffset,
424 + s, s + adj))
425 + return phase;
426 + }
427 + }
428 + }
429 + }
430 + }
431 + }
432 +
433 + /**
434 + * Recursively resolves lagged phase propagation from root if necessary.
435 + */
436 + private long reconcileState() {
437 + Phaser par = parent;
438 + long s = state;
439 + if (par != null) {
440 + Phaser rt = root;
441 + int phase, rPhase;
442 + while ((phase = (int)(s >>> PHASE_SHIFT)) >= 0 &&
443 + (rPhase = (int)(rt.state >>> PHASE_SHIFT)) != phase) {
444 + if (par != rt && (int)(par.state >>> PHASE_SHIFT) != rPhase)
445 + par.reconcileState();
446 + else if (rPhase < 0 || ((int)s & UNARRIVED_MASK) == 0) {
447 + long u = s & PARTIES_MASK; // reset unarrived to parties
448 + long next = ((((long) rPhase) << PHASE_SHIFT) | u |
449 + (u >>> PARTIES_SHIFT));
450 + UNSAFE.compareAndSwapLong(this, stateOffset, s, next);
451 + }
452 + s = state;
453 + }
454 + }
365 455 return s;
366 456 }
367 457
368 458 /**
369 - * Creates a new phaser without any initially registered parties,
370 - * initial phase number 0, and no parent. Any thread using this
459 + * Creates a new phaser with no initially registered parties, no
460 + * parent, and initial phase number 0. Any thread using this
371 461 * phaser will need to first register for it.
372 462 */
373 463 public Phaser() {
374 - this(null);
464 + this(null, 0);
375 465 }
376 466
377 467 /**
378 - * Creates a new phaser with the given numbers of registered
379 - * unarrived parties, initial phase number 0, and no parent.
468 + * Creates a new phaser with the given number of registered
469 + * unarrived parties, no parent, and initial phase number 0.
380 470 *
381 - * @param parties the number of parties required to trip barrier
471 + * @param parties the number of parties required to advance to the
472 + * next phase
382 473 * @throws IllegalArgumentException if parties less than zero
383 474 * or greater than the maximum number of parties supported
384 475 */
385 476 public Phaser(int parties) {
386 477 this(null, parties);
387 478 }
388 479
389 480 /**
390 - * Creates a new phaser with the given parent, without any
391 - * initially registered parties. If parent is non-null this phaser
392 - * is registered with the parent and its initial phase number is
393 - * the same as that of parent phaser.
481 + * Equivalent to {@link #Phaser(Phaser, int) Phaser(parent, 0)}.
394 482 *
395 483 * @param parent the parent phaser
396 484 */
397 485 public Phaser(Phaser parent) {
398 - int phase = 0;
399 - this.parent = parent;
400 - if (parent != null) {
401 - this.root = parent.root;
402 - phase = parent.register();
403 - }
404 - else
405 - this.root = this;
406 - this.state = trippedStateFor(phase, 0);
486 + this(parent, 0);
407 487 }
408 488
409 489 /**
410 - * Creates a new phaser with the given parent and numbers of
411 - * registered unarrived parties. If parent is non-null, this phaser
412 - * is registered with the parent and its initial phase number is
413 - * the same as that of parent phaser.
490 + * Creates a new phaser with the given parent and number of
491 + * registered unarrived parties. Registration and deregistration
492 + * of this child phaser with its parent are managed automatically.
493 + * If the given parent is non-null, whenever this child phaser has
494 + * any registered parties (as established in this constructor,
495 + * {@link #register}, or {@link #bulkRegister}), this child phaser
496 + * is registered with its parent. Whenever the number of
497 + * registered parties becomes zero as the result of an invocation
498 + * of {@link #arriveAndDeregister}, this child phaser is
499 + * deregistered from its parent.
414 500 *
415 501 * @param parent the parent phaser
416 - * @param parties the number of parties required to trip barrier
502 + * @param parties the number of parties required to advance to the
503 + * next phase
417 504 * @throws IllegalArgumentException if parties less than zero
418 505 * or greater than the maximum number of parties supported
419 506 */
420 507 public Phaser(Phaser parent, int parties) {
421 - if (parties < 0 || parties > ushortMask)
508 + if (parties >>> PARTIES_SHIFT != 0)
422 509 throw new IllegalArgumentException("Illegal number of parties");
423 - int phase = 0;
510 + long s = ((long) parties) | (((long) parties) << PARTIES_SHIFT);
424 511 this.parent = parent;
425 512 if (parent != null) {
426 - this.root = parent.root;
427 - phase = parent.register();
513 + Phaser r = parent.root;
514 + this.root = r;
515 + this.evenQ = r.evenQ;
516 + this.oddQ = r.oddQ;
517 + if (parties != 0)
518 + s |= ((long)(parent.doRegister(1))) << PHASE_SHIFT;
428 519 }
429 - else
520 + else {
430 521 this.root = this;
431 - this.state = trippedStateFor(phase, parties);
522 + this.evenQ = new AtomicReference<QNode>();
523 + this.oddQ = new AtomicReference<QNode>();
524 + }
525 + this.state = s;
432 526 }
433 527
434 528 /**
435 - * Adds a new unarrived party to this phaser.
529 + * Adds a new unarrived party to this phaser. If an ongoing
530 + * invocation of {@link #onAdvance} is in progress, this method
531 + * may await its completion before returning. If this phaser has
532 + * a parent, and this phaser previously had no registered parties,
533 + * this phaser is also registered with its parent.
436 534 *
437 535 * @return the arrival phase number to which this registration applied
438 536 * @throws IllegalStateException if attempting to register more
439 537 * than the maximum supported number of parties
440 538 */
441 539 public int register() {
442 540 return doRegister(1);
443 541 }
444 542
445 543 /**
446 544 * Adds the given number of new unarrived parties to this phaser.
545 + * If an ongoing invocation of {@link #onAdvance} is in progress,
546 + * this method may await its completion before returning. If this
547 + * phaser has a parent, and the given number of parities is
548 + * greater than zero, and this phaser previously had no registered
549 + * parties, this phaser is also registered with its parent.
447 550 *
448 - * @param parties the number of parties required to trip barrier
551 + * @param parties the number of additional parties required to
552 + * advance to the next phase
449 553 * @return the arrival phase number to which this registration applied
450 554 * @throws IllegalStateException if attempting to register more
451 555 * than the maximum supported number of parties
556 + * @throws IllegalArgumentException if {@code parties < 0}
452 557 */
453 558 public int bulkRegister(int parties) {
454 559 if (parties < 0)
455 560 throw new IllegalArgumentException();
456 561 if (parties == 0)
457 562 return getPhase();
458 563 return doRegister(parties);
459 564 }
460 565
461 566 /**
462 - * Shared code for register, bulkRegister
463 - */
464 - private int doRegister(int registrations) {
465 - int phase;
466 - for (;;) {
467 - long s = getReconciledState();
468 - phase = phaseOf(s);
469 - int unarrived = unarrivedOf(s) + registrations;
470 - int parties = partiesOf(s) + registrations;
471 - if (phase < 0)
472 - break;
473 - if (parties > ushortMask || unarrived > ushortMask)
474 - throw new IllegalStateException(badBounds(parties, unarrived));
475 - if (phase == phaseOf(root.state) &&
476 - casState(s, stateFor(phase, parties, unarrived)))
477 - break;
478 - }
479 - return phase;
480 - }
481 -
482 - /**
483 - * Arrives at the barrier, but does not wait for others. (You can
484 - * in turn wait for others via {@link #awaitAdvance}). It is an
485 - * unenforced usage error for an unregistered party to invoke this
486 - * method.
567 + * Arrives at this phaser, without waiting for others to arrive.
487 568 *
569 + * <p>It is a usage error for an unregistered party to invoke this
570 + * method. However, this error may result in an {@code
571 + * IllegalStateException} only upon some subsequent operation on
572 + * this phaser, if ever.
573 + *
488 574 * @return the arrival phase number, or a negative value if terminated
489 575 * @throws IllegalStateException if not terminated and the number
490 576 * of unarrived parties would become negative
491 577 */
492 578 public int arrive() {
493 - int phase;
494 - for (;;) {
495 - long s = state;
496 - phase = phaseOf(s);
497 - if (phase < 0)
498 - break;
499 - int parties = partiesOf(s);
500 - int unarrived = unarrivedOf(s) - 1;
501 - if (unarrived > 0) { // Not the last arrival
502 - if (casState(s, s - 1)) // s-1 adds one arrival
503 - break;
504 - }
505 - else if (unarrived == 0) { // the last arrival
506 - Phaser par = parent;
507 - if (par == null) { // directly trip
508 - if (casState
509 - (s,
510 - trippedStateFor(onAdvance(phase, parties) ? -1 :
511 - ((phase + 1) & phaseMask), parties))) {
512 - releaseWaiters(phase);
513 - break;
514 - }
515 - }
516 - else { // cascade to parent
517 - if (casState(s, s - 1)) { // zeroes unarrived
518 - par.arrive();
519 - reconcileState();
520 - break;
521 - }
522 - }
523 - }
524 - else if (phase != phaseOf(root.state)) // or if unreconciled
525 - reconcileState();
526 - else
527 - throw new IllegalStateException(badBounds(parties, unarrived));
528 - }
529 - return phase;
579 + return doArrive(ONE_ARRIVAL);
530 580 }
531 581
532 582 /**
533 - * Arrives at the barrier and deregisters from it without waiting
534 - * for others. Deregistration reduces the number of parties
535 - * required to trip the barrier in future phases. If this phaser
583 + * Arrives at this phaser and deregisters from it without waiting
584 + * for others to arrive. Deregistration reduces the number of
585 + * parties required to advance in future phases. If this phaser
536 586 * has a parent, and deregistration causes this phaser to have
537 - * zero parties, this phaser also arrives at and is deregistered
538 - * from its parent. It is an unenforced usage error for an
539 - * unregistered party to invoke this method.
587 + * zero parties, this phaser is also deregistered from its parent.
540 588 *
589 + * <p>It is a usage error for an unregistered party to invoke this
590 + * method. However, this error may result in an {@code
591 + * IllegalStateException} only upon some subsequent operation on
592 + * this phaser, if ever.
593 + *
541 594 * @return the arrival phase number, or a negative value if terminated
542 595 * @throws IllegalStateException if not terminated and the number
543 596 * of registered or unarrived parties would become negative
544 597 */
545 598 public int arriveAndDeregister() {
546 - // similar code to arrive, but too different to merge
547 - Phaser par = parent;
548 - int phase;
549 - for (;;) {
550 - long s = state;
551 - phase = phaseOf(s);
552 - if (phase < 0)
553 - break;
554 - int parties = partiesOf(s) - 1;
555 - int unarrived = unarrivedOf(s) - 1;
556 - if (parties >= 0) {
557 - if (unarrived > 0 || (unarrived == 0 && par != null)) {
558 - if (casState
559 - (s,
560 - stateFor(phase, parties, unarrived))) {
561 - if (unarrived == 0) {
562 - par.arriveAndDeregister();
563 - reconcileState();
564 - }
565 - break;
566 - }
567 - continue;
568 - }
569 - if (unarrived == 0) {
570 - if (casState
571 - (s,
572 - trippedStateFor(onAdvance(phase, parties) ? -1 :
573 - ((phase + 1) & phaseMask), parties))) {
574 - releaseWaiters(phase);
575 - break;
576 - }
577 - continue;
578 - }
579 - if (par != null && phase != phaseOf(root.state)) {
580 - reconcileState();
581 - continue;
582 - }
583 - }
584 - throw new IllegalStateException(badBounds(parties, unarrived));
585 - }
586 - return phase;
599 + return doArrive(ONE_ARRIVAL|ONE_PARTY);
587 600 }
588 601
589 602 /**
590 - * Arrives at the barrier and awaits others. Equivalent in effect
603 + * Arrives at this phaser and awaits others. Equivalent in effect
591 604 * to {@code awaitAdvance(arrive())}. If you need to await with
592 605 * interruption or timeout, you can arrange this with an analogous
593 - * construction using one of the other forms of the awaitAdvance
594 - * method. If instead you need to deregister upon arrival use
595 - * {@code arriveAndDeregister}. It is an unenforced usage error
596 - * for an unregistered party to invoke this method.
606 + * construction using one of the other forms of the {@code
607 + * awaitAdvance} method. If instead you need to deregister upon
608 + * arrival, use {@code awaitAdvance(arriveAndDeregister())}.
597 609 *
610 + * <p>It is a usage error for an unregistered party to invoke this
611 + * method. However, this error may result in an {@code
612 + * IllegalStateException} only upon some subsequent operation on
613 + * this phaser, if ever.
614 + *
598 615 * @return the arrival phase number, or a negative number if terminated
599 616 * @throws IllegalStateException if not terminated and the number
600 617 * of unarrived parties would become negative
601 618 */
602 619 public int arriveAndAwaitAdvance() {
603 - return awaitAdvance(arrive());
620 + return awaitAdvance(doArrive(ONE_ARRIVAL));
604 621 }
605 622
606 623 /**
607 - * Awaits the phase of the barrier to advance from the given phase
608 - * value, returning immediately if the current phase of the
609 - * barrier is not equal to the given phase value or this barrier
610 - * is terminated. It is an unenforced usage error for an
611 - * unregistered party to invoke this method.
624 + * Awaits the phase of this phaser to advance from the given phase
625 + * value, returning immediately if the current phase is not equal
626 + * to the given phase value or this phaser is terminated.
612 627 *
613 628 * @param phase an arrival phase number, or negative value if
614 629 * terminated; this argument is normally the value returned by a
615 - * previous call to {@code arrive} or its variants
630 + * previous call to {@code arrive} or {@code arriveAndDeregister}.
616 631 * @return the next arrival phase number, or a negative value
617 632 * if terminated or argument is negative
618 633 */
619 634 public int awaitAdvance(int phase) {
635 + Phaser rt;
636 + int p = (int)(state >>> PHASE_SHIFT);
620 637 if (phase < 0)
621 638 return phase;
622 - long s = getReconciledState();
623 - int p = phaseOf(s);
624 - if (p != phase)
625 - return p;
626 - if (unarrivedOf(s) == 0 && parent != null)
627 - parent.awaitAdvance(phase);
628 - // Fall here even if parent waited, to reconcile and help release
629 - return untimedWait(phase);
639 + if (p == phase &&
640 + (p = (int)((rt = root).state >>> PHASE_SHIFT)) == phase)
641 + return rt.internalAwaitAdvance(phase, null);
642 + return p;
630 643 }
631 644
632 645 /**
633 - * Awaits the phase of the barrier to advance from the given phase
646 + * Awaits the phase of this phaser to advance from the given phase
634 647 * value, throwing {@code InterruptedException} if interrupted
635 - * while waiting, or returning immediately if the current phase of
636 - * the barrier is not equal to the given phase value or this
637 - * barrier is terminated. It is an unenforced usage error for an
638 - * unregistered party to invoke this method.
648 + * while waiting, or returning immediately if the current phase is
649 + * not equal to the given phase value or this phaser is
650 + * terminated.
639 651 *
640 652 * @param phase an arrival phase number, or negative value if
641 653 * terminated; this argument is normally the value returned by a
642 - * previous call to {@code arrive} or its variants
654 + * previous call to {@code arrive} or {@code arriveAndDeregister}.
643 655 * @return the next arrival phase number, or a negative value
644 656 * if terminated or argument is negative
645 657 * @throws InterruptedException if thread interrupted while waiting
646 658 */
647 659 public int awaitAdvanceInterruptibly(int phase)
648 660 throws InterruptedException {
661 + Phaser rt;
662 + int p = (int)(state >>> PHASE_SHIFT);
649 663 if (phase < 0)
650 664 return phase;
651 - long s = getReconciledState();
652 - int p = phaseOf(s);
653 - if (p != phase)
654 - return p;
655 - if (unarrivedOf(s) == 0 && parent != null)
656 - parent.awaitAdvanceInterruptibly(phase);
657 - return interruptibleWait(phase);
665 + if (p == phase &&
666 + (p = (int)((rt = root).state >>> PHASE_SHIFT)) == phase) {
667 + QNode node = new QNode(this, phase, true, false, 0L);
668 + p = rt.internalAwaitAdvance(phase, node);
669 + if (node.wasInterrupted)
670 + throw new InterruptedException();
671 + }
672 + return p;
658 673 }
659 674
660 675 /**
661 - * Awaits the phase of the barrier to advance from the given phase
676 + * Awaits the phase of this phaser to advance from the given phase
662 677 * value or the given timeout to elapse, throwing {@code
663 678 * InterruptedException} if interrupted while waiting, or
664 - * returning immediately if the current phase of the barrier is
665 - * not equal to the given phase value or this barrier is
666 - * terminated. It is an unenforced usage error for an
667 - * unregistered party to invoke this method.
679 + * returning immediately if the current phase is not equal to the
680 + * given phase value or this phaser is terminated.
668 681 *
669 682 * @param phase an arrival phase number, or negative value if
670 683 * terminated; this argument is normally the value returned by a
671 - * previous call to {@code arrive} or its variants
684 + * previous call to {@code arrive} or {@code arriveAndDeregister}.
672 685 * @param timeout how long to wait before giving up, in units of
673 686 * {@code unit}
674 687 * @param unit a {@code TimeUnit} determining how to interpret the
675 688 * {@code timeout} parameter
676 689 * @return the next arrival phase number, or a negative value
677 690 * if terminated or argument is negative
678 691 * @throws InterruptedException if thread interrupted while waiting
679 692 * @throws TimeoutException if timed out while waiting
680 693 */
681 694 public int awaitAdvanceInterruptibly(int phase,
682 695 long timeout, TimeUnit unit)
683 696 throws InterruptedException, TimeoutException {
697 + long nanos = unit.toNanos(timeout);
698 + Phaser rt;
699 + int p = (int)(state >>> PHASE_SHIFT);
684 700 if (phase < 0)
685 701 return phase;
686 - long s = getReconciledState();
687 - int p = phaseOf(s);
688 - if (p != phase)
689 - return p;
690 - if (unarrivedOf(s) == 0 && parent != null)
691 - parent.awaitAdvanceInterruptibly(phase, timeout, unit);
692 - return timedWait(phase, unit.toNanos(timeout));
702 + if (p == phase &&
703 + (p = (int)((rt = root).state >>> PHASE_SHIFT)) == phase) {
704 + QNode node = new QNode(this, phase, true, true, nanos);
705 + p = rt.internalAwaitAdvance(phase, node);
706 + if (node.wasInterrupted)
707 + throw new InterruptedException();
708 + else if (p == phase)
709 + throw new TimeoutException();
710 + }
711 + return p;
693 712 }
694 713
695 714 /**
696 - * Forces this barrier to enter termination state. Counts of
697 - * arrived and registered parties are unaffected. If this phaser
698 - * has a parent, it too is terminated. This method may be useful
699 - * for coordinating recovery after one or more tasks encounter
700 - * unexpected exceptions.
715 + * Forces this phaser to enter termination state. Counts of
716 + * arrived and registered parties are unaffected. If this phaser
717 + * is a member of a tiered set of phasers, then all of the phasers
718 + * in the set are terminated. If this phaser is already
719 + * terminated, this method has no effect. This method may be
720 + * useful for coordinating recovery after one or more tasks
721 + * encounter unexpected exceptions.
701 722 */
702 723 public void forceTermination() {
703 - for (;;) {
704 - long s = getReconciledState();
705 - int phase = phaseOf(s);
706 - int parties = partiesOf(s);
707 - int unarrived = unarrivedOf(s);
708 - if (phase < 0 ||
709 - casState(s, stateFor(-1, parties, unarrived))) {
710 - releaseWaiters(0);
724 + // Only need to change root state
725 + final Phaser root = this.root;
726 + long s;
727 + while ((s = root.state) >= 0) {
728 + if (UNSAFE.compareAndSwapLong(root, stateOffset,
729 + s, s | TERMINATION_BIT)) {
730 + releaseWaiters(0); // signal all threads
711 731 releaseWaiters(1);
712 - if (parent != null)
713 - parent.forceTermination();
714 732 return;
715 733 }
716 734 }
717 735 }
718 736
719 737 /**
720 738 * Returns the current phase number. The maximum phase number is
721 739 * {@code Integer.MAX_VALUE}, after which it restarts at
722 - * zero. Upon termination, the phase number is negative.
740 + * zero. Upon termination, the phase number is negative,
741 + * in which case the prevailing phase prior to termination
742 + * may be obtained via {@code getPhase() + Integer.MIN_VALUE}.
723 743 *
724 744 * @return the phase number, or a negative value if terminated
725 745 */
726 746 public final int getPhase() {
727 - return phaseOf(getReconciledState());
747 + return (int)(root.state >>> PHASE_SHIFT);
728 748 }
729 749
730 750 /**
731 - * Returns the number of parties registered at this barrier.
751 + * Returns the number of parties registered at this phaser.
732 752 *
733 753 * @return the number of parties
734 754 */
735 755 public int getRegisteredParties() {
736 756 return partiesOf(state);
737 757 }
738 758
739 759 /**
740 760 * Returns the number of registered parties that have arrived at
741 - * the current phase of this barrier.
761 + * the current phase of this phaser.
742 762 *
743 763 * @return the number of arrived parties
744 764 */
745 765 public int getArrivedParties() {
746 - return arrivedOf(state);
766 + long s = state;
767 + int u = unarrivedOf(s); // only reconcile if possibly needed
768 + return (u != 0 || parent == null) ?
769 + partiesOf(s) - u :
770 + arrivedOf(reconcileState());
747 771 }
748 772
749 773 /**
750 774 * Returns the number of registered parties that have not yet
751 - * arrived at the current phase of this barrier.
775 + * arrived at the current phase of this phaser.
752 776 *
753 777 * @return the number of unarrived parties
754 778 */
755 779 public int getUnarrivedParties() {
756 - return unarrivedOf(state);
780 + int u = unarrivedOf(state);
781 + return (u != 0 || parent == null) ? u : unarrivedOf(reconcileState());
757 782 }
758 783
759 784 /**
760 785 * Returns the parent of this phaser, or {@code null} if none.
761 786 *
762 787 * @return the parent of this phaser, or {@code null} if none
763 788 */
764 789 public Phaser getParent() {
765 790 return parent;
766 791 }
767 792
768 793 /**
↓ open down ↓ |
2 lines elided |
↑ open up ↑ |
769 794 * Returns the root ancestor of this phaser, which is the same as
770 795 * this phaser if it has no parent.
771 796 *
772 797 * @return the root ancestor of this phaser
773 798 */
774 799 public Phaser getRoot() {
775 800 return root;
776 801 }
777 802
778 803 /**
779 - * Returns {@code true} if this barrier has been terminated.
804 + * Returns {@code true} if this phaser has been terminated.
780 805 *
781 - * @return {@code true} if this barrier has been terminated
806 + * @return {@code true} if this phaser has been terminated
782 807 */
783 808 public boolean isTerminated() {
784 - return getPhase() < 0;
809 + return root.state < 0L;
785 810 }
786 811
787 812 /**
788 813 * Overridable method to perform an action upon impending phase
789 814 * advance, and to control termination. This method is invoked
790 - * upon arrival of the party tripping the barrier (when all other
815 + * upon arrival of the party advancing this phaser (when all other
791 816 * waiting parties are dormant). If this method returns {@code
792 - * true}, then, rather than advance the phase number, this barrier
817 + * true}, then, rather than advance the phase number, this phaser
793 818 * will be set to a final termination state, and subsequent calls
794 819 * to {@link #isTerminated} will return true. Any (unchecked)
795 820 * Exception or Error thrown by an invocation of this method is
796 - * propagated to the party attempting to trip the barrier, in
821 + * propagated to the party attempting to advance this phaser, in
797 822 * which case no advance occurs.
798 823 *
799 824 * <p>The arguments to this method provide the state of the phaser
800 - * prevailing for the current transition. (When called from within
801 - * an implementation of {@code onAdvance} the values returned by
802 - * methods such as {@code getPhase} may or may not reliably
803 - * indicate the state to which this transition applies.)
825 + * prevailing for the current transition. The effects of invoking
826 + * arrival, registration, and waiting methods on this phaser from
827 + * within {@code onAdvance} are unspecified and should not be
828 + * relied on.
804 829 *
805 - * <p>The default version returns {@code true} when the number of
806 - * registered parties is zero. Normally, overrides that arrange
807 - * termination for other reasons should also preserve this
808 - * property.
830 + * <p>If this phaser is a member of a tiered set of phasers, then
831 + * {@code onAdvance} is invoked only for its root phaser on each
832 + * advance.
809 833 *
810 - * <p>You may override this method to perform an action with side
811 - * effects visible to participating tasks, but it is only sensible
812 - * to do so in designs where all parties register before any
813 - * arrive, and all {@link #awaitAdvance} at each phase.
814 - * Otherwise, you cannot ensure lack of interference from other
815 - * parties during the invocation of this method. Additionally,
816 - * method {@code onAdvance} may be invoked more than once per
817 - * transition if registrations are intermixed with arrivals.
834 + * <p>To support the most common use cases, the default
835 + * implementation of this method returns {@code true} when the
836 + * number of registered parties has become zero as the result of a
837 + * party invoking {@code arriveAndDeregister}. You can disable
838 + * this behavior, thus enabling continuation upon future
839 + * registrations, by overriding this method to always return
840 + * {@code false}:
818 841 *
819 - * @param phase the phase number on entering the barrier
842 + * <pre> {@code
843 + * Phaser phaser = new Phaser() {
844 + * protected boolean onAdvance(int phase, int parties) { return false; }
845 + * }}</pre>
846 + *
847 + * @param phase the current phase number on entry to this method,
848 + * before this phaser is advanced
820 849 * @param registeredParties the current number of registered parties
821 - * @return {@code true} if this barrier should terminate
850 + * @return {@code true} if this phaser should terminate
822 851 */
823 852 protected boolean onAdvance(int phase, int registeredParties) {
824 - return registeredParties <= 0;
853 + return registeredParties == 0;
825 854 }
826 855
827 856 /**
828 857 * Returns a string identifying this phaser, as well as its
829 858 * state. The state, in brackets, includes the String {@code
830 859 * "phase = "} followed by the phase number, {@code "parties = "}
831 860 * followed by the number of registered parties, and {@code
832 861 * "arrived = "} followed by the number of arrived parties.
833 862 *
834 - * @return a string identifying this barrier, as well as its state
863 + * @return a string identifying this phaser, as well as its state
835 864 */
836 865 public String toString() {
837 - long s = getReconciledState();
866 + return stateToString(reconcileState());
867 + }
868 +
869 + /**
870 + * Implementation of toString and string-based error messages
871 + */
872 + private String stateToString(long s) {
838 873 return super.toString() +
839 874 "[phase = " + phaseOf(s) +
840 875 " parties = " + partiesOf(s) +
841 876 " arrived = " + arrivedOf(s) + "]";
842 877 }
843 878
844 - // methods for waiting
879 + // Waiting mechanics
845 880
846 881 /**
847 - * Wait nodes for Treiber stack representing wait queue
882 + * Removes and signals threads from queue for phase.
848 883 */
849 - static final class QNode implements ForkJoinPool.ManagedBlocker {
850 - final Phaser phaser;
851 - final int phase;
852 - final long startTime;
853 - final long nanos;
854 - final boolean timed;
855 - final boolean interruptible;
856 - volatile boolean wasInterrupted = false;
857 - volatile Thread thread; // nulled to cancel wait
858 - QNode next;
859 - QNode(Phaser phaser, int phase, boolean interruptible,
860 - boolean timed, long startTime, long nanos) {
861 - this.phaser = phaser;
862 - this.phase = phase;
863 - this.timed = timed;
864 - this.interruptible = interruptible;
865 - this.startTime = startTime;
866 - this.nanos = nanos;
867 - thread = Thread.currentThread();
868 - }
869 - public boolean isReleasable() {
870 - return (thread == null ||
871 - phaser.getPhase() != phase ||
872 - (interruptible && wasInterrupted) ||
873 - (timed && (nanos - (System.nanoTime() - startTime)) <= 0));
874 - }
875 - public boolean block() {
876 - if (Thread.interrupted()) {
877 - wasInterrupted = true;
878 - if (interruptible)
879 - return true;
880 - }
881 - if (!timed)
882 - LockSupport.park(this);
883 - else {
884 - long waitTime = nanos - (System.nanoTime() - startTime);
885 - if (waitTime <= 0)
886 - return true;
887 - LockSupport.parkNanos(this, waitTime);
888 - }
889 - return isReleasable();
890 - }
891 - void signal() {
892 - Thread t = thread;
893 - if (t != null) {
894 - thread = null;
884 + private void releaseWaiters(int phase) {
885 + QNode q; // first element of queue
886 + int p; // its phase
887 + Thread t; // its thread
888 + AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
889 + while ((q = head.get()) != null &&
890 + ((p = q.phase) == phase ||
891 + (int)(root.state >>> PHASE_SHIFT) != p)) {
892 + if (head.compareAndSet(q, q.next) &&
893 + (t = q.thread) != null) {
894 + q.thread = null;
895 895 LockSupport.unpark(t);
896 896 }
897 897 }
898 - boolean doWait() {
899 - if (thread != null) {
900 - try {
901 - ForkJoinPool.managedBlock(this);
902 - } catch (InterruptedException ie) {
903 - }
904 - }
905 - return wasInterrupted;
906 - }
907 -
908 898 }
909 899
910 - /**
911 - * Removes and signals waiting threads from wait queue.
912 - */
913 - private void releaseWaiters(int phase) {
914 - AtomicReference<QNode> head = queueFor(phase);
915 - QNode q;
916 - while ((q = head.get()) != null) {
917 - if (head.compareAndSet(q, q.next))
918 - q.signal();
919 - }
920 - }
900 + /** The number of CPUs, for spin control */
901 + private static final int NCPU = Runtime.getRuntime().availableProcessors();
921 902
922 903 /**
923 - * Tries to enqueue given node in the appropriate wait queue.
924 - *
925 - * @return true if successful
904 + * The number of times to spin before blocking while waiting for
905 + * advance, per arrival while waiting. On multiprocessors, fully
906 + * blocking and waking up a large number of threads all at once is
907 + * usually a very slow process, so we use rechargeable spins to
908 + * avoid it when threads regularly arrive: When a thread in
909 + * internalAwaitAdvance notices another arrival before blocking,
910 + * and there appear to be enough CPUs available, it spins
911 + * SPINS_PER_ARRIVAL more times before blocking. The value trades
912 + * off good-citizenship vs big unnecessary slowdowns.
926 913 */
927 - private boolean tryEnqueue(QNode node) {
928 - AtomicReference<QNode> head = queueFor(node.phase);
929 - return head.compareAndSet(node.next = head.get(), node);
930 - }
914 + static final int SPINS_PER_ARRIVAL = (NCPU < 2) ? 1 : 1 << 8;
931 915
932 916 /**
933 - * Enqueues node and waits unless aborted or signalled.
917 + * Possibly blocks and waits for phase to advance unless aborted.
918 + * Call only from root node.
934 919 *
920 + * @param phase current phase
921 + * @param node if non-null, the wait node to track interrupt and timeout;
922 + * if null, denotes noninterruptible wait
935 923 * @return current phase
936 924 */
937 - private int untimedWait(int phase) {
938 - QNode node = null;
939 - boolean queued = false;
940 - boolean interrupted = false;
925 + private int internalAwaitAdvance(int phase, QNode node) {
926 + releaseWaiters(phase-1); // ensure old queue clean
927 + boolean queued = false; // true when node is enqueued
928 + int lastUnarrived = 0; // to increase spins upon change
929 + int spins = SPINS_PER_ARRIVAL;
930 + long s;
941 931 int p;
942 - while ((p = getPhase()) == phase) {
943 - if (Thread.interrupted())
944 - interrupted = true;
945 - else if (node == null)
946 - node = new QNode(this, phase, false, false, 0, 0);
947 - else if (!queued)
948 - queued = tryEnqueue(node);
949 - else
950 - interrupted = node.doWait();
932 + while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
933 + if (node == null) { // spinning in noninterruptible mode
934 + int unarrived = (int)s & UNARRIVED_MASK;
935 + if (unarrived != lastUnarrived &&
936 + (lastUnarrived = unarrived) < NCPU)
937 + spins += SPINS_PER_ARRIVAL;
938 + boolean interrupted = Thread.interrupted();
939 + if (interrupted || --spins < 0) { // need node to record intr
940 + node = new QNode(this, phase, false, false, 0L);
941 + node.wasInterrupted = interrupted;
942 + }
943 + }
944 + else if (node.isReleasable()) // done or aborted
945 + break;
946 + else if (!queued) { // push onto queue
947 + AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
948 + QNode q = node.next = head.get();
949 + if ((q == null || q.phase == phase) &&
950 + (int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
951 + queued = head.compareAndSet(q, node);
952 + }
953 + else {
954 + try {
955 + ForkJoinPool.managedBlock(node);
956 + } catch (InterruptedException ie) {
957 + node.wasInterrupted = true;
958 + }
959 + }
951 960 }
952 - if (node != null)
953 - node.thread = null;
961 +
962 + if (node != null) {
963 + if (node.thread != null)
964 + node.thread = null; // avoid need for unpark()
965 + if (node.wasInterrupted && !node.interruptible)
966 + Thread.currentThread().interrupt();
967 + if ((p = (int)(state >>> PHASE_SHIFT)) == phase)
968 + return p; // recheck abort
969 + }
954 970 releaseWaiters(phase);
955 - if (interrupted)
956 - Thread.currentThread().interrupt();
957 971 return p;
958 972 }
959 973
960 974 /**
961 - * Interruptible version
962 - * @return current phase
975 + * Wait nodes for Treiber stack representing wait queue
963 976 */
964 - private int interruptibleWait(int phase) throws InterruptedException {
965 - QNode node = null;
966 - boolean queued = false;
967 - boolean interrupted = false;
968 - int p;
969 - while ((p = getPhase()) == phase && !interrupted) {
970 - if (Thread.interrupted())
971 - interrupted = true;
972 - else if (node == null)
973 - node = new QNode(this, phase, true, false, 0, 0);
974 - else if (!queued)
975 - queued = tryEnqueue(node);
976 - else
977 - interrupted = node.doWait();
977 + static final class QNode implements ForkJoinPool.ManagedBlocker {
978 + final Phaser phaser;
979 + final int phase;
980 + final boolean interruptible;
981 + final boolean timed;
982 + boolean wasInterrupted;
983 + long nanos;
984 + long lastTime;
985 + volatile Thread thread; // nulled to cancel wait
986 + QNode next;
987 +
988 + QNode(Phaser phaser, int phase, boolean interruptible,
989 + boolean timed, long nanos) {
990 + this.phaser = phaser;
991 + this.phase = phase;
992 + this.interruptible = interruptible;
993 + this.nanos = nanos;
994 + this.timed = timed;
995 + this.lastTime = timed ? System.nanoTime() : 0L;
996 + thread = Thread.currentThread();
978 997 }
979 - if (node != null)
980 - node.thread = null;
981 - if (p != phase || (p = getPhase()) != phase)
982 - releaseWaiters(phase);
983 - if (interrupted)
984 - throw new InterruptedException();
985 - return p;
986 - }
987 998
988 - /**
989 - * Timeout version.
990 - * @return current phase
991 - */
992 - private int timedWait(int phase, long nanos)
993 - throws InterruptedException, TimeoutException {
994 - long startTime = System.nanoTime();
995 - QNode node = null;
996 - boolean queued = false;
997 - boolean interrupted = false;
998 - int p;
999 - while ((p = getPhase()) == phase && !interrupted) {
999 + public boolean isReleasable() {
1000 + if (thread == null)
1001 + return true;
1002 + if (phaser.getPhase() != phase) {
1003 + thread = null;
1004 + return true;
1005 + }
1000 1006 if (Thread.interrupted())
1001 - interrupted = true;
1002 - else if (nanos - (System.nanoTime() - startTime) <= 0)
1003 - break;
1004 - else if (node == null)
1005 - node = new QNode(this, phase, true, true, startTime, nanos);
1006 - else if (!queued)
1007 - queued = tryEnqueue(node);
1008 - else
1009 - interrupted = node.doWait();
1007 + wasInterrupted = true;
1008 + if (wasInterrupted && interruptible) {
1009 + thread = null;
1010 + return true;
1011 + }
1012 + if (timed) {
1013 + if (nanos > 0L) {
1014 + long now = System.nanoTime();
1015 + nanos -= now - lastTime;
1016 + lastTime = now;
1017 + }
1018 + if (nanos <= 0L) {
1019 + thread = null;
1020 + return true;
1021 + }
1022 + }
1023 + return false;
1010 1024 }
1011 - if (node != null)
1012 - node.thread = null;
1013 - if (p != phase || (p = getPhase()) != phase)
1014 - releaseWaiters(phase);
1015 - if (interrupted)
1016 - throw new InterruptedException();
1017 - if (p == phase)
1018 - throw new TimeoutException();
1019 - return p;
1025 +
1026 + public boolean block() {
1027 + if (isReleasable())
1028 + return true;
1029 + else if (!timed)
1030 + LockSupport.park(this);
1031 + else if (nanos > 0)
1032 + LockSupport.parkNanos(this, nanos);
1033 + return isReleasable();
1034 + }
1020 1035 }
1021 1036
1022 1037 // Unsafe mechanics
1023 1038
1024 1039 private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
1025 1040 private static final long stateOffset =
1026 1041 objectFieldOffset("state", Phaser.class);
1027 1042
1028 - private final boolean casState(long cmp, long val) {
1029 - return UNSAFE.compareAndSwapLong(this, stateOffset, cmp, val);
1030 - }
1031 -
1032 1043 private static long objectFieldOffset(String field, Class<?> klazz) {
1033 1044 try {
1034 1045 return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
1035 1046 } catch (NoSuchFieldException e) {
1036 1047 // Convert Exception to corresponding Error
1037 1048 NoSuchFieldError error = new NoSuchFieldError(field);
1038 1049 error.initCause(e);
1039 1050 throw error;
1040 1051 }
1041 1052 }
1042 1053 }
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX