/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;

import java.util.Collection;

/**
 * A thread managed by a {@link ForkJoinPool}, which executes
 * {@link ForkJoinTask}s.
 * This class is subclassable solely for the sake of adding
 * functionality -- there are no overridable methods dealing with
 * scheduling or execution.  However, you can override initialization
 * and termination methods surrounding the main task processing loop.
 * If you do create such a subclass, you will also need to supply a
 * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to use it
 * in a {@code ForkJoinPool}.
 *
 * @since 1.7
 * @author Doug Lea
 */
public class ForkJoinWorkerThread extends Thread {
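
    /*
     * Illustrative sketch: the class javadoc above describes pairing a
     * subclass with a custom ForkJoinWorkerThreadFactory. A minimal
     * hypothetical pairing could look like the nested classes below;
     * "SketchWorker" and "SketchFactory" are invented names, are not
     * referenced anywhere by this implementation, and exist only as an
     * example of the documented override contract.
     */
    static final class SketchWorker extends ForkJoinWorkerThread {
        SketchWorker(ForkJoinPool pool) { super(pool); }
        protected void onStart() {
            super.onStart();          // must run first, per the onStart contract
            // ... set up per-worker state (e.g., thread-local buffers) ...
        }
        protected void onTermination(Throwable exception) {
            // ... tear down per-worker state ...
            super.onTermination(exception); // must run last, per the contract
        }
    }

    /** Factory needed to install SketchWorker instances in a pool. */
    static final class SketchFactory
        implements ForkJoinPool.ForkJoinWorkerThreadFactory {
        public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            return new SketchWorker(pool);
        }
    }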

    /*
     * Overview:
     *
     * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
     * ForkJoinTasks. This class includes bookkeeping in support of
     * worker activation, suspension, and lifecycle control described
     * in more detail in the internal documentation of class
     * ForkJoinPool. And as described further below, this class also
     * includes special-cased support for some ForkJoinTask
     * methods. But the main mechanics involve work-stealing:
     *
     * Work-stealing queues are special forms of Deques that support
     * only three of the four possible end-operations -- push, pop,
     * and deq (aka steal), under the further constraints that push
     * and pop are called only from the owning thread, while deq may
     * be called from other threads. (If you are unfamiliar with
     * them, you probably want to read Herlihy and Shavit's book "The
     * Art of Multiprocessor Programming", whose chapter 16 describes
     * these in more detail, before proceeding.) The main
     * work-stealing queue design is roughly similar to those in the
     * papers "Dynamic Circular Work-Stealing Deque" by Chase and Lev,
     * SPAA 2005 (http://research.sun.com/scalable/pubs/index.html)
     * and "Idempotent work stealing" by Michael, Saraswat, and
     * Vechev, PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
     * The main differences ultimately stem from GC requirements that
     * we null out taken slots as soon as we can, to maintain as small
     * a footprint as possible even in programs generating huge
     * numbers of tasks. To accomplish this, we shift the CAS
     * arbitrating pop vs deq (steal) from being on the indices
     * ("queueBase" and "queueTop") to the slots themselves (mainly
     * via method "casSlotNull()"). So, both a successful pop and deq
     * mainly entail a CAS of a slot from non-null to null. Because
     * we rely on CASes of references, we do not need tag bits on
     * queueBase or queueTop. They are simple ints, as used in any
     * circular array-based queue (see for example ArrayDeque).
     * Updates to the indices must still be ordered in a way that
     * guarantees that queueTop == queueBase means the queue is empty,
     * but otherwise may err on the side of possibly making the queue
     * appear nonempty when a push, pop, or deq has not fully
     * committed. Note that this means that the deq operation,
     * considered individually, is not wait-free. One thief cannot
     * successfully continue until another in-progress one (or, if
     * previously empty, a push) completes. However, in the
     * aggregate, we ensure at least probabilistic non-blockingness.
     * If an attempted steal fails, a thief always chooses a different
     * random victim target to try next. So, in order for one thief to
     * progress, it suffices for any in-progress deq or new push on
     * any empty queue to complete.
     *
     * This approach also enables support for "async mode", in which
     * local task processing is in FIFO, not LIFO, order: simply use a
     * version of deq rather than pop when locallyFifo is true (as set
     * by the ForkJoinPool). This allows use in message-passing
     * frameworks in which tasks are never joined. However, neither
     * mode considers affinities, loads, cache localities, etc., so
     * they rarely provide the best possible performance on a given
     * machine, but they portably provide good throughput by averaging
     * over these factors. (Further, even if we did try to use such
     * information, we do not usually have a basis for exploiting
     * it. For example, some sets of tasks profit from cache
     * affinities, but others are harmed by cache pollution effects.)
     *
     * When a worker would otherwise be blocked waiting to join a
     * task, it first tries a form of linear helping: Each worker
     * records (in field currentSteal) the most recent task it stole
     * from some other worker. Plus, it records (in field currentJoin)
     * the task it is currently actively joining. Method joinTask uses
     * these markers to try to find a worker to help (i.e., steal back
     * a task from and execute it) that could hasten completion of the
     * actively joined task. In essence, the joiner executes a task
     * that would be on its own local deque had the to-be-joined task
     * not been stolen. This may be seen as a conservative variant of
     * the approach in Wagner & Calder "Leapfrogging: a portable
     * technique for implementing efficient futures" SIGPLAN Notices,
     * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
     * in that: (1) We only maintain dependency links across workers
     * upon steals, rather than using per-task bookkeeping. This may
     * require a linear scan of the workers array to locate stealers,
     * but usually doesn't, because stealers leave hints (that may
     * become stale/wrong) of where to locate them. This isolates the
     * cost to when it is needed, rather than adding to per-task
     * overhead. (2) It is "shallow", ignoring nesting and potentially
     * cyclic mutual steals. (3) It is intentionally racy: field
     * currentJoin is updated only while actively joining, which means
     * that we miss links in the chain during long-lived tasks, GC
     * stalls, etc. (which is OK, since blocking in such cases is
     * usually a good idea). (4) We bound the number of attempts to
     * find work (see MAX_HELP) and fall back to suspending the worker
     * and, if necessary, replacing it with another.
     *
     * Efficient implementation of these algorithms currently relies
     * on an uncomfortable amount of "Unsafe" mechanics. To maintain
     * correct orderings, reads and writes of variable queueBase
     * require volatile ordering. Variable queueTop need not be
     * volatile because non-local reads always follow those of
     * queueBase. Similarly, because they are protected by volatile
     * queueBase reads, reads of the queue array and its slots by
     * other threads do not need volatile load semantics, but writes
     * (in push) require store order and CASes (in pop and deq)
     * require (volatile) CAS semantics. (Michael, Saraswat, and
     * Vechev's algorithm has similar properties, but without support
     * for nulling slots.) Since these combinations aren't supported
     * using ordinary volatiles, the only way to accomplish them
     * efficiently is to use direct Unsafe calls. (Using external
     * AtomicIntegers and AtomicReferenceArrays for the indices and
     * array is significantly slower because of memory locality and
     * indirection effects.)
     *
     * Further, performance on most platforms is very sensitive to
     * placement and sizing of the (resizable) queue array. Even
     * though these queues don't usually become all that big, the
     * initial size must be large enough to counteract cache
     * contention effects across multiple queues (especially in the
     * presence of GC cardmarking). Also, to improve thread-locality,
     * queues are initialized after starting.
     */
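
    /*
     * A self-contained toy rendering (invented for illustration, not
     * used by this class) of the slot-CAS idea in the overview: both
     * the owner's pop and a thief's deq claim a task by CASing its
     * slot from non-null to null, so the indices need no tag bits.
     * It substitutes AtomicReferenceArray for Unsafe and omits
     * resizing, signalling, and the memory-ordering refinements the
     * real code needs.
     */
    static final class SlotCasSketch {
        final java.util.concurrent.atomic.AtomicReferenceArray<ForkJoinTask<?>>
            a = new java.util.concurrent.atomic.AtomicReferenceArray<ForkJoinTask<?>>(1 << 13);
        volatile int base;   // next slot to steal from
        int top;             // next slot to push at (owner-only)

        void push(ForkJoinTask<?> t) {            // owner only
            a.set(top & (a.length() - 1), t);
            ++top;
        }
        ForkJoinTask<?> pop() {                   // owner only
            for (int s; (s = top) != base; ) {
                int i = (s - 1) & (a.length() - 1);
                ForkJoinTask<?> t = a.get(i);
                if (t == null)                    // lost race to a thief
                    break;
                if (a.compareAndSet(i, t, null)) {
                    top = s - 1;
                    return t;
                }
            }
            return null;
        }
        ForkJoinTask<?> steal() {                 // any thread
            int b = base;
            if (b != top) {
                int i = b & (a.length() - 1);
                ForkJoinTask<?> t = a.get(i);
                if (t != null && base == b && a.compareAndSet(i, t, null)) {
                    base = b + 1;
                    return t;
                }
            }
            return null;                          // empty or contended
        }
    }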

    /**
     * Mask for pool indices encoded as shorts.
     */
    private static final int SMASK = 0xffff;

    /**
     * Capacity of work-stealing queue array upon initialization.
     * Must be a power of two. Initial size must be at least 4, but is
     * padded to minimize cache effects.
     */
    private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;

    /**
     * Maximum size for queue array. Must be a power of two
     * less than or equal to 1 << (31 - width of array entry) to
     * ensure lack of index wraparound, but is capped at a lower
     * value to help users trap runaway computations.
     */
    private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 24; // 16M

    /**
     * The work-stealing queue array. Size must be a power of two.
     * Initialized when started (as opposed to when constructed), to
     * improve memory locality.
     */
    ForkJoinTask<?>[] queue;
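
    /*
     * Illustration (hypothetical helper, unused): with power-of-two
     * capacities, slots are addressed as (k & (length - 1)), and the
     * size estimate (queueTop - queueBase) stays correct across int
     * overflow because int subtraction is modular. For example, with
     * top == Integer.MIN_VALUE and base == Integer.MAX_VALUE - 2,
     * the difference below is 3.
     */
    static int sketchSize(int top, int base) {
        return top - base; // modular arithmetic handles wraparound
    }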

    /**
     * The pool this thread works in. Accessed directly by ForkJoinTask.
     */
    final ForkJoinPool pool;

    /**
     * Index (mod queue.length) of the next queue slot to push to or
     * pop from. It is written only by the owner thread, and accessed
     * by other threads only after reading (volatile) queueBase. Both
     * queueTop and queueBase are allowed to wrap around on overflow,
     * but (queueTop - queueBase) still estimates size.
     */
    int queueTop;

    /**
     * Index (mod queue.length) of the least valid queue slot, which
     * is always the next position to steal from if nonempty.
     */
    volatile int queueBase;

    /**
     * The index of the most recent stealer, used as a hint to avoid
     * traversal in method helpJoinTask. This is only a hint because a
     * worker might have had multiple steals and this only holds one
     * of them (usually the most current). Declared non-volatile,
     * relying on other prevailing sync to keep reasonably current.
     */
    int stealHint;

    /**
     * Index of this worker in pool array. Set once by pool before
     * running, and accessed directly by pool to locate this worker in
     * its workers array.
     */
    final int poolIndex;

    /**
     * Encoded record for pool task waits. Usages are always
     * surrounded by volatile reads/writes.
     */
    int nextWait;

    /**
     * Complement of poolIndex, offset by count of entries of task
     * waits. Accessed by ForkJoinPool to manage event waiters.
     */
    volatile int eventCount;

    /**
     * Seed for random number generator for choosing steal victims.
     * Uses Marsaglia xorshift. Must be initialized as nonzero.
     */
    int seed;

    /**
     * Number of steals. Directly accessed (and reset) by pool when
     * idle.
     */
    int stealCount;

    /**
     * True if this worker should or did terminate.
     */
    volatile boolean terminate;

    /**
     * Set to true before LockSupport.park; false on return.
     */
    volatile boolean parked;

    /**
     * True if this worker uses local FIFO, rather than the default
     * LIFO, order for local polling. Shadows value from ForkJoinPool.
     */
    final boolean locallyFifo;

    /**
     * The task most recently stolen from another worker (or
     * submission queue). All uses are surrounded by enough volatile
     * reads/writes to maintain as non-volatile.
     */
    ForkJoinTask<?> currentSteal;

    /**
     * The task currently being joined, set only when actively trying
     * to help other stealers in helpJoinTask. All uses are surrounded
     * by enough volatile reads/writes to maintain as non-volatile.
     */
    ForkJoinTask<?> currentJoin;

    /**
     * Creates a ForkJoinWorkerThread operating in the given pool.
     *
     * @param pool the pool this thread works in
     * @throws NullPointerException if pool is null
     */
    protected ForkJoinWorkerThread(ForkJoinPool pool) {
        super(pool.nextWorkerName());
        this.pool = pool;
        int k = pool.registerWorker(this);
        poolIndex = k;
        eventCount = ~k & SMASK; // clear wait count
        locallyFifo = pool.locallyFifo;
        Thread.UncaughtExceptionHandler ueh = pool.ueh;
        if (ueh != null)
            setUncaughtExceptionHandler(ueh);
        setDaemon(true);
    }

    // Public methods

    /**
     * Returns the pool hosting this thread.
     *
     * @return the pool
     */
    public ForkJoinPool getPool() {
        return pool;
    }
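
    /*
     * Usage sketch (hypothetical, building on the SketchFactory example
     * above): a custom factory is installed when constructing the pool.
     * The final constructor argument selects "async mode" (FIFO local
     * processing, mirrored here by field locallyFifo).
     */
    static ForkJoinPool sketchPool() {
        return new ForkJoinPool(Runtime.getRuntime().availableProcessors(),
                                new SketchFactory(),
                                null,    // default uncaught exception handler
                                false);  // LIFO local processing
    }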

    /**
     * Returns the index number of this thread in its pool. The
     * returned value ranges from zero to the maximum number of
     * threads (minus one) that have ever been created in the pool.
     * This method may be useful for applications that track status or
     * collect results per-worker rather than per-task.
     *
     * @return the index number
     */
    public int getPoolIndex() {
        return poolIndex;
    }

    // Randomization

    /**
     * Computes the next value for random victim probes and backoffs.
     * Scans don't require a very high quality generator, but also not
     * a crummy one. Marsaglia xor-shift is cheap and works well
     * enough. Note: This is manually inlined in FJP.scan() to avoid
     * writes inside busy loops.
     */
    private int nextSeed() {
        int r = seed;
        r ^= r << 13;
        r ^= r >>> 17;
        r ^= r << 5;
        return seed = r;
    }

    // Run State management

    /**
     * Initializes internal state after construction but before
     * processing any tasks. If you override this method, you must
     * invoke {@code super.onStart()} at the beginning of the method.
     * Initialization requires care: Most fields must have legal
     * default values, to ensure that attempted accesses from other
     * threads work correctly even before this thread starts
     * processing tasks.
     */
    protected void onStart() {
        queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
        int r = pool.workerSeedGenerator.nextInt();
        seed = (r == 0) ? 1 : r; // must be nonzero
    }

    /**
     * Performs cleanup associated with termination of this worker
     * thread. If you override this method, you must invoke
     * {@code super.onTermination} at the end of the overridden method.
     *
     * @param exception the exception causing this thread to abort due
     * to an unrecoverable error, or {@code null} if completed normally
     */
    protected void onTermination(Throwable exception) {
        try {
            terminate = true;
            cancelTasks();
            pool.deregisterWorker(this, exception);
        } catch (Throwable ex) {   // Shouldn't ever happen
            if (exception == null) // but if so, at least rethrow it
                exception = ex;
        } finally {
            if (exception != null)
                UNSAFE.throwException(exception);
        }
    }

    /**
     * This method is required to be public, but should never be
     * called explicitly. It performs the main run loop to execute
     * {@link ForkJoinTask}s.
     */
    public void run() {
        Throwable exception = null;
        try {
            onStart();
            pool.work(this);
        } catch (Throwable ex) {
            exception = ex;
        } finally {
            onTermination(exception);
        }
    }
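
    /*
     * Side note on nextSeed (illustrative helper, unused): zero is a
     * fixed point of this xorshift step, which is why onStart forces
     * the initial seed to be nonzero; the (13, 17, 5) shift triple is
     * one of Marsaglia's full-period choices over the nonzero ints.
     */
    static int sketchXorShift(int r) {
        r ^= r << 13;
        r ^= r >>> 17;
        r ^= r << 5;  // maps 0 to 0; otherwise cycles through all nonzero ints
        return r;
    }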

    /*
     * Intrinsics-based atomic writes for queue slots. These are
     * basically the same as methods in AtomicReferenceArray, but
     * specialized for (1) ForkJoinTask elements (2) the requirement
     * that nullness and bounds checks have already been performed by
     * callers and (3) effective offsets known not to overflow from
     * int to long (because of MAXIMUM_QUEUE_CAPACITY). We don't need
     * a corresponding version for reads: plain array reads are OK
     * because they are protected by other volatile reads and are
     * confirmed by CASes.
     *
     * Most uses don't actually call these methods, but instead
     * contain inlined forms that enable more predictable
     * optimization. We don't define the version of write used in
     * pushTask at all, but instead inline there a store-fenced array
     * slot write.
     *
     * Also, in most methods, as a performance (not correctness)
     * issue, we'd like to encourage compilers not to arbitrarily
     * postpone setting queueTop after writing the slot. Currently
     * there is no intrinsic for arranging this, but using Unsafe
     * putOrderedInt may be a preferable strategy on some compilers
     * even though its main effect is a pre-, not post-, fence. To
     * simplify possible changes, the option is left in comments next
     * to the associated assignments.
     */

    /**
     * CASes slot i of array q from t to null. Caller must ensure q is
     * non-null and index is in range.
     */
    private static final boolean casSlotNull(ForkJoinTask<?>[] q, int i,
                                             ForkJoinTask<?> t) {
        return UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE, t, null);
    }

    /**
     * Performs a volatile write of the given task at the given slot of
     * array q. Caller must ensure q is non-null and index is in
     * range. This method is used only during resets and backouts.
     */
    private static final void writeSlot(ForkJoinTask<?>[] q, int i,
                                        ForkJoinTask<?> t) {
        UNSAFE.putObjectVolatile(q, (i << ASHIFT) + ABASE, t);
    }

    // queue methods

    /**
     * Pushes a task. Call only from this thread.
     *
     * @param t the task. Caller must ensure non-null.
     */
    final void pushTask(ForkJoinTask<?> t) {
        ForkJoinTask<?>[] q; int s, m;
        if ((q = queue) != null) {    // ignore if queue removed
            long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
            UNSAFE.putOrderedObject(q, u, t);
            queueTop = s + 1;         // or use putOrderedInt
            if ((s -= queueBase) <= 2)
                pool.signalWork();
            else if (s == m)
                growQueue();
        }
    }

    /**
     * Creates or doubles queue array. Transfers elements by
     * emulating steals (deqs) from the old array and placing them,
     * oldest first, into the new array.
     */
    private void growQueue() {
        ForkJoinTask<?>[] oldQ = queue;
        int size = oldQ != null ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY;
        if (size > MAXIMUM_QUEUE_CAPACITY)
            throw new RejectedExecutionException("Queue capacity exceeded");
        if (size < INITIAL_QUEUE_CAPACITY)
            size = INITIAL_QUEUE_CAPACITY;
        ForkJoinTask<?>[] q = queue = new ForkJoinTask<?>[size];
        int mask = size - 1;
        int top = queueTop;
        int oldMask;
        if (oldQ != null && (oldMask = oldQ.length - 1) >= 0) {
            for (int b = queueBase; b != top; ++b) {
                long u = ((b & oldMask) << ASHIFT) + ABASE;
                Object x = UNSAFE.getObjectVolatile(oldQ, u);
                if (x != null && UNSAFE.compareAndSwapObject(oldQ, u, x, null))
                    UNSAFE.putObjectVolatile
                        (q, ((b & mask) << ASHIFT) + ABASE, x);
            }
        }
    }

    /**
     * Tries to take a task from the base of the queue, failing if
     * empty or contended. Note: Specializations of this code appear
     * in locallyDeqTask and elsewhere.
     *
     * @return a task, or null if none or contended
     */
    final ForkJoinTask<?> deqTask() {
        ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
        if (queueTop != (b = queueBase) &&
            (q = queue) != null && // must read q after b
            (i = (q.length - 1) & b) >= 0 &&
            (t = q[i]) != null && queueBase == b &&
            UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE, t, null)) {
            queueBase = b + 1;
            return t;
        }
        return null;
    }
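
    /*
     * Sketch of the raw offset arithmetic used by the Unsafe calls in
     * this class (hypothetical helper, unused): element i of the array
     * lives at byte offset ABASE + i * scale, and since the scale is a
     * power of two this is (i << ASHIFT) + ABASE. The int shift cannot
     * overflow, because indices are bounded by MAXIMUM_QUEUE_CAPACITY.
     */
    static long sketchSlotOffset(int i) {
        return (i << ASHIFT) + ABASE;
    }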

    /**
     * Tries to take a task from the base of own queue. Called only
     * by this thread.
     *
     * @return a task, or null if none
     */
    final ForkJoinTask<?> locallyDeqTask() {
        ForkJoinTask<?> t; int m, b, i;
        ForkJoinTask<?>[] q = queue;
        if (q != null && (m = q.length - 1) >= 0) {
            while (queueTop != (b = queueBase)) {
                if ((t = q[i = m & b]) != null &&
                    queueBase == b &&
                    UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE,
                                                t, null)) {
                    queueBase = b + 1;
                    return t;
                }
            }
        }
        return null;
    }

    /**
     * Returns a popped task, or null if empty.
     * Called only by this thread.
     */
    private ForkJoinTask<?> popTask() {
        int m;
        ForkJoinTask<?>[] q = queue;
        if (q != null && (m = q.length - 1) >= 0) {
            for (int s; (s = queueTop) != queueBase;) {
                int i = m & --s;
                long u = (i << ASHIFT) + ABASE; // raw offset
                ForkJoinTask<?> t = q[i];
                if (t == null)   // lost to stealer
                    break;
                if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
                    queueTop = s; // or putOrderedInt
                    return t;
                }
            }
        }
        return null;
    }

    /**
     * Specialized version of popTask to pop only if the topmost
     * element is the given task. Called only by this thread.
     *
     * @param t the task. Caller must ensure non-null.
     */
    final boolean unpushTask(ForkJoinTask<?> t) {
        ForkJoinTask<?>[] q;
        int s;
        if ((q = queue) != null && (s = queueTop) != queueBase &&
            UNSAFE.compareAndSwapObject
            (q, (((q.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
            queueTop = s; // or putOrderedInt
            return true;
        }
        return false;
    }

    /**
     * Returns, without removing, the next task, or null if empty or
     * contended.
     */
    final ForkJoinTask<?> peekTask() {
        int m;
        ForkJoinTask<?>[] q = queue;
        if (q == null || (m = q.length - 1) < 0)
            return null;
        int i = locallyFifo ? queueBase : (queueTop - 1);
        return q[i & m];
    }

    // Support methods for ForkJoinPool

    /**
     * Runs the given task, plus any local tasks, until the queue is
     * empty.
     */
    final void execTask(ForkJoinTask<?> t) {
        currentSteal = t;
        for (;;) {
            if (t != null)
                t.doExec();
            if (queueTop == queueBase)
                break;
            t = locallyFifo ? locallyDeqTask() : popTask();
        }
        ++stealCount;
        currentSteal = null;
    }

    /**
     * Removes and cancels all tasks in queue. Can be called from any
     * thread.
     */
    final void cancelTasks() {
        ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks
        if (cj != null && cj.status >= 0)
            cj.cancelIgnoringExceptions();
        ForkJoinTask<?> cs = currentSteal;
        if (cs != null && cs.status >= 0)
            cs.cancelIgnoringExceptions();
        while (queueBase != queueTop) {
            ForkJoinTask<?> t = deqTask();
            if (t != null)
                t.cancelIgnoringExceptions();
        }
    }

    /**
     * Drains tasks to given collection c.
     *
     * @return the number of tasks drained
     */
    final int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
        int n = 0;
        while (queueBase != queueTop) {
            ForkJoinTask<?> t = deqTask();
            if (t != null) {
                c.add(t);
                ++n;
            }
        }
        return n;
    }

    // Support methods for ForkJoinTask

    /**
     * Returns an estimate of the number of tasks in the queue.
     */
    final int getQueueSize() {
        return queueTop - queueBase;
    }

    /**
     * Gets and removes a local task.
     *
     * @return a task, if available
     */
    final ForkJoinTask<?> pollLocalTask() {
        return locallyFifo ? locallyDeqTask() : popTask();
    }
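
    /*
     * Usage sketch (hypothetical helper, unused): drainTasksTo exists
     * so pool shutdown can move unexecuted tasks into a caller-supplied
     * collection instead of cancelling them.
     */
    static java.util.List<ForkJoinTask<?>> sketchDrain(ForkJoinWorkerThread w) {
        java.util.List<ForkJoinTask<?>> drained =
            new java.util.ArrayList<ForkJoinTask<?>>();
        w.drainTasksTo(drained);
        return drained;
    }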

    /**
     * Gets and removes a local or stolen task.
     *
     * @return a task, if available
     */
    final ForkJoinTask<?> pollTask() {
        ForkJoinWorkerThread[] ws;
        ForkJoinTask<?> t = pollLocalTask();
        if (t != null || (ws = pool.workers) == null)
            return t;
        int n = ws.length; // cheap version of FJP.scan
        int steps = n << 1;
        int r = nextSeed();
        int i = 0;
        while (i < steps) {
            ForkJoinWorkerThread w = ws[(i++ + r) & (n - 1)];
            if (w != null && w.queueBase != w.queueTop && w.queue != null) {
                if ((t = w.deqTask()) != null)
                    return t;
                i = 0;
            }
        }
        return null;
    }

    /**
     * The maximum stolen->joining link depth allowed in helpJoinTask,
     * as well as the maximum number of retries (allowing on average
     * one staleness retry per level) per attempt to instead try
     * compensation. Depths for legitimate chains are unbounded, but
     * we use a fixed constant to avoid (otherwise unchecked) cycles
     * and bound staleness of traversal parameters at the expense of
     * sometimes blocking when we could be helping.
     */
    private static final int MAX_HELP = 16;

    /**
     * Possibly runs some tasks and/or blocks, until joinMe is done.
     *
     * @param joinMe the task to join
     * @return completion status on exit
     */
    final int joinTask(ForkJoinTask<?> joinMe) {
        ForkJoinTask<?> prevJoin = currentJoin;
        currentJoin = joinMe;
        for (int s, retries = MAX_HELP;;) {
            if ((s = joinMe.status) < 0) {
                currentJoin = prevJoin;
                return s;
            }
            if (retries > 0) {
                if (queueTop != queueBase) {
                    if (!localHelpJoinTask(joinMe))
                        retries = 0;           // cannot help
                }
                else if (retries == MAX_HELP >>> 1) {
                    --retries;                 // check uncommon case
                    if (tryDeqAndExec(joinMe) >= 0)
                        Thread.yield();        // for politeness
                }
                else
                    retries = helpJoinTask(joinMe) ? MAX_HELP : retries - 1;
            }
            else {
                retries = MAX_HELP;            // restart if not done
                pool.tryAwaitJoin(joinMe);
            }
        }
    }

    /**
     * If present, pops and executes the given task, or any other
     * cancelled task.
     *
     * @return false if any other non-cancelled task exists in local queue
     */
    private boolean localHelpJoinTask(ForkJoinTask<?> joinMe) {
        int s, i; ForkJoinTask<?>[] q; ForkJoinTask<?> t;
        if ((s = queueTop) != queueBase && (q = queue) != null &&
            (i = (q.length - 1) & --s) >= 0 &&
            (t = q[i]) != null) {
            if (t != joinMe && t.status >= 0)
                return false;
            if (UNSAFE.compareAndSwapObject
                (q, (i << ASHIFT) + ABASE, t, null)) {
                queueTop = s; // or putOrderedInt
                t.doExec();
            }
        }
        return true;
    }
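
    /*
     * Restatement of joinTask's retry policy above (descriptive only):
     * starting from retries = MAX_HELP, each pass (1) runs or clears
     * local tasks via localHelpJoinTask while the local queue is
     * nonempty; (2) once per countdown, when retries reaches
     * MAX_HELP/2, probes for joinMe at some queue base via
     * tryDeqAndExec; (3) otherwise tries chain-following helpJoinTask,
     * resetting retries on success and decrementing on failure; and
     * (4) once retries is exhausted, blocks in pool.tryAwaitJoin and,
     * if still not done, starts the countdown over.
     */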

    /**
     * Tries to locate and execute tasks for a stealer of the given
     * task, or in turn one of its stealers. Traces
     * currentSteal->currentJoin links looking for a thread working on
     * a descendant of the given task and with a non-empty queue to
     * steal back and execute tasks from. The implementation is very
     * branchy to cope with potential inconsistencies or loops
     * encountering chains that are stale, unknown, or of length
     * greater than MAX_HELP links. All of these cases are handled by
     * having the caller simply retry.
     *
     * @param joinMe the task to join
     * @return true if ran a task
     */
    private boolean helpJoinTask(ForkJoinTask<?> joinMe) {
        boolean helped = false;
        int m = pool.scanGuard & SMASK;
        ForkJoinWorkerThread[] ws = pool.workers;
        if (ws != null && ws.length > m && joinMe.status >= 0) {
            int levels = MAX_HELP;         // remaining chain length
            ForkJoinTask<?> task = joinMe; // base of chain
            outer: for (ForkJoinWorkerThread thread = this;;) {
                // Try to find v, the stealer of task, by first using hint
                ForkJoinWorkerThread v = ws[thread.stealHint & m];
                if (v == null || v.currentSteal != task) {
                    for (int j = 0; ;) {   // search array
                        if ((v = ws[j]) != null && v.currentSteal == task) {
                            thread.stealHint = j;
                            break;         // save hint for next time
                        }
                        if (++j > m)
                            break outer;   // can't find stealer
                    }
                }
                // Try to help v, using specialized form of deqTask
                for (;;) {
                    ForkJoinTask<?>[] q; int b, i;
                    if (joinMe.status < 0)
                        break outer;
                    if ((b = v.queueBase) == v.queueTop ||
                        (q = v.queue) == null ||
                        (i = (q.length-1) & b) < 0)
                        break;             // empty
                    long u = (i << ASHIFT) + ABASE;
                    ForkJoinTask<?> t = q[i];
                    if (task.status < 0)
                        break outer;       // stale
                    if (t != null && v.queueBase == b &&
                        UNSAFE.compareAndSwapObject(q, u, t, null)) {
                        v.queueBase = b + 1;
                        v.stealHint = poolIndex;
                        ForkJoinTask<?> ps = currentSteal;
                        currentSteal = t;
                        t.doExec();
                        currentSteal = ps;
                        helped = true;
                    }
                }
                // Try to descend to find v's stealer
                ForkJoinTask<?> next = v.currentJoin;
                if (--levels > 0 && task.status >= 0 &&
                    next != null && next != task) {
                    task = next;
                    thread = v;
                }
                else
                    break; // max levels, stale, dead-end, or cyclic
            }
        }
        return helped;
    }

    /**
     * Performs an uncommon case for joinTask: If task t is at the base
     * of some worker's queue, steals and executes it.
     *
     * @param t the task
     * @return t's status
     */
    private int tryDeqAndExec(ForkJoinTask<?> t) {
        int m = pool.scanGuard & SMASK;
        ForkJoinWorkerThread[] ws = pool.workers;
        if (ws != null && ws.length > m && t.status >= 0) {
            for (int j = 0; j <= m; ++j) {
                ForkJoinTask<?>[] q; int b, i;
                ForkJoinWorkerThread v = ws[j];
                if (v != null &&
                    (b = v.queueBase) != v.queueTop &&
                    (q = v.queue) != null &&
                    (i = (q.length - 1) & b) >= 0 &&
                    q[i] == t) {
                    long u = (i << ASHIFT) + ABASE;
                    if (v.queueBase == b &&
                        UNSAFE.compareAndSwapObject(q, u, t, null)) {
                        v.queueBase = b + 1;
                        v.stealHint = poolIndex;
                        ForkJoinTask<?> ps = currentSteal;
                        currentSteal = t;
                        t.doExec();
                        currentSteal = ps;
                    }
                    break;
                }
            }
        }
        return t.status;
    }
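
    /*
     * Worked example of the traversal above (descriptive only): suppose
     * worker W1 is joining task T, which worker W2 stole; W2 is itself
     * joining T2, which worker W3 stole. helpJoinTask run by W1 locates
     * W2 via stealHint or a scan (W2.currentSteal == T), steals back
     * and runs tasks from W2's queue, then follows W2.currentJoin to T2
     * and repeats with W3, descending at most MAX_HELP links.
     */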

    /**
     * Implements ForkJoinTask.getSurplusQueuedTaskCount(). Returns
     * an estimate of the number of tasks, offset by a function of
     * number of idle workers.
     *
     * This method provides a cheap heuristic guide for task
     * partitioning when programmers, frameworks, tools, or languages
     * have little or no idea about task granularity. In essence, by
     * offering this method, we ask users only about tradeoffs in
     * overhead vs expected throughput and its variance, rather than
     * how finely to partition tasks.
     *
     * In a steady state strict (tree-structured) computation, each
     * thread makes available for stealing enough tasks for other
     * threads to remain active. Inductively, if all threads play by
     * the same rules, each thread should make available only a
     * constant number of tasks.
     *
     * The minimum useful constant is just 1. But using a value of 1
     * would require immediate replenishment upon each steal to
     * maintain enough tasks, which is infeasible. Further,
     * partitionings/granularities of offered tasks should minimize
     * steal rates, which in general means that threads nearer the top
     * of the computation tree should generate more than those nearer
     * the bottom. In perfect steady state, each thread is at
     * approximately the same level of the computation tree. However,
     * producing extra tasks amortizes the uncertainty of progress and
     * diffusion assumptions.
     *
     * So, users will want to use values larger, but not much larger,
     * than 1, both to smooth over transient shortages and to hedge
     * against uneven progress, as traded off against the cost of
     * extra task overhead. We leave the user to pick a threshold
     * value to compare with the results of this call to guide
     * decisions, but recommend values such as 3.
     *
     * When all threads are active, it is on average OK to estimate
     * surplus strictly locally. In steady-state, if one thread is
     * maintaining say 2 surplus tasks, then so are others. So we can
     * just use estimated queue length (although note that (queueTop -
     * queueBase) can be an overestimate because of stealers lagging
     * increments of queueBase). However, this strategy alone leads
     * to serious mis-estimates in some non-steady-state conditions
     * (ramp-up, ramp-down, other stalls). We can detect many of these
     * by further considering the number of "idle" threads that are
     * known to have zero queued tasks, so we compensate by a factor
     * of (#idle/#active) threads.
     */
    final int getEstimatedSurplusTaskCount() {
        return queueTop - queueBase - pool.idlePerActive();
    }
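
    /*
     * Usage sketch (hypothetical): the public counterpart,
     * ForkJoinTask.getSurplusQueuedTaskCount(), lets a recursive task
     * apply the threshold guidance above, e.g. keep subdividing only
     * while few surplus tasks are queued:
     *
     *   while (ForkJoinTask.getSurplusQueuedTaskCount() <= 3 &&
     *          task.isSplittable())        // invented methods, for
     *       task = task.splitAndForkRight(); // illustration only
     */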

    /**
     * Runs tasks until {@code pool.isQuiescent()}. We piggyback on
     * the pool's active count ctl maintenance, but rather than
     * blocking when tasks cannot be found, we rescan until all others
     * cannot find tasks either. The bracketing by pool quiescerCounts
     * updates suppresses pool auto-shutdown mechanics that could
     * otherwise prematurely terminate the pool because all threads
     * appear to be inactive.
     */
    final void helpQuiescePool() {
        boolean active = true;
        ForkJoinTask<?> ps = currentSteal; // to restore below
        ForkJoinPool p = pool;
        p.addQuiescerCount(1);
        for (;;) {
            ForkJoinWorkerThread[] ws = p.workers;
            ForkJoinWorkerThread v = null;
            int n;
            if (queueTop != queueBase)
                v = this;
            else if (ws != null && (n = ws.length) > 1) {
                ForkJoinWorkerThread w;
                int r = nextSeed(); // cheap version of FJP.scan
                int steps = n << 1;
                for (int i = 0; i < steps; ++i) {
                    if ((w = ws[(i + r) & (n - 1)]) != null &&
                        w.queueBase != w.queueTop) {
                        v = w;
                        break;
                    }
                }
            }
            if (v != null) {
                ForkJoinTask<?> t;
                if (!active) {
                    active = true;
                    p.addActiveCount(1);
                }
                if ((t = (v != this) ? v.deqTask() :
                     locallyFifo ? locallyDeqTask() : popTask()) != null) {
                    currentSteal = t;
                    t.doExec();
                    currentSteal = ps;
                }
            }
            else {
                if (active) {
                    active = false;
                    p.addActiveCount(-1);
                }
                if (p.isQuiescent()) {
                    p.addActiveCount(1);
                    p.addQuiescerCount(-1);
                    break;
                }
            }
        }
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long ABASE;
    private static final int ASHIFT;

    static {
        int s;
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> a = ForkJoinTask[].class;
            ABASE = UNSAFE.arrayBaseOffset(a);
            s = UNSAFE.arrayIndexScale(a);
        } catch (Exception e) {
            throw new Error(e);
        }
        if ((s & (s-1)) != 0)
            throw new Error("data type scale not a power of two");
        ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
    }

}