--- old/src/share/classes/java/util/concurrent/ForkJoinWorkerThread.java
+++ new/src/share/classes/java/util/concurrent/ForkJoinWorkerThread.java
1 1 /*
2 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 3 *
4 4 * This code is free software; you can redistribute it and/or modify it
5 5 * under the terms of the GNU General Public License version 2 only, as
6 6 * published by the Free Software Foundation. Oracle designates this
7 7 * particular file as subject to the "Classpath" exception as provided
8 8 * by Oracle in the LICENSE file that accompanied this code.
9 9 *
10 10 * This code is distributed in the hope that it will be useful, but WITHOUT
11 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 13 * version 2 for more details (a copy is included in the LICENSE file that
14 14 * accompanied this code).
15 15 *
16 16 * You should have received a copy of the GNU General Public License version
17 17 * 2 along with this work; if not, write to the Free Software Foundation,
18 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 19 *
20 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 21 * or visit www.oracle.com if you need additional information or have any
22 22 * questions.
23 23 */
24 24
25 25 /*
26 26 * This file is available under and governed by the GNU General Public
27 27 * License version 2 only, as published by the Free Software Foundation.
28 28 * However, the following notice accompanied the original version of this
29 29 * file:
30 30 *
[ 30 lines elided ]
31 31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 32 * Expert Group and released to the public domain, as explained at
33 33 * http://creativecommons.org/licenses/publicdomain
34 34 */
35 35
36 36 package java.util.concurrent;
37 37
38 38 import java.util.Random;
39 39 import java.util.Collection;
40 40 import java.util.concurrent.locks.LockSupport;
41 +import java.util.concurrent.RejectedExecutionException;
41 42
42 43 /**
43 - * A thread managed by a {@link ForkJoinPool}. This class is
44 - * subclassable solely for the sake of adding functionality -- there
45 - * are no overridable methods dealing with scheduling or execution.
46 - * However, you can override initialization and termination methods
47 - * surrounding the main task processing loop. If you do create such a
48 - * subclass, you will also need to supply a custom {@link
49 - * ForkJoinPool.ForkJoinWorkerThreadFactory} to use it in a {@code
50 - * ForkJoinPool}.
44 + * A thread managed by a {@link ForkJoinPool}, which executes
45 + * {@link ForkJoinTask}s.
46 + * This class is subclassable solely for the sake of adding
47 + * functionality -- there are no overridable methods dealing with
48 + * scheduling or execution. However, you can override initialization
49 + * and termination methods surrounding the main task processing loop.
50 + * If you do create such a subclass, you will also need to supply a
51 + * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to use it
52 + * in a {@code ForkJoinPool}.
51 53 *
52 54 * @since 1.7
53 55 * @author Doug Lea
54 56 */
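As context for reviewers, a minimal sketch of the subclassing pattern this javadoc describes, built only on the public factory API; the class name LoggingWorkerThread, the println bodies, and the newPool helper are hypothetical:

    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.ForkJoinWorkerThread;

    // Hypothetical subclass hooking the initialization and termination
    // methods that surround the main task processing loop.
    class LoggingWorkerThread extends ForkJoinWorkerThread {
        protected LoggingWorkerThread(ForkJoinPool pool) {
            super(pool);
        }
        @Override protected void onStart() {
            super.onStart();                     // must be invoked first
            System.out.println(getName() + " starting");
        }
        @Override protected void onTermination(Throwable exception) {
            System.out.println(getName() + " terminating");
            super.onTermination(exception);      // must be invoked last
        }

        // The custom ForkJoinWorkerThreadFactory required to use the
        // subclass in a ForkJoinPool.
        static ForkJoinPool newPool(int parallelism) {
            return new ForkJoinPool(
                parallelism,
                new ForkJoinPool.ForkJoinWorkerThreadFactory() {
                    public ForkJoinWorkerThread newThread(ForkJoinPool p) {
                        return new LoggingWorkerThread(p);
                    }
                },
                null,    // default UncaughtExceptionHandler
                false);  // LIFO (non-async) mode
        }
    }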
55 57 public class ForkJoinWorkerThread extends Thread {
56 58 /*
57 59 * Overview:
58 60 *
59 61 * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
60 62 * ForkJoinTasks. This class includes bookkeeping in support of
61 63 * worker activation, suspension, and lifecycle control described
62 64 * in more detail in the internal documentation of class
63 65 * ForkJoinPool. And as described further below, this class also
64 66 * includes special-cased support for some ForkJoinTask
65 67 * methods. But the main mechanics involve work-stealing:
66 68 *
67 69 * Work-stealing queues are special forms of Deques that support
68 70 * only three of the four possible end-operations -- push, pop,
69 71 * and deq (aka steal), under the further constraints that push
70 72 * and pop are called only from the owning thread, while deq may
71 73 * be called from other threads. (If you are unfamiliar with
72 74 * them, you probably want to read Herlihy and Shavit's book "The
73 75 * Art of Multiprocessor Programming", chapter 16 describing these
74 76 * in more detail before proceeding.) The main work-stealing
75 77 * queue design is roughly similar to those in the papers "Dynamic
76 78 * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
77 79 * (http://research.sun.com/scalable/pubs/index.html) and
78 80 * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
79 81 * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
80 82 * The main differences ultimately stem from gc requirements that
81 83 * we null out taken slots as soon as we can, to maintain as small
82 84 * a footprint as possible even in programs generating huge
83 85 * numbers of tasks. To accomplish this, we shift the CAS
84 86 * arbitrating pop vs deq (steal) from being on the indices
85 87 * ("base" and "sp") to the slots themselves (mainly via method
86 88 * "casSlotNull()"). So, both a successful pop and deq mainly
87 89 * entail a CAS of a slot from non-null to null. Because we rely
88 90 * on CASes of references, we do not need tag bits on base or sp.
89 91 * They are simple ints as used in any circular array-based queue
90 92 * (see for example ArrayDeque). Updates to the indices must
91 93 * still be ordered in a way that guarantees that sp == base means
92 94 * the queue is empty, but otherwise may err on the side of
93 95 * possibly making the queue appear nonempty when a push, pop, or
94 96 * deq has not fully committed. Note that this means that the deq
95 97 * operation, considered individually, is not wait-free. One thief
96 98 * cannot successfully continue until another in-progress one (or,
97 99 * if previously empty, a push) completes. However, in the
98 100 * aggregate, we ensure at least probabilistic non-blockingness.
99 101 * If an attempted steal fails, a thief always chooses a different
100 102 * random victim target to try next. So, in order for one thief to
101 103 * progress, it suffices for any in-progress deq or new push on
102 104 * any empty queue to complete. One reason this works well here is
103 105 * that apparently-nonempty often means soon-to-be-stealable,
104 106 * which gives threads a chance to set activation status if
105 107 * necessary before stealing.
106 108 *
107 109 * This approach also enables support for "async mode" where local
108 110 * task processing is in FIFO, not LIFO order; simply by using a
109 111 * version of deq rather than pop when locallyFifo is true (as set
110 112 * by the ForkJoinPool). This allows use in message-passing
111 113 * frameworks in which tasks are never joined.
112 114 *
113 115 * When a worker would otherwise be blocked waiting to join a
114 116 * task, it first tries a form of linear helping: Each worker
115 117 * records (in field currentSteal) the most recent task it stole
116 118 * from some other worker. Plus, it records (in field currentJoin)
117 119 * the task it is currently actively joining. Method joinTask uses
118 120 * these markers to try to find a worker to help (i.e., steal back
119 121 * a task from and execute it) that could hasten completion of the
120 122 * actively joined task. In essence, the joiner executes a task
121 123 * that would be on its own local deque had the to-be-joined task
122 124 * not been stolen. This may be seen as a conservative variant of
123 125 * the approach in Wagner & Calder "Leapfrogging: a portable
124 126 * technique for implementing efficient futures" SIGPLAN Notices,
125 127 * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
126 128 * in that: (1) We only maintain dependency links across workers
127 129 * upon steals, rather than use per-task bookkeeping. This may
128 130 * require a linear scan of workers array to locate stealers, but
129 131 * usually doesn't because stealers leave hints (that may become
130 132 * stale/wrong) of where to locate them. This isolates cost to
131 133 * when it is needed, rather than adding to per-task overhead.
132 134 * (2) It is "shallow", ignoring nesting and potentially cyclic
133 135 * mutual steals. (3) It is intentionally racy: field currentJoin
134 136 * is updated only while actively joining, which means that we
135 137 * miss links in the chain during long-lived tasks, GC stalls etc
136 138 * (which is OK since blocking in such cases is usually a good
137 139 * idea). (4) We bound the number of attempts to find work (see
138 140 * MAX_HELP_DEPTH) and fall back to suspending the worker and if
139 141 * necessary replacing it with a spare (see
140 142 * ForkJoinPool.awaitJoin).
141 143 *
142 144 * Efficient implementation of these algorithms currently relies
143 145 * on an uncomfortable amount of "Unsafe" mechanics. To maintain
144 146 * correct orderings, reads and writes of variable base require
145 147 * volatile ordering. Variable sp does not require volatile
146 148 * writes but still needs store-ordering, which we accomplish by
147 149 * pre-incrementing sp before filling the slot with an ordered
148 150 * store. (Pre-incrementing also enables backouts used in
149 151 * joinTask.) Because they are protected by volatile base reads,
150 152 * reads of the queue array and its slots by other threads do not
151 153 * need volatile load semantics, but writes (in push) require
152 154 * store order and CASes (in pop and deq) require (volatile) CAS
153 155 * semantics. (Michael, Saraswat, and Vechev's algorithm has
154 156 * similar properties, but without support for nulling slots.)
155 157 * Since these combinations aren't supported using ordinary
156 158 * volatiles, the only way to accomplish these efficiently is to
157 159 * use direct Unsafe calls. (Using external AtomicIntegers and
158 160 * AtomicReferenceArrays for the indices and array is
159 161 * significantly slower because of memory locality and indirection
160 162 * effects.)
161 163 *
162 164 * Further, performance on most platforms is very sensitive to
163 165 * placement and sizing of the (resizable) queue array. Even
164 166 * though these queues don't usually become all that big, the
165 167 * initial size must be large enough to counteract cache
166 168 * contention effects across multiple queues (especially in the
167 169 * presence of GC cardmarking). Also, to improve thread-locality,
168 170 * queues are initialized after starting. Altogether, these
169 171 * low-level implementation choices produce as much as a factor of
170 172 * 4 performance improvement compared to naive implementations,
171 173 * and enable the processing of billions of tasks per second,
172 174 * sometimes at the expense of ugliness.
173 175 */
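To make the overview concrete, a much-simplified sketch of the slot-CAS arbitration described above, using AtomicReferenceArray in place of the Unsafe intrinsics the real code needs for speed; resizing, signalling, and the finer memory-ordering points are omitted, and all names here are hypothetical:

    import java.util.concurrent.atomic.AtomicReferenceArray;

    // Hypothetical, simplified work-stealing deque: the owner calls
    // push/pop, thieves call steal. A successful pop and a successful
    // steal both CAS a slot from non-null to null, so the slot itself
    // (not the indices) arbitrates races, and taken slots are nulled
    // immediately for GC.
    class MiniWorkStealingDeque<T> {
        final AtomicReferenceArray<T> slots =
            new AtomicReferenceArray<T>(1 << 13);
        volatile int base;     // next slot to steal from
        int sp;                // next slot to push; owner-written only

        void push(T t) {                          // owner thread only
            // sp is pre-incremented, then the slot is filled (no resize here)
            slots.set(sp++ & (slots.length() - 1), t);
        }

        T pop() {                                 // owner thread only
            int s;
            while ((s = sp) != base) {
                int i = --s & (slots.length() - 1);
                T t = slots.get(i);
                if (t == null)
                    break;                        // lost race to a thief
                if (slots.compareAndSet(i, t, null)) {
                    sp = s;                       // commit after winning CAS
                    return t;
                }
            }
            return null;
        }

        T steal() {                               // called by other threads
            int b = base;
            int i = b & (slots.length() - 1);
            T t = slots.get(i);
            if (t != null && base == b && slots.compareAndSet(i, t, null)) {
                base = b + 1;                     // advance only after CAS
                return t;
            }
            return null;                          // empty or contended
        }
    }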
174 176
175 177 /**
176 178 * Generator for initial random seeds for random victim
177 179 * selection. This is used only to create initial seeds. Random
178 180 * steals use a cheaper xorshift generator per steal attempt. We
179 181 * expect only rare contention on seedGenerator, so just use a
180 182 * plain Random.
181 183 */
182 184 private static final Random seedGenerator = new Random();
183 185
184 186 /**
185 187 * The maximum stolen->joining link depth allowed in helpJoinTask.
186 188 * Depths for legitimate chains are unbounded, but we use a fixed
187 189 * constant to avoid (otherwise unchecked) cycles and bound
188 190 * staleness of traversal parameters at the expense of sometimes
189 191 * blocking when we could be helping.
190 192 */
191 193 private static final int MAX_HELP_DEPTH = 8;
192 194
193 195 /**
194 196 * Capacity of work-stealing queue array upon initialization.
195 197 * Must be a power of two. Initial size must be at least 4, but is
196 198 * padded to minimize cache effects.
197 199 */
198 200 private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
199 201
200 202 /**
201 203 * Maximum work-stealing queue array size. Must be less than or
202 204 * equal to 1 << (31 - width of array entry) to ensure lack of
203 205 * index wraparound. The value is set in the static block
204 206 * at the end of this file after obtaining width.
205 207 */
206 208 private static final int MAXIMUM_QUEUE_CAPACITY;
207 209
208 210 /**
209 211 * The pool this thread works in. Accessed directly by ForkJoinTask.
210 212 */
211 213 final ForkJoinPool pool;
212 214
213 215 /**
214 216 * The work-stealing queue array. Size must be a power of two.
215 217 * Initialized in onStart, to improve memory locality.
216 218 */
217 219 private ForkJoinTask<?>[] queue;
218 220
219 221 /**
220 222 * Index (mod queue.length) of least valid queue slot, which is
221 223 * always the next position to steal from if nonempty.
222 224 */
223 225 private volatile int base;
224 226
225 227 /**
226 228 * Index (mod queue.length) of next queue slot to push to or pop
227 229 * from. It is written only by owner thread, and accessed by other
228 230 * threads only after reading (volatile) base. Both sp and base
229 231 * are allowed to wrap around on overflow, but (sp - base) still
230 232 * estimates size.
231 233 */
232 234 private int sp;
233 235
234 236 /**
235 237 * The index of most recent stealer, used as a hint to avoid
236 238 * traversal in method helpJoinTask. This is only a hint because a
237 239 * worker might have had multiple steals and this only holds one
238 240 * of them (usually the most current). Declared non-volatile,
239 241 * relying on other prevailing sync to keep reasonably current.
240 242 */
241 243 private int stealHint;
242 244
243 245 /**
244 246 * Run state of this worker. In addition to the usual run levels,
245 247 * tracks if this worker is suspended as a spare, and if it was
246 248 * killed (trimmed) while suspended. However, "active" status is
247 249 * maintained separately and modified only in conjunction with
248 250 * CASes of the pool's runState (which are currently sadly
249 251 * manually inlined for performance.) Accessed directly by pool
250 252 * to simplify checks for normal (zero) status.
251 253 */
252 254 volatile int runState;
253 255
254 256 private static final int TERMINATING = 0x01;
255 257 private static final int TERMINATED = 0x02;
256 258 private static final int SUSPENDED = 0x04; // inactive spare
257 259 private static final int TRIMMED = 0x08; // killed while suspended
258 260
259 261 /**
260 262 * Number of steals. Directly accessed (and reset) by
261 263 * pool.tryAccumulateStealCount when idle.
262 264 */
263 265 int stealCount;
264 266
265 267 /**
266 268 * Seed for random number generator for choosing steal victims.
267 269 * Uses Marsaglia xorshift. Must be initialized as nonzero.
268 270 */
269 271 private int seed;
270 272
271 273 /**
272 274 * Activity status. When true, this worker is considered active.
273 275 * Accessed directly by pool. Must be false upon construction.
274 276 */
275 277 boolean active;
276 278
277 279 /**
278 280 * True if using local FIFO, not the default LIFO, for local polling.
279 281 * Shadows value from ForkJoinPool.
280 282 */
281 283 private final boolean locallyFifo;
282 284
283 285 /**
284 286 * Index of this worker in pool array. Set once by pool before
285 287 * running, and accessed directly by pool to locate this worker in
286 288 * its workers array.
287 289 */
288 290 int poolIndex;
289 291
290 292 /**
291 293 * The last pool event waited for. Accessed only by pool in
292 294 * callback methods invoked within this thread.
293 295 */
294 296 int lastEventCount;
295 297
296 298 /**
297 299 * Encoded index and event count of next event waiter. Accessed
298 300 * only by ForkJoinPool for managing event waiters.
299 301 */
300 302 volatile long nextWaiter;
301 303
302 304 /**
303 305 * Number of times this thread suspended as spare. Accessed only
304 306 * by pool.
305 307 */
306 308 int spareCount;
307 309
308 310 /**
309 311 * Encoded index and count of next spare waiter. Accessed only
310 312 * by ForkJoinPool for managing spares.
311 313 */
312 314 volatile int nextSpare;
313 315
314 316 /**
315 317 * The task currently being joined, set only when actively trying
316 318 * to help other stealers in helpJoinTask. Written only by this
317 319 * thread, but read by others.
318 320 */
319 321 private volatile ForkJoinTask<?> currentJoin;
320 322
321 323 /**
322 324 * The task most recently stolen from another worker (or
323 325 * submission queue). Written only by this thread, but read by
324 326 * others.
325 327 */
326 328 private volatile ForkJoinTask<?> currentSteal;
327 329
328 330 /**
329 331 * Creates a ForkJoinWorkerThread operating in the given pool.
330 332 *
331 333 * @param pool the pool this thread works in
332 334 * @throws NullPointerException if pool is null
333 335 */
334 336 protected ForkJoinWorkerThread(ForkJoinPool pool) {
335 337 this.pool = pool;
336 338 this.locallyFifo = pool.locallyFifo;
337 339 setDaemon(true);
338 340 // To avoid exposing construction details to subclasses,
339 341 // remaining initialization is in start() and onStart()
340 342 }
341 343
342 344 /**
343 345 * Performs additional initialization and starts this thread.
344 346 */
345 347 final void start(int poolIndex, UncaughtExceptionHandler ueh) {
346 348 this.poolIndex = poolIndex;
347 349 if (ueh != null)
348 350 setUncaughtExceptionHandler(ueh);
349 351 start();
350 352 }
351 353
352 354 // Public/protected methods
353 355
354 356 /**
355 357 * Returns the pool hosting this thread.
356 358 *
357 359 * @return the pool
358 360 */
359 361 public ForkJoinPool getPool() {
360 362 return pool;
361 363 }
362 364
363 365 /**
364 366 * Returns the index number of this thread in its pool. The
365 367 * returned value ranges from zero to the maximum number of
366 368 * threads (minus one) that have ever been created in the pool.
367 369 * This method may be useful for applications that track status or
368 370 * collect results per-worker rather than per-task.
[ 308 lines elided ]
369 371 *
370 372 * @return the index number
371 373 */
372 374 public int getPoolIndex() {
373 375 return poolIndex;
374 376 }
375 377
376 378 /**
377 379 * Initializes internal state after construction but before
378 380 * processing any tasks. If you override this method, you must
379 - * invoke @code{super.onStart()} at the beginning of the method.
381 + * invoke {@code super.onStart()} at the beginning of the method.
380 382 * Initialization requires care: Most fields must have legal
381 383 * default values, to ensure that attempted accesses from other
382 384 * threads work correctly even before this thread starts
383 385 * processing tasks.
384 386 */
385 387 protected void onStart() {
386 388 int rs = seedGenerator.nextInt();
387 - seed = rs == 0? 1 : rs; // seed must be nonzero
389 + seed = (rs == 0) ? 1 : rs; // seed must be nonzero
388 390
389 391 // Allocate name string and arrays in this thread
390 392 String pid = Integer.toString(pool.getPoolNumber());
391 393 String wid = Integer.toString(poolIndex);
392 394 setName("ForkJoinPool-" + pid + "-worker-" + wid);
393 395
394 396 queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
395 397 }
396 398
397 399 /**
398 400 * Performs cleanup associated with termination of this worker
399 401 * thread. If you override this method, you must invoke
400 402 * {@code super.onTermination} at the end of the overridden method.
401 403 *
402 404 * @param exception the exception causing this thread to abort due
403 405 * to an unrecoverable error, or {@code null} if completed normally
404 406 */
405 407 protected void onTermination(Throwable exception) {
406 408 try {
407 409 ForkJoinPool p = pool;
408 410 if (active) {
409 411 int a; // inline p.tryDecrementActiveCount
410 412 active = false;
411 413 do {} while (!UNSAFE.compareAndSwapInt
412 414 (p, poolRunStateOffset, a = p.runState, a - 1));
413 415 }
414 416 cancelTasks();
415 417 setTerminated();
416 418 p.workerTerminated(this);
417 419 } catch (Throwable ex) { // Shouldn't ever happen
418 420 if (exception == null) // but if so, at least rethrow
[ 21 lines elided ]
419 421 exception = ex;
420 422 } finally {
421 423 if (exception != null)
422 424 UNSAFE.throwException(exception);
423 425 }
424 426 }
425 427
426 428 /**
427 429 * This method is required to be public, but should never be
428 430 * called explicitly. It performs the main run loop to execute
429 - * ForkJoinTasks.
431 + * {@link ForkJoinTask}s.
430 432 */
431 433 public void run() {
432 434 Throwable exception = null;
433 435 try {
434 436 onStart();
435 437 mainLoop();
436 438 } catch (Throwable ex) {
437 439 exception = ex;
438 440 } finally {
439 441 onTermination(exception);
440 442 }
441 443 }
442 444
443 445 // helpers for run()
444 446
445 447 /**
446 448 * Finds and executes tasks, and checks status while running.
447 449 */
448 450 private void mainLoop() {
449 451 boolean ran = false; // true if ran a task on last step
450 452 ForkJoinPool p = pool;
451 453 for (;;) {
452 454 p.preStep(this, ran);
453 455 if (runState != 0)
454 456 break;
455 457 ran = tryExecSteal() || tryExecSubmission();
456 458 }
457 459 }
458 460
459 461 /**
460 462 * Tries to steal a task and execute it.
461 463 *
462 464 * @return true if ran a task
463 465 */
464 466 private boolean tryExecSteal() {
465 467 ForkJoinTask<?> t;
466 468 if ((t = scan()) != null) {
467 469 t.quietlyExec();
468 470 UNSAFE.putOrderedObject(this, currentStealOffset, null);
469 471 if (sp != base)
470 472 execLocalTasks();
471 473 return true;
472 474 }
473 475 return false;
474 476 }
475 477
476 478 /**
477 479 * If a submission exists, try to activate and run it.
478 480 *
479 481 * @return true if ran a task
480 482 */
481 483 private boolean tryExecSubmission() {
482 484 ForkJoinPool p = pool;
483 485 // This loop is needed in case attempt to activate fails, in
484 486 // which case we only retry if there still appears to be a
485 487 // submission.
486 488 while (p.hasQueuedSubmissions()) {
487 489 ForkJoinTask<?> t; int a;
488 490 if (active || // inline p.tryIncrementActiveCount
489 491 (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
490 492 a = p.runState, a + 1))) {
491 493 if ((t = p.pollSubmission()) != null) {
492 494 UNSAFE.putOrderedObject(this, currentStealOffset, t);
493 495 t.quietlyExec();
494 496 UNSAFE.putOrderedObject(this, currentStealOffset, null);
495 497 if (sp != base)
496 498 execLocalTasks();
497 499 return true;
498 500 }
499 501 }
500 502 }
501 503 return false;
502 504 }
503 505
504 506 /**
505 507 * Runs local tasks until queue is empty or shut down. Call only
506 508 * while active.
507 509 */
508 510 private void execLocalTasks() {
509 511 while (runState == 0) {
510 512 ForkJoinTask<?> t = locallyFifo ? locallyDeqTask() : popTask();
511 513 if (t != null)
512 514 t.quietlyExec();
513 515 else if (sp == base)
514 516 break;
515 517 }
516 518 }
517 519
518 520 /*
519 521 * Intrinsics-based atomic writes for queue slots. These are
520 522 * basically the same as methods in AtomicReferenceArray, but
521 523 * specialized for (1) ForkJoinTask elements (2) requirement that
522 524 * nullness and bounds checks have already been performed by
523 525 * callers and (3) effective offsets are known not to overflow
524 526 * from int to long (because of MAXIMUM_QUEUE_CAPACITY). We don't
525 527 * need corresponding version for reads: plain array reads are OK
526 528 * because they are protected by other volatile reads and are
527 529 * confirmed by CASes.
528 530 *
529 531 * Most uses don't actually call these methods, but instead contain
530 532 * inlined forms that enable more predictable optimization. We
531 533 * don't define the version of write used in pushTask at all, but
532 534 * instead inline there a store-fenced array slot write.
533 535 */
534 536
535 537 /**
536 538 * CASes slot i of array q from t to null. Caller must ensure q is
537 539 * non-null and index is in range.
538 540 */
539 541 private static final boolean casSlotNull(ForkJoinTask<?>[] q, int i,
540 542 ForkJoinTask<?> t) {
541 543 return UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null);
542 544 }
543 545
544 546 /**
545 547 * Performs a volatile write of the given task at given slot of
546 548 * array q. Caller must ensure q is non-null and index is in
547 549 * range. This method is used only during resets and backouts.
548 550 */
549 551 private static final void writeSlot(ForkJoinTask<?>[] q, int i,
550 552 ForkJoinTask<?> t) {
551 553 UNSAFE.putObjectVolatile(q, (i << qShift) + qBase, t);
552 554 }
553 555
554 556 // queue methods
555 557
556 558 /**
557 559 * Pushes a task. Call only from this thread.
558 560 *
559 561 * @param t the task. Caller must ensure non-null.
560 562 */
561 563 final void pushTask(ForkJoinTask<?> t) {
562 564 ForkJoinTask<?>[] q = queue;
563 565 int mask = q.length - 1; // implicit assert q != null
564 566 int s = sp++; // ok to increment sp before slot write
565 567 UNSAFE.putOrderedObject(q, ((s & mask) << qShift) + qBase, t);
566 568 if ((s -= base) == 0)
567 569 pool.signalWork(); // was empty
568 570 else if (s == mask)
569 571 growQueue(); // is full
570 572 }
571 573
572 574 /**
573 575 * Tries to take a task from the base of the queue, failing if
574 576 * empty or contended. Note: Specializations of this code appear
575 577 * in locallyDeqTask and elsewhere.
576 578 *
577 579 * @return a task, or null if none or contended
578 580 */
579 581 final ForkJoinTask<?> deqTask() {
580 582 ForkJoinTask<?> t;
581 583 ForkJoinTask<?>[] q;
582 584 int b, i;
583 585 if (sp != (b = base) &&
584 586 (q = queue) != null && // must read q after b
585 587 (t = q[i = (q.length - 1) & b]) != null && base == b &&
586 588 UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) {
587 589 base = b + 1;
588 590 return t;
589 591 }
590 592 return null;
591 593 }
592 594
593 595 /**
594 596 * Tries to take a task from the base of own queue. Assumes active
595 597 * status. Called only by this thread.
596 598 *
597 599 * @return a task, or null if none
598 600 */
599 601 final ForkJoinTask<?> locallyDeqTask() {
600 602 ForkJoinTask<?>[] q = queue;
601 603 if (q != null) {
602 604 ForkJoinTask<?> t;
603 605 int b, i;
604 606 while (sp != (b = base)) {
605 607 if ((t = q[i = (q.length - 1) & b]) != null && base == b &&
606 608 UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase,
607 609 t, null)) {
608 610 base = b + 1;
609 611 return t;
610 612 }
611 613 }
612 614 }
613 615 return null;
614 616 }
615 617
616 618 /**
617 619 * Returns a popped task, or null if empty. Assumes active status.
618 620 * Called only by this thread.
619 621 */
620 622 private ForkJoinTask<?> popTask() {
[ 181 lines elided ]
621 623 ForkJoinTask<?>[] q = queue;
622 624 if (q != null) {
623 625 int s;
624 626 while ((s = sp) != base) {
625 627 int i = (q.length - 1) & --s;
626 628 long u = (i << qShift) + qBase; // raw offset
627 629 ForkJoinTask<?> t = q[i];
628 630 if (t == null) // lost to stealer
629 631 break;
630 632 if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
633 + /*
634 + * Note: here and in related methods, as a
635 + * performance (not correctness) issue, we'd like
636 + * to encourage compiler not to arbitrarily
637 + * postpone setting sp after successful CAS.
638 + * Currently there is no intrinsic for arranging
639 + * this, but using Unsafe putOrderedInt may be a
640 + * preferable strategy on some compilers even
641 + * though its main effect is a pre-, not post-
642 + * fence. To simplify possible changes, the option
643 + * is left in comments next to the associated
644 + * assignments.
645 + */
631 646 sp = s; // putOrderedInt may encourage more timely write
632 647 // UNSAFE.putOrderedInt(this, spOffset, s);
633 648 return t;
634 649 }
635 650 }
636 651 }
637 652 return null;
638 653 }
639 654
640 655 /**
641 656 * Specialized version of popTask to pop only if topmost element
642 657 * is the given task. Called only by this thread while active.
643 658 *
644 659 * @param t the task. Caller must ensure non-null.
645 660 */
646 661 final boolean unpushTask(ForkJoinTask<?> t) {
647 662 int s;
648 663 ForkJoinTask<?>[] q = queue;
649 664 if ((s = sp) != base && q != null &&
650 665 UNSAFE.compareAndSwapObject
651 666 (q, (((q.length - 1) & --s) << qShift) + qBase, t, null)) {
652 667 sp = s; // putOrderedInt may encourage more timely write
653 668 // UNSAFE.putOrderedInt(this, spOffset, s);
654 669 return true;
655 670 }
656 671 return false;
657 672 }
658 673
659 674 /**
660 675 * Returns next task, or null if empty or contended.
661 676 */
662 677 final ForkJoinTask<?> peekTask() {
663 678 ForkJoinTask<?>[] q = queue;
664 679 if (q == null)
665 680 return null;
666 681 int mask = q.length - 1;
667 682 int i = locallyFifo ? base : (sp - 1);
668 683 return q[i & mask];
669 684 }
670 685
671 686 /**
672 687 * Doubles queue array size. Transfers elements by emulating
673 688 * steals (deqs) from old array and placing, oldest first, into
674 689 * new array.
675 690 */
676 691 private void growQueue() {
677 692 ForkJoinTask<?>[] oldQ = queue;
678 693 int oldSize = oldQ.length;
679 694 int newSize = oldSize << 1;
680 695 if (newSize > MAXIMUM_QUEUE_CAPACITY)
681 696 throw new RejectedExecutionException("Queue capacity exceeded");
682 697 ForkJoinTask<?>[] newQ = queue = new ForkJoinTask<?>[newSize];
683 698
684 699 int b = base;
685 700 int bf = b + oldSize;
686 701 int oldMask = oldSize - 1;
687 702 int newMask = newSize - 1;
688 703 do {
689 704 int oldIndex = b & oldMask;
690 705 ForkJoinTask<?> t = oldQ[oldIndex];
691 706 if (t != null && !casSlotNull(oldQ, oldIndex, t))
692 707 t = null;
693 708 writeSlot(newQ, b & newMask, t);
694 709 } while (++b != bf);
695 710 pool.signalWork();
696 711 }
697 712
698 713 /**
699 714 * Computes next value for random victim probe in scan(). Scans
700 715 * don't require a very high quality generator, but also not a
701 716 * crummy one. Marsaglia xor-shift is cheap and works well enough.
702 717 * Note: This is manually inlined in scan().
703 718 */
704 719 private static final int xorShift(int r) {
705 720 r ^= r << 13;
706 721 r ^= r >>> 17;
707 722 return r ^ (r << 5);
708 723 }
709 724
710 725 /**
711 726 * Tries to steal a task from another worker. Starts at a random
712 727 * index of workers array, and probes workers until finding one
713 728 * with non-empty queue or finding that all are empty. It
714 729 * randomly selects the first n probes. If these are empty, it
715 730 * resorts to a circular sweep, which is necessary to accurately
716 731 * set active status. (The circular sweep uses steps of
717 732 * approximately half the array size plus 1, to avoid bias
718 733 * stemming from leftmost packing of the array in ForkJoinPool.)
719 734 *
720 735 * This method must be both fast and quiet -- usually avoiding
721 736 * memory accesses that could disrupt cache sharing etc. other than
722 737 * those needed to check for and take tasks (or to activate if not
723 738 * already active). This accounts for, among other things,
724 739 * updating random seed in place without storing it until exit.
725 740 *
726 741 * @return a task, or null if none found
727 742 */
728 743 private ForkJoinTask<?> scan() {
729 744 ForkJoinPool p = pool;
730 745 ForkJoinWorkerThread[] ws; // worker array
731 746 int n; // upper bound of #workers
732 747 if ((ws = p.workers) != null && (n = ws.length) > 1) {
733 748 boolean canSteal = active; // shadow active status
734 749 int r = seed; // extract seed once
735 750 int mask = n - 1;
736 751 int j = -n; // loop counter
737 752 int k = r; // worker index, random if j < 0
738 753 for (;;) {
739 754 ForkJoinWorkerThread v = ws[k & mask];
740 755 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // inline xorshift
741 756 ForkJoinTask<?>[] q; ForkJoinTask<?> t; int b, a;
742 757 if (v != null && (b = v.base) != v.sp &&
743 758 (q = v.queue) != null) {
744 759 int i = (q.length - 1) & b;
745 760 long u = (i << qShift) + qBase; // raw offset
746 761 int pid = poolIndex;
747 762 if ((t = q[i]) != null) {
748 763 if (!canSteal && // inline p.tryIncrementActiveCount
749 764 UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
750 765 a = p.runState, a + 1))
751 766 canSteal = active = true;
752 767 if (canSteal && v.base == b++ &&
753 768 UNSAFE.compareAndSwapObject(q, u, t, null)) {
754 769 v.base = b;
755 770 v.stealHint = pid;
756 771 UNSAFE.putOrderedObject(this,
757 772 currentStealOffset, t);
758 773 seed = r;
759 774 ++stealCount;
760 775 return t;
761 776 }
762 777 }
763 778 j = -n;
764 779 k = r; // restart on contention
765 780 }
766 781 else if (++j <= 0)
767 782 k = r;
768 783 else if (j <= n)
769 784 k += (n >>> 1) | 1;
[ 129 lines elided ]
770 785 else
771 786 break;
772 787 }
773 788 }
774 789 return null;
775 790 }
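A quick sanity check of the circular-sweep claim: the step (n >>> 1) | 1 is odd, hence coprime to the power-of-two workers array length, so the sweep visits every index once per n probes. A throwaway demonstration:

    // For n = 8 workers, step = (8 >>> 1) | 1 = 5; the probe sequence
    // mod 8 is 0 5 2 7 4 1 6 3 -- a full permutation of the indices.
    public class SweepDemo {
        public static void main(String[] args) {
            int n = 8, step = (n >>> 1) | 1;
            for (int j = 0, k = 0; j < n; ++j, k += step)
                System.out.print((k & (n - 1)) + " ");
        }
    }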
776 791
777 792 // Run State management
778 793
779 794 // status check methods used mainly by ForkJoinPool
780 - final boolean isRunning() { return runState == 0; }
781 - final boolean isTerminated() { return (runState & TERMINATED) != 0; }
782 - final boolean isSuspended() { return (runState & SUSPENDED) != 0; }
783 - final boolean isTrimmed() { return (runState & TRIMMED) != 0; }
795 + final boolean isRunning() { return runState == 0; }
796 + final boolean isTerminated() { return (runState & TERMINATED) != 0; }
797 + final boolean isSuspended() { return (runState & SUSPENDED) != 0; }
798 + final boolean isTrimmed() { return (runState & TRIMMED) != 0; }
784 799
785 800 final boolean isTerminating() {
786 801 if ((runState & TERMINATING) != 0)
787 802 return true;
788 803 if (pool.isAtLeastTerminating()) { // propagate pool state
789 804 shutdown();
790 805 return true;
791 806 }
792 807 return false;
793 808 }
794 809
795 810 /**
796 811 * Sets state to TERMINATING. Does NOT unpark or interrupt
797 812 * to wake up if currently blocked. Callers must do so if desired.
798 813 */
799 814 final void shutdown() {
800 815 for (;;) {
801 816 int s = runState;
802 817 if ((s & (TERMINATING|TERMINATED)) != 0)
803 818 break;
804 819 if ((s & SUSPENDED) != 0) { // kill and wakeup if suspended
805 820 if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
806 821 (s & ~SUSPENDED) |
807 822 (TRIMMED|TERMINATING)))
808 823 break;
809 824 }
810 825 else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
811 826 s | TERMINATING))
812 827 break;
813 828 }
814 829 }
815 830
816 831 /**
817 832 * Sets state to TERMINATED. Called only by onTermination().
818 833 */
819 834 private void setTerminated() {
820 835 int s;
821 836 do {} while (!UNSAFE.compareAndSwapInt(this, runStateOffset,
822 837 s = runState,
823 838 s | (TERMINATING|TERMINATED)));
824 839 }
825 840
826 841 /**
827 842 * If suspended, tries to set status to unsuspended.
828 843 * Does NOT wake up if blocked.
829 844 *
830 845 * @return true if successful
831 846 */
832 847 final boolean tryUnsuspend() {
833 848 int s;
834 849 while (((s = runState) & SUSPENDED) != 0) {
835 850 if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
836 851 s & ~SUSPENDED))
837 852 return true;
838 853 }
839 854 return false;
840 855 }
841 856
842 857 /**
843 858 * Sets suspended status and blocks as spare until resumed
844 859 * or shutdown.
845 860 */
846 861 final void suspendAsSpare() {
847 862 for (;;) { // set suspended unless terminating
848 863 int s = runState;
849 864 if ((s & TERMINATING) != 0) { // must kill
850 865 if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
851 866 s | (TRIMMED | TERMINATING)))
852 867 return;
853 868 }
854 869 else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
855 870 s | SUSPENDED))
856 871 break;
857 872 }
858 873 ForkJoinPool p = pool;
859 874 p.pushSpare(this);
860 875 while ((runState & SUSPENDED) != 0) {
861 876 if (p.tryAccumulateStealCount(this)) {
862 877 interrupted(); // clear/ignore interrupts
863 878 if ((runState & SUSPENDED) == 0)
864 879 break;
865 880 LockSupport.park(this);
866 881 }
867 882 }
868 883 }
869 884
870 885 // Misc support methods for ForkJoinPool
871 886
872 887 /**
873 888 * Returns an estimate of the number of tasks in the queue. Also
874 889 * used by ForkJoinTask.
875 890 */
876 891 final int getQueueSize() {
[ 83 lines elided ]
877 892 int n; // external calls must read base first
878 893 return (n = -base + sp) <= 0 ? 0 : n;
879 894 }
880 895
881 896 /**
882 897 * Removes and cancels all tasks in queue. Can be called from any
883 898 * thread.
884 899 */
885 900 final void cancelTasks() {
886 901 ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks
887 - if (cj != null) {
888 - currentJoin = null;
902 + if (cj != null && cj.status >= 0) {
889 903 cj.cancelIgnoringExceptions();
890 904 try {
891 905 this.interrupt(); // awaken wait
892 906 } catch (SecurityException ignore) {
893 907 }
894 908 }
895 909 ForkJoinTask<?> cs = currentSteal;
896 - if (cs != null) {
897 - currentSteal = null;
910 + if (cs != null && cs.status >= 0)
898 911 cs.cancelIgnoringExceptions();
899 - }
900 912 while (base != sp) {
901 913 ForkJoinTask<?> t = deqTask();
902 914 if (t != null)
903 915 t.cancelIgnoringExceptions();
904 916 }
905 917 }
906 918
907 919 /**
908 920 * Drains tasks to given collection c.
909 921 *
910 922 * @return the number of tasks drained
911 923 */
912 924 final int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
913 925 int n = 0;
914 926 while (base != sp) {
915 927 ForkJoinTask<?> t = deqTask();
916 928 if (t != null) {
917 929 c.add(t);
918 930 ++n;
919 931 }
920 932 }
921 933 return n;
922 934 }
923 935
924 936 // Support methods for ForkJoinTask
925 937
926 938 /**
927 939 * Gets and removes a local task.
928 940 *
929 941 * @return a task, if available
930 942 */
931 943 final ForkJoinTask<?> pollLocalTask() {
932 944 ForkJoinPool p = pool;
933 945 while (sp != base) {
934 946 int a; // inline p.tryIncrementActiveCount
935 947 if (active ||
936 948 (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
937 949 a = p.runState, a + 1)))
938 950 return locallyFifo ? locallyDeqTask() : popTask();
939 951 }
940 952 return null;
941 953 }
942 954
943 955 /**
944 956 * Gets and removes a local or stolen task.
945 957 *
946 958 * @return a task, if available
947 959 */
948 960 final ForkJoinTask<?> pollTask() {
949 961 ForkJoinTask<?> t = pollLocalTask();
950 962 if (t == null) {
951 963 t = scan();
[ 42 lines elided ]
952 964 // cannot retain/track/help steal
953 965 UNSAFE.putOrderedObject(this, currentStealOffset, null);
954 966 }
955 967 return t;
956 968 }
957 969
958 970 /**
959 971 * Possibly runs some tasks and/or blocks, until task is done.
960 972 *
961 973 * @param joinMe the task to join
974 + * @param timed true if use timed wait
975 + * @param nanos wait time if timed
962 976 */
963 - final void joinTask(ForkJoinTask<?> joinMe) {
977 + final void joinTask(ForkJoinTask<?> joinMe, boolean timed, long nanos) {
964 978 // currentJoin only written by this thread; only need ordered store
965 979 ForkJoinTask<?> prevJoin = currentJoin;
966 980 UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe);
967 - if (sp != base)
968 - localHelpJoinTask(joinMe);
969 - if (joinMe.status >= 0)
970 - pool.awaitJoin(joinMe, this);
981 + pool.awaitJoin(joinMe, this, timed, nanos);
971 982 UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin);
972 983 }
973 984
974 985 /**
975 - * Run tasks in local queue until given task is done.
986 + * Tries to locate and help perform tasks for a stealer of the
987 + * given task, or in turn one of its stealers. Traces
988 + * currentSteal->currentJoin links looking for a thread working on
989 + * a descendant of the given task and with a non-empty queue to
990 + * steal back and execute tasks from.
976 991 *
977 - * @param joinMe the task to join
978 - */
979 - private void localHelpJoinTask(ForkJoinTask<?> joinMe) {
980 - int s;
981 - ForkJoinTask<?>[] q;
982 - while (joinMe.status >= 0 && (s = sp) != base && (q = queue) != null) {
983 - int i = (q.length - 1) & --s;
984 - long u = (i << qShift) + qBase; // raw offset
985 - ForkJoinTask<?> t = q[i];
986 - if (t == null) // lost to a stealer
987 - break;
988 - if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
989 - /*
990 - * This recheck (and similarly in helpJoinTask)
991 - * handles cases where joinMe is independently
992 - * cancelled or forced even though there is other work
993 - * available. Back out of the pop by putting t back
994 - * into slot before we commit by writing sp.
995 - */
996 - if (joinMe.status < 0) {
997 - UNSAFE.putObjectVolatile(q, u, t);
998 - break;
999 - }
1000 - sp = s;
1001 - // UNSAFE.putOrderedInt(this, spOffset, s);
1002 - t.quietlyExec();
1003 - }
1004 - }
1005 - }
1006 -
1007 - /**
1008 - * Unless terminating, tries to locate and help perform tasks for
1009 - * a stealer of the given task, or in turn one of its stealers.
1010 - * Traces currentSteal->currentJoin links looking for a thread
1011 - * working on a descendant of the given task and with a non-empty
1012 - * queue to steal back and execute tasks from.
1013 - *
1014 992 * The implementation is very branchy to cope with potential
1015 993 * inconsistencies or loops encountering chains that are stale,
1016 994 * unknown, or of length greater than MAX_HELP_DEPTH links. All
1017 995 * of these cases are dealt with by just returning back to the
1018 996 * caller, who is expected to retry if other join mechanisms also
1019 997 * don't work out.
1020 998 *
1021 999 * @param joinMe the task to join
1000 + * @param running if false, then must update pool count upon
1001 + * running a task
1002 + * @return value of running on exit
1022 1003 */
1023 - final void helpJoinTask(ForkJoinTask<?> joinMe) {
1024 - ForkJoinWorkerThread[] ws;
1025 - int n;
1026 - if (joinMe.status < 0) // already done
1027 - return;
1028 - if ((runState & TERMINATING) != 0) { // cancel if shutting down
1029 - joinMe.cancelIgnoringExceptions();
1030 - return;
1004 + final boolean helpJoinTask(ForkJoinTask<?> joinMe, boolean running) {
1005 + /*
1006 + * Initial checks to (1) abort if terminating; (2) clean out
1007 + * old cancelled tasks from local queue; (3) if joinMe is next
1008 + * task, run it; (4) omit scan if local queue nonempty (since
1009 + * it may contain non-descendents of joinMe).
1010 + */
1011 + ForkJoinPool p = pool;
1012 + for (;;) {
1013 + ForkJoinTask<?>[] q;
1014 + int s;
1015 + if (joinMe.status < 0)
1016 + return running;
1017 + else if ((runState & TERMINATING) != 0) {
1018 + joinMe.cancelIgnoringExceptions();
1019 + return running;
1020 + }
1021 + else if ((s = sp) == base || (q = queue) == null)
1022 + break; // queue empty
1023 + else {
1024 + int i = (q.length - 1) & --s;
1025 + long u = (i << qShift) + qBase; // raw offset
1026 + ForkJoinTask<?> t = q[i];
1027 + if (t == null)
1028 + break; // lost to a stealer
1029 + else if (t != joinMe && t.status >= 0)
1030 + return running; // cannot safely help
1031 + else if ((running ||
1032 + (running = p.tryIncrementRunningCount())) &&
1033 + UNSAFE.compareAndSwapObject(q, u, t, null)) {
1034 + sp = s; // putOrderedInt may encourage more timely write
1035 + // UNSAFE.putOrderedInt(this, spOffset, s);
1036 + t.quietlyExec();
1037 + }
1038 + }
1031 1039 }
1032 - if ((ws = pool.workers) == null || (n = ws.length) <= 1)
1033 - return; // need at least 2 workers
1034 1040
1035 - ForkJoinTask<?> task = joinMe; // base of chain
1036 - ForkJoinWorkerThread thread = this; // thread with stolen task
1037 - for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
1038 - // Try to find v, the stealer of task, by first using hint
1039 - ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
1040 - if (v == null || v.currentSteal != task) {
1041 - for (int j = 0; ; ++j) { // search array
1042 - if (j < n) {
1043 - ForkJoinTask<?> vs;
1044 - if ((v = ws[j]) != null &&
1045 - (vs = v.currentSteal) != null) {
1046 - if (joinMe.status < 0 || task.status < 0)
1047 - return; // stale or done
1048 - if (vs == task) {
1049 - thread.stealHint = j;
1050 - break; // save hint for next time
1041 + int n; // worker array size
1042 + ForkJoinWorkerThread[] ws = p.workers;
1043 + if (ws != null && (n = ws.length) > 1) { // need at least 2 workers
1044 + ForkJoinTask<?> task = joinMe; // base of chain
1045 + ForkJoinWorkerThread thread = this; // thread with stolen task
1046 +
1047 + outer:for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
1048 + // Try to find v, the stealer of task, by first using hint
1049 + ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
1050 + if (v == null || v.currentSteal != task) {
1051 + for (int j = 0; ; ++j) { // search array
1052 + if (j < n) {
1053 + ForkJoinTask<?> vs;
1054 + if ((v = ws[j]) != null &&
1055 + (vs = v.currentSteal) != null) {
1056 + if (joinMe.status < 0)
1057 + break outer;
1058 + if (vs == task) {
1059 + if (task.status < 0)
1060 + break outer; // stale
1061 + thread.stealHint = j;
1062 + break; // save hint for next time
1063 + }
1051 1064 }
1052 1065 }
1066 + else
1067 + break outer; // no stealer
1053 1068 }
1054 - else
1055 - return; // no stealer
1056 1069 }
1057 - }
1058 - for (;;) { // Try to help v, using specialized form of deqTask
1059 - if (joinMe.status < 0)
1060 - return;
1061 - int b = v.base;
1062 - ForkJoinTask<?>[] q = v.queue;
1063 - if (b == v.sp || q == null)
1064 - break;
1065 - int i = (q.length - 1) & b;
1066 - long u = (i << qShift) + qBase;
1067 - ForkJoinTask<?> t = q[i];
1068 - int pid = poolIndex;
1069 - ForkJoinTask<?> ps = currentSteal;
1070 - if (task.status < 0)
1071 - return; // stale or done
1072 - if (t != null && v.base == b++ &&
1073 - UNSAFE.compareAndSwapObject(q, u, t, null)) {
1074 - if (joinMe.status < 0) {
1075 - UNSAFE.putObjectVolatile(q, u, t);
1076 - return; // back out on cancel
1070 +
1071 + // Try to help v, using specialized form of deqTask
1072 + for (;;) {
1073 + if (joinMe.status < 0)
1074 + break outer;
1075 + int b = v.base;
1076 + ForkJoinTask<?>[] q = v.queue;
1077 + if (b == v.sp || q == null)
1078 + break; // empty
1079 + int i = (q.length - 1) & b;
1080 + long u = (i << qShift) + qBase;
1081 + ForkJoinTask<?> t = q[i];
1082 + if (task.status < 0)
1083 + break outer; // stale
1084 + if (t != null &&
1085 + (running ||
1086 + (running = p.tryIncrementRunningCount())) &&
1087 + v.base == b++ &&
1088 + UNSAFE.compareAndSwapObject(q, u, t, null)) {
1089 + if (t != joinMe && joinMe.status < 0) {
1090 + UNSAFE.putObjectVolatile(q, u, t);
1091 + break outer; // joinMe cancelled; back out
1092 + }
1093 + v.base = b;
1094 + if (t.status >= 0) {
1095 + ForkJoinTask<?> ps = currentSteal;
1096 + int pid = poolIndex;
1097 + v.stealHint = pid;
1098 + UNSAFE.putOrderedObject(this,
1099 + currentStealOffset, t);
1100 + t.quietlyExec();
1101 + UNSAFE.putOrderedObject(this,
1102 + currentStealOffset, ps);
1103 + }
1077 1104 }
1078 - v.base = b;
1079 - v.stealHint = pid;
1080 - UNSAFE.putOrderedObject(this, currentStealOffset, t);
1081 - t.quietlyExec();
1082 - UNSAFE.putOrderedObject(this, currentStealOffset, ps);
1105 + else if ((runState & TERMINATING) != 0) {
1106 + joinMe.cancelIgnoringExceptions();
1107 + break outer;
1108 + }
1083 1109 }
1110 +
1111 + // Try to descend to find v's stealer
1112 + ForkJoinTask<?> next = v.currentJoin;
1113 + if (task.status < 0 || next == null || next == task ||
1114 + joinMe.status < 0)
1115 + break; // done, stale, dead-end, or cyclic
1116 + task = next;
1117 + thread = v;
1084 1118 }
1085 - // Try to descend to find v's stealer
1086 - ForkJoinTask<?> next = v.currentJoin;
1087 - if (task.status < 0 || next == null || next == task ||
1088 - joinMe.status < 0)
1089 - return;
1090 - task = next;
1091 - thread = v;
1092 1119 }
1120 + return running;
1093 1121 }
1094 1122
1095 1123 /**
1096 1124 * Implements ForkJoinTask.getSurplusQueuedTaskCount().
1097 1125 * Returns an estimate of the number of tasks, offset by a
1098 1126 * function of number of idle workers.
1099 1127 *
1100 1128 * This method provides a cheap heuristic guide for task
1101 1129 * partitioning when programmers, frameworks, tools, or languages
1102 1130 * have little or no idea about task granularity. In essence by
1103 1131 * offering this method, we ask users only about tradeoffs in
1104 1132 * overhead vs expected throughput and its variance, rather than
1105 1133 * how finely to partition tasks.
1106 1134 *
1107 1135 * In a steady state strict (tree-structured) computation, each
1108 1136 * thread makes available for stealing enough tasks for other
1109 1137 * threads to remain active. Inductively, if all threads play by
1110 1138 * the same rules, each thread should make available only a
1111 1139 * constant number of tasks.
1112 1140 *
1113 1141 * The minimum useful constant is just 1. But using a value of 1
1114 1142 * would require immediate replenishment upon each steal to
1115 1143 * maintain enough tasks, which is infeasible. Further,
1116 1144 * partitionings/granularities of offered tasks should minimize
1117 1145 * steal rates, which in general means that threads nearer the top
1118 1146 * of computation tree should generate more than those nearer the
1119 1147 * bottom. In perfect steady state, each thread is at
1120 1148 * approximately the same level of computation tree. However,
1121 1149 * producing extra tasks amortizes the uncertainty of progress and
1122 1150 * diffusion assumptions.
1123 1151 *
1124 1152 * So, users will want to use values larger, but not much larger
1125 1153 * than 1 to both smooth over transient shortages and hedge
1126 1154 * against uneven progress; as traded off against the cost of
1127 1155 * extra task overhead. We leave the user to pick a threshold
1128 1156 * value to compare with the results of this call to guide
1129 1157 * decisions, but recommend values such as 3.
1130 1158 *
1131 1159 * When all threads are active, it is on average OK to estimate
1132 1160 * surplus strictly locally. In steady-state, if one thread is
1133 1161 * maintaining say 2 surplus tasks, then so are others. So we can
1134 1162 * just use estimated queue length (although note that (sp - base)
1135 1163 * can be an overestimate because of stealers lagging increments
1136 1164 * of base). However, this strategy alone leads to serious
1137 1165 * mis-estimates in some non-steady-state conditions (ramp-up,
1138 1166 * ramp-down, other stalls). We can detect many of these by
1139 1167 * further considering the number of "idle" threads, that are
1140 1168 * known to have zero queued tasks, so compensate by a factor of
1141 1169 * (#idle/#active) threads.
1142 1170 */
1143 1171 final int getEstimatedSurplusTaskCount() {
1144 1172 return sp - base - pool.idlePerActive();
1145 1173 }
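Callers of the public counterpart, ForkJoinTask.getSurplusQueuedTaskCount(), compare the result against a small threshold such as the recommended 3; a hedged sketch of that pattern (the task shape, THRESHOLD constant, and process helper are hypothetical):

    import java.util.concurrent.RecursiveAction;

    // Hypothetical divide-and-conquer task using the surplus heuristic:
    // fork only while few surplus tasks are queued; otherwise compute
    // sequentially to avoid task overhead.
    class SurplusGuidedTask extends RecursiveAction {
        static final int THRESHOLD = 3;   // recommended order of magnitude
        final int lo, hi;
        SurplusGuidedTask(int lo, int hi) { this.lo = lo; this.hi = hi; }
        protected void compute() {
            if (hi - lo > 1 && getSurplusQueuedTaskCount() <= THRESHOLD) {
                int mid = (lo + hi) >>> 1;
                invokeAll(new SurplusGuidedTask(lo, mid),
                          new SurplusGuidedTask(mid, hi));
            } else {
                for (int i = lo; i < hi; ++i)
                    process(i);           // hypothetical leaf work
            }
        }
        static void process(int i) { /* hypothetical element work */ }
    }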
1146 1174
1147 1175 /**
1148 1176 * Runs tasks until {@code pool.isQuiescent()}.
1149 1177 */
1150 1178 final void helpQuiescePool() {
1151 1179 ForkJoinTask<?> ps = currentSteal; // to restore below
1152 1180 for (;;) {
1153 1181 ForkJoinTask<?> t = pollLocalTask();
1154 1182 if (t != null || (t = scan()) != null)
1155 1183 t.quietlyExec();
1156 1184 else {
1157 1185 ForkJoinPool p = pool;
1158 1186 int a; // to inline CASes
1159 1187 if (active) {
1160 1188 if (!UNSAFE.compareAndSwapInt
1161 1189 (p, poolRunStateOffset, a = p.runState, a - 1))
1162 1190 continue; // retry later
1163 1191 active = false; // inactivate
1164 1192 UNSAFE.putOrderedObject(this, currentStealOffset, ps);
1165 1193 }
1166 1194 if (p.isQuiescent()) {
1167 1195 active = true; // re-activate
1168 1196 do {} while (!UNSAFE.compareAndSwapInt
1169 1197 (p, poolRunStateOffset, a = p.runState, a+1));
1170 1198 return;
1171 1199 }
1172 1200 }
1173 1201 }
1174 1202 }
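This method backs the public ForkJoinTask.helpQuiesce(); a minimal hypothetical use, in which a task forks subtasks it never joins individually and then helps run tasks until the pool quiesces:

    import java.util.concurrent.RecursiveAction;

    // Hypothetical task that fires off subtasks without joining each
    // one, then helps process tasks until pool.isQuiescent().
    class FireAndQuiesce extends RecursiveAction {
        protected void compute() {
            for (int i = 0; i < 10; ++i) {
                new RecursiveAction() {
                    protected void compute() { /* some work (omitted) */ }
                }.fork();
            }
            helpQuiesce(); // runs tasks until the pool is quiescent
        }
    }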
1175 1203
1176 1204 // Unsafe mechanics
1177 1205
1178 1206 private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
1179 1207 private static final long spOffset =
1180 1208 objectFieldOffset("sp", ForkJoinWorkerThread.class);
1181 1209 private static final long runStateOffset =
1182 1210 objectFieldOffset("runState", ForkJoinWorkerThread.class);
1183 1211 private static final long currentJoinOffset =
1184 1212 objectFieldOffset("currentJoin", ForkJoinWorkerThread.class);
1185 1213 private static final long currentStealOffset =
1186 1214 objectFieldOffset("currentSteal", ForkJoinWorkerThread.class);
1187 1215 private static final long qBase =
1188 1216 UNSAFE.arrayBaseOffset(ForkJoinTask[].class);
1189 1217 private static final long poolRunStateOffset = // to inline CAS
1190 1218 objectFieldOffset("runState", ForkJoinPool.class);
1191 1219
1192 1220 private static final int qShift;
1193 1221
1194 1222 static {
1195 1223 int s = UNSAFE.arrayIndexScale(ForkJoinTask[].class);
1196 1224 if ((s & (s-1)) != 0)
1197 1225 throw new Error("data type scale not a power of two");
1198 1226 qShift = 31 - Integer.numberOfLeadingZeros(s);
1199 1227 MAXIMUM_QUEUE_CAPACITY = 1 << (31 - qShift);
1200 1228 }
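A worked instance of the static-block arithmetic, assuming the common 4-byte reference scale (e.g. compressed oops); the class name is hypothetical:

    // arrayIndexScale = 4 => numberOfLeadingZeros(4) = 29 => qShift = 2,
    // so MAXIMUM_QUEUE_CAPACITY = 1 << (31 - 2) = 1 << 29 slots
    // (an 8-byte scale gives qShift = 3 and capacity 1 << 28).
    public class CapacityDemo {
        public static void main(String[] args) {
            int s = 4; // assume 4-byte references
            int qShift = 31 - Integer.numberOfLeadingZeros(s);
            System.out.println(1 << (31 - qShift)); // prints 536870912
        }
    }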
1201 1229
1202 1230 private static long objectFieldOffset(String field, Class<?> klazz) {
1203 1231 try {
1204 1232 return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
1205 1233 } catch (NoSuchFieldException e) {
1206 1234 // Convert Exception to corresponding Error
1207 1235 NoSuchFieldError error = new NoSuchFieldError(field);
1208 1236 error.initCause(e);
1209 1237 throw error;
1210 1238 }
1211 1239 }
1212 1240 }
[ 110 lines elided ]