/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */

package java.util.concurrent;

import java.util.Random;
import java.util.Collection;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.RejectedExecutionException;

/**
 * A thread managed by a {@link ForkJoinPool}, which executes
 * {@link ForkJoinTask}s.
 * This class is subclassable solely for the sake of adding
 * functionality -- there are no overridable methods dealing with
 * scheduling or execution. However, you can override initialization
 * and termination methods surrounding the main task processing loop.
 * If you do create such a subclass, you will also need to supply a
 * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to use it
 * in a {@code ForkJoinPool}.
 *
 * @since 1.7
 * @author Doug Lea
 */
public class ForkJoinWorkerThread extends Thread {
    /*
     * Overview:
     *
     * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
     * ForkJoinTasks. This class includes bookkeeping in support of
     * worker activation, suspension, and lifecycle control described
     * in more detail in the internal documentation of class
     * ForkJoinPool. And as described further below, this class also
     * includes special-cased support for some ForkJoinTask
     * methods. But the main mechanics involve work-stealing:
     *
     * Work-stealing queues are special forms of Deques that support
     * only three of the four possible end-operations -- push, pop,
     * and deq (aka steal), under the further constraints that push
     * and pop are called only from the owning thread, while deq may
     * be called from other threads.  (If you are unfamiliar with
     * them, you probably want to read Herlihy and Shavit's book "The
     * Art of Multiprocessor programming", chapter 16 describing these
     * in more detail before proceeding.)
     * The main work-stealing
     * queue design is roughly similar to those in the papers "Dynamic
     * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
     * (http://research.sun.com/scalable/pubs/index.html) and
     * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
     * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
     * The main differences ultimately stem from gc requirements that
     * we null out taken slots as soon as we can, to maintain as small
     * a footprint as possible even in programs generating huge
     * numbers of tasks. To accomplish this, we shift the CAS
     * arbitrating pop vs deq (steal) from being on the indices
     * ("base" and "sp") to the slots themselves (mainly via method
     * "casSlotNull()"). So, both a successful pop and deq mainly
     * entail a CAS of a slot from non-null to null. Because we rely
     * on CASes of references, we do not need tag bits on base or sp.
     * They are simple ints as used in any circular array-based queue
     * (see for example ArrayDeque).  Updates to the indices must
     * still be ordered in a way that guarantees that sp == base means
     * the queue is empty, but otherwise may err on the side of
     * possibly making the queue appear nonempty when a push, pop, or
     * deq have not fully committed. Note that this means that the deq
     * operation, considered individually, is not wait-free. One thief
     * cannot successfully continue until another in-progress one (or,
     * if previously empty, a push) completes.  However, in the
     * aggregate, we ensure at least probabilistic non-blockingness.
     * If an attempted steal fails, a thief always chooses a different
     * random victim target to try next. So, in order for one thief to
     * progress, it suffices for any in-progress deq or new push on
     * any empty queue to complete. One reason this works well here is
     * that apparently-nonempty often means soon-to-be-stealable,
     * which gives threads a chance to set activation status if
     * necessary before stealing.
     *
     * This approach also enables support for "async mode" where local
     * task processing is in FIFO, not LIFO order; simply by using a
     * version of deq rather than pop when locallyFifo is true (as set
     * by the ForkJoinPool).  This allows use in message-passing
     * frameworks in which tasks are never joined.
     *
     * When a worker would otherwise be blocked waiting to join a
     * task, it first tries a form of linear helping: Each worker
     * records (in field currentSteal) the most recent task it stole
     * from some other worker. Plus, it records (in field currentJoin)
     * the task it is currently actively joining. Method joinTask uses
     * these markers to try to find a worker to help (i.e., steal back
     * a task from and execute it) that could hasten completion of the
     * actively joined task. In essence, the joiner executes a task
     * that would be on its own local deque had the to-be-joined task
     * not been stolen. This may be seen as a conservative variant of
     * the approach in Wagner & Calder "Leapfrogging: a portable
     * technique for implementing efficient futures" SIGPLAN Notices,
     * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
     * in that: (1) We only maintain dependency links across workers
     * upon steals, rather than use per-task bookkeeping.
     * This may
     * require a linear scan of workers array to locate stealers, but
     * usually doesn't because stealers leave hints (that may become
     * stale/wrong) of where to locate them. This isolates cost to
     * when it is needed, rather than adding to per-task overhead.
     * (2) It is "shallow", ignoring nesting and potentially cyclic
     * mutual steals.  (3) It is intentionally racy: field currentJoin
     * is updated only while actively joining, which means that we
     * miss links in the chain during long-lived tasks, GC stalls etc
     * (which is OK since blocking in such cases is usually a good
     * idea).  (4) We bound the number of attempts to find work (see
     * MAX_HELP_DEPTH) and fall back to suspending the worker and if
     * necessary replacing it with a spare (see
     * ForkJoinPool.awaitJoin).
     *
     * Efficient implementation of these algorithms currently relies
     * on an uncomfortable amount of "Unsafe" mechanics. To maintain
     * correct orderings, reads and writes of variable base require
     * volatile ordering.  Variable sp does not require volatile
     * writes but still needs store-ordering, which we accomplish by
     * pre-incrementing sp before filling the slot with an ordered
     * store.  (Pre-incrementing also enables backouts used in
     * joinTask.)  Because they are protected by volatile base reads,
     * reads of the queue array and its slots by other threads do not
     * need volatile load semantics, but writes (in push) require
     * store order and CASes (in pop and deq) require (volatile) CAS
     * semantics.  (Michael, Saraswat, and Vechev's algorithm has
     * similar properties, but without support for nulling slots.)
     * Since these combinations aren't supported using ordinary
     * volatiles, the only way to accomplish these efficiently is to
     * use direct Unsafe calls. (Using external AtomicIntegers and
     * AtomicReferenceArrays for the indices and array is
     * significantly slower because of memory locality and indirection
     * effects.)
     *
     * Further, performance on most platforms is very sensitive to
     * placement and sizing of the (resizable) queue array.  Even
     * though these queues don't usually become all that big, the
     * initial size must be large enough to counteract cache
     * contention effects across multiple queues (especially in the
     * presence of GC cardmarking). Also, to improve thread-locality,
     * queues are initialized after starting.  All together, these
     * low-level implementation choices produce as much as a factor of
     * 4 performance improvement compared to naive implementations,
     * and enable the processing of billions of tasks per second,
     * sometimes at the expense of ugliness.
     */

    /**
     * Generator for initial random seeds for random victim
     * selection. This is used only to create initial seeds. Random
     * steals use a cheaper xorshift generator per steal attempt. We
     * expect only rare contention on seedGenerator, so just use a
     * plain Random.
     */
    private static final Random seedGenerator = new Random();

    /**
     * The maximum stolen->joining link depth allowed in helpJoinTask.
     * Depths for legitimate chains are unbounded, but we use a fixed
     * constant to avoid (otherwise unchecked) cycles and bound
     * staleness of traversal parameters at the expense of sometimes
     * blocking when we could be helping.
     */
    private static final int MAX_HELP_DEPTH = 8;

    /**
     * Capacity of work-stealing queue array upon initialization.
     * Must be a power of two. Initial size must be at least 4, but is
     * padded to minimize cache effects.
     */
    private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;

    /**
     * Maximum work-stealing queue array size.  Must be less than or
     * equal to 1 << (31 - width of array entry) to ensure lack of
     * index wraparound. The value is set in the static block
     * at the end of this file after obtaining width.
     */
    private static final int MAXIMUM_QUEUE_CAPACITY;

    /**
     * The pool this thread works in. Accessed directly by ForkJoinTask.
     */
    final ForkJoinPool pool;

    /**
     * The work-stealing queue array. Size must be a power of two.
     * Initialized in onStart, to improve memory locality.
     */
    private ForkJoinTask<?>[] queue;

    /**
     * Index (mod queue.length) of least valid queue slot, which is
     * always the next position to steal from if nonempty.
     */
    private volatile int base;

    /**
     * Index (mod queue.length) of next queue slot to push to or pop
     * from. It is written only by owner thread, and accessed by other
     * threads only after reading (volatile) base.  Both sp and base
     * are allowed to wrap around on overflow, but (sp - base) still
     * estimates size.
     */
    private int sp;

    /**
     * The index of most recent stealer, used as a hint to avoid
     * traversal in method helpJoinTask. This is only a hint because a
     * worker might have had multiple steals and this only holds one
     * of them (usually the most current). Declared non-volatile,
     * relying on other prevailing sync to keep reasonably current.
     */
    private int stealHint;

    /**
     * Run state of this worker. In addition to the usual run levels,
     * tracks if this worker is suspended as a spare, and if it was
     * killed (trimmed) while suspended. However, "active" status is
     * maintained separately and modified only in conjunction with
     * CASes of the pool's runState (which are currently sadly
     * manually inlined for performance.) Accessed directly by pool
     * to simplify checks for normal (zero) status.
     */
    volatile int runState;

    private static final int TERMINATING = 0x01;
    private static final int TERMINATED  = 0x02;
    private static final int SUSPENDED   = 0x04; // inactive spare
    private static final int TRIMMED     = 0x08; // killed while suspended

    /**
     * Number of steals. Directly accessed (and reset) by
     * pool.tryAccumulateStealCount when idle.
     */
    int stealCount;

    /**
     * Seed for random number generator for choosing steal victims.
     * Uses Marsaglia xorshift. Must be initialized as nonzero.
     */
    private int seed;

    /**
     * Activity status. When true, this worker is considered active.
     * Accessed directly by pool.  Must be false upon construction.
     */
    boolean active;

    /**
     * True if use local fifo, not default lifo, for local polling.
     * Shadows value from ForkJoinPool.
     */
    private final boolean locallyFifo;

    /**
     * Index of this worker in pool array. Set once by pool before
     * running, and accessed directly by pool to locate this worker in
     * its workers array.
     */
    int poolIndex;

    /**
     * The last pool event waited for. Accessed only by pool in
     * callback methods invoked within this thread.
     */
    int lastEventCount;

    /**
     * Encoded index and event count of next event waiter. Accessed
     * only by ForkJoinPool for managing event waiters.
     */
    volatile long nextWaiter;

    /**
     * Number of times this thread suspended as spare. Accessed only
     * by pool.
     */
    int spareCount;

    /**
     * Encoded index and count of next spare waiter. Accessed only
     * by ForkJoinPool for managing spares.
     */
    volatile int nextSpare;

    /**
     * The task currently being joined, set only when actively trying
     * to help other stealers in helpJoinTask. Written only by this
     * thread, but read by others.
     */
    private volatile ForkJoinTask<?> currentJoin;

    /**
     * The task most recently stolen from another worker (or
     * submission queue).  Written only by this thread, but read by
     * others.
     */
    private volatile ForkJoinTask<?> currentSteal;

    /**
     * Creates a ForkJoinWorkerThread operating in the given pool.
     *
     * @param pool the pool this thread works in
     * @throws NullPointerException if pool is null
     */
    protected ForkJoinWorkerThread(ForkJoinPool pool) {
        this.pool = pool;
        this.locallyFifo = pool.locallyFifo;
        setDaemon(true);
        // To avoid exposing construction details to subclasses,
        // remaining initialization is in start() and onStart()
    }

    /**
     * Performs additional initialization and starts this thread.
     */
    final void start(int poolIndex, UncaughtExceptionHandler ueh) {
        this.poolIndex = poolIndex;
        if (ueh != null)
            setUncaughtExceptionHandler(ueh);
        start();
    }

    // Public/protected methods

    /**
     * Returns the pool hosting this thread.
     *
     * @return the pool
     */
    public ForkJoinPool getPool() {
        return pool;
    }

    /**
     * Returns the index number of this thread in its pool.  The
     * returned value ranges from zero to the maximum number of
     * threads (minus one) that have ever been created in the pool.
     * This method may be useful for applications that track status or
     * collect results per-worker rather than per-task.
     *
     * @return the index number
     */
    public int getPoolIndex() {
        return poolIndex;
    }

    /**
     * Initializes internal state after construction but before
     * processing any tasks. If you override this method, you must
     * invoke {@code super.onStart()} at the beginning of the method.
     * Initialization requires care: Most fields must have legal
     * default values, to ensure that attempted accesses from other
     * threads work correctly even before this thread starts
     * processing tasks.
     */
    protected void onStart() {
        int rs = seedGenerator.nextInt();
        seed = (rs == 0) ? 1 : rs; // seed must be nonzero

        // Allocate name string and arrays in this thread
        String pid = Integer.toString(pool.getPoolNumber());
        String wid = Integer.toString(poolIndex);
        setName("ForkJoinPool-" + pid + "-worker-" + wid);

        queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
    }

    /**
     * Performs cleanup associated with termination of this worker
     * thread.  If you override this method, you must invoke
     * {@code super.onTermination} at the end of the overridden method.
     *
     * @param exception the exception causing this thread to abort due
     * to an unrecoverable error, or {@code null} if completed normally
     */
    protected void onTermination(Throwable exception) {
        try {
            ForkJoinPool p = pool;
            if (active) {
                int a; // inline p.tryDecrementActiveCount
                active = false;
                do {} while (!UNSAFE.compareAndSwapInt
                             (p, poolRunStateOffset, a = p.runState, a - 1));
            }
            cancelTasks();
            setTerminated();
            p.workerTerminated(this);
        } catch (Throwable ex) {        // Shouldn't ever happen
            if (exception == null)      // but if so, at least rethrown
                exception = ex;
        } finally {
            if (exception != null)
                UNSAFE.throwException(exception);
        }
    }

    /**
     * This method is required to be public, but should never be
     * called explicitly. It performs the main run loop to execute
     * {@link ForkJoinTask}s.
     */
    public void run() {
        Throwable exception = null;
        try {
            onStart();
            mainLoop();
        } catch (Throwable ex) {
            exception = ex;
        } finally {
            onTermination(exception);
        }
    }

    // helpers for run()

    /**
     * Finds and executes tasks, and checks status while running.
     */
    private void mainLoop() {
        boolean ran = false; // true if ran a task on last step
        ForkJoinPool p = pool;
        for (;;) {
            p.preStep(this, ran);
            if (runState != 0)
                break;
            ran = tryExecSteal() || tryExecSubmission();
        }
    }

    /**
     * Tries to steal a task and execute it.
     *
     * @return true if ran a task
     */
    private boolean tryExecSteal() {
        ForkJoinTask<?> t;
        if ((t = scan()) != null) {
            t.quietlyExec();
            UNSAFE.putOrderedObject(this, currentStealOffset, null);
            if (sp != base)
                execLocalTasks();
            return true;
        }
        return false;
    }

    /**
     * If a submission exists, try to activate and run it.
     *
     * @return true if ran a task
     */
    private boolean tryExecSubmission() {
        ForkJoinPool p = pool;
        // This loop is needed in case attempt to activate fails, in
        // which case we only retry if there still appears to be a
        // submission.
        while (p.hasQueuedSubmissions()) {
            ForkJoinTask<?> t; int a;
            if (active || // inline p.tryIncrementActiveCount
                (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
                                                   a = p.runState, a + 1))) {
                if ((t = p.pollSubmission()) != null) {
                    UNSAFE.putOrderedObject(this, currentStealOffset, t);
                    t.quietlyExec();
                    UNSAFE.putOrderedObject(this, currentStealOffset, null);
                    if (sp != base)
                        execLocalTasks();
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * Runs local tasks until queue is empty or shut down.  Call only
     * while active.
     */
    private void execLocalTasks() {
        while (runState == 0) {
            ForkJoinTask<?> t = locallyFifo ? locallyDeqTask() : popTask();
            if (t != null)
                t.quietlyExec();
            else if (sp == base)
                break;
        }
    }

    /*
     * Intrinsics-based atomic writes for queue slots. These are
     * basically the same as methods in AtomicReferenceArray, but
     * specialized for (1) ForkJoinTask elements (2) requirement that
     * nullness and bounds checks have already been performed by
     * callers and (3) effective offsets are known not to overflow
     * from int to long (because of MAXIMUM_QUEUE_CAPACITY).
     * We don't
     * need corresponding version for reads: plain array reads are OK
     * because they are protected by other volatile reads and are
     * confirmed by CASes.
     *
     * Most uses don't actually call these methods, but instead contain
     * inlined forms that enable more predictable optimization.  We
     * don't define the version of write used in pushTask at all, but
     * instead inline there a store-fenced array slot write.
     */

    /**
     * CASes slot i of array q from t to null. Caller must ensure q is
     * non-null and index is in range.
     */
    private static final boolean casSlotNull(ForkJoinTask<?>[] q, int i,
                                             ForkJoinTask<?> t) {
        return UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null);
    }

    /**
     * Performs a volatile write of the given task at given slot of
     * array q.  Caller must ensure q is non-null and index is in
     * range. This method is used only during resets and backouts.
     */
    private static final void writeSlot(ForkJoinTask<?>[] q, int i,
                                        ForkJoinTask<?> t) {
        UNSAFE.putObjectVolatile(q, (i << qShift) + qBase, t);
    }

    // queue methods

    /**
     * Pushes a task. Call only from this thread.
     *
     * @param t the task. Caller must ensure non-null.
     */
    final void pushTask(ForkJoinTask<?> t) {
        ForkJoinTask<?>[] q = queue;
        int mask = q.length - 1; // implicit assert q != null
        int s = sp++;            // ok to increment sp before slot write
        UNSAFE.putOrderedObject(q, ((s & mask) << qShift) + qBase, t);
        if ((s -= base) == 0)
            pool.signalWork();   // was empty
        else if (s == mask)
            growQueue();         // is full
    }

    /**
     * Tries to take a task from the base of the queue, failing if
     * empty or contended. Note: Specializations of this code appear
     * in locallyDeqTask and elsewhere.
     *
     * @return a task, or null if none or contended
     */
    final ForkJoinTask<?> deqTask() {
        ForkJoinTask<?> t;
        ForkJoinTask<?>[] q;
        int b, i;
        if (sp != (b = base) &&
            (q = queue) != null && // must read q after b
            (t = q[i = (q.length - 1) & b]) != null && base == b &&
            UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) {
            base = b + 1;
            return t;
        }
        return null;
    }

    /**
     * Tries to take a task from the base of own queue.  Assumes active
     * status.  Called only by this thread.
     *
     * @return a task, or null if none
     */
    final ForkJoinTask<?> locallyDeqTask() {
        ForkJoinTask<?>[] q = queue;
        if (q != null) {
            ForkJoinTask<?> t;
            int b, i;
            while (sp != (b = base)) {
                if ((t = q[i = (q.length - 1) & b]) != null && base == b &&
                    UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase,
                                                t, null)) {
                    base = b + 1;
                    return t;
                }
            }
        }
        return null;
    }

    /**
     * Returns a popped task, or null if empty. Assumes active status.
     * Called only by this thread.
     */
    private ForkJoinTask<?> popTask() {
        ForkJoinTask<?>[] q = queue;
        if (q != null) {
            int s;
            while ((s = sp) != base) {
                int i = (q.length - 1) & --s;
                long u = (i << qShift) + qBase; // raw offset
                ForkJoinTask<?> t = q[i];
                if (t == null)   // lost to stealer
                    break;
                if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
                    /*
                     * Note: here and in related methods, as a
                     * performance (not correctness) issue, we'd like
                     * to encourage compiler not to arbitrarily
                     * postpone setting sp after successful CAS.
                     * Currently there is no intrinsic for arranging
                     * this, but using Unsafe putOrderedInt may be a
                     * preferable strategy on some compilers even
                     * though its main effect is a pre-, not post-
                     * fence. To simplify possible changes, the option
                     * is left in comments next to the associated
                     * assignments.
                     */
                    sp = s; // putOrderedInt may encourage more timely write
                    // UNSAFE.putOrderedInt(this, spOffset, s);
                    return t;
                }
            }
        }
        return null;
    }

    /**
     * Specialized version of popTask to pop only if topmost element
     * is the given task. Called only by this thread while active.
     *
     * @param t the task. Caller must ensure non-null.
     */
    final boolean unpushTask(ForkJoinTask<?> t) {
        int s;
        ForkJoinTask<?>[] q = queue;
        if ((s = sp) != base && q != null &&
            UNSAFE.compareAndSwapObject
            (q, (((q.length - 1) & --s) << qShift) + qBase, t, null)) {
            sp = s; // putOrderedInt may encourage more timely write
            // UNSAFE.putOrderedInt(this, spOffset, s);
            return true;
        }
        return false;
    }

    /**
     * Returns next task, or null if empty or contended.
     */
    final ForkJoinTask<?> peekTask() {
        ForkJoinTask<?>[] q = queue;
        if (q == null)
            return null;
        int mask = q.length - 1;
        int i = locallyFifo ? base : (sp - 1);
        return q[i & mask];
    }

    /**
     * Doubles queue array size. Transfers elements by emulating
     * steals (deqs) from old array and placing, oldest first, into
     * new array.
     */
    private void growQueue() {
        ForkJoinTask<?>[] oldQ = queue;
        int oldSize = oldQ.length;
        int newSize = oldSize << 1;
        if (newSize > MAXIMUM_QUEUE_CAPACITY)
            throw new RejectedExecutionException("Queue capacity exceeded");
        ForkJoinTask<?>[] newQ = queue = new ForkJoinTask<?>[newSize];

        int b = base;
        int bf = b + oldSize;
        int oldMask = oldSize - 1;
        int newMask = newSize - 1;
        do {
            int oldIndex = b & oldMask;
            ForkJoinTask<?> t = oldQ[oldIndex];
            if (t != null && !casSlotNull(oldQ, oldIndex, t))
                t = null;
            writeSlot(newQ, b & newMask, t);
        } while (++b != bf);
        pool.signalWork();
    }

    /**
     * Computes next value for random victim probe in scan().  Scans
     * don't require a very high quality generator, but also not a
     * crummy one.  Marsaglia xor-shift is cheap and works well enough.
     * Note: This is manually inlined in scan().
     */
    private static final int xorShift(int r) {
        r ^= r << 13;
        r ^= r >>> 17;
        return r ^ (r << 5);
    }

    /**
     * Tries to steal a task from another worker. Starts at a random
     * index of workers array, and probes workers until finding one
     * with non-empty queue or finding that all are empty.  It
     * randomly selects the first n probes. If these are empty, it
     * resorts to a circular sweep, which is necessary to accurately
     * set active status. (The circular sweep uses steps of
     * approximately half the array size plus 1, to avoid bias
     * stemming from leftmost packing of the array in ForkJoinPool.)
     *
     * This method must be both fast and quiet -- usually avoiding
     * memory accesses that could disrupt cache sharing etc other than
     * those needed to check for and take tasks (or to activate if not
     * already active). This accounts for, among other things,
     * updating random seed in place without storing it until exit.
     *
     * @return a task, or null if none found
     */
    private ForkJoinTask<?> scan() {
        ForkJoinPool p = pool;
        ForkJoinWorkerThread[] ws;        // worker array
        int n;                            // upper bound of #workers
        if ((ws = p.workers) != null && (n = ws.length) > 1) {
            boolean canSteal = active;    // shadow active status
            int r = seed;                 // extract seed once
            int mask = n - 1;
            int j = -n;                   // loop counter
            int k = r;                    // worker index, random if j < 0
            for (;;) {
                ForkJoinWorkerThread v = ws[k & mask];
                r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // inline xorshift
                ForkJoinTask<?>[] q; ForkJoinTask<?> t; int b, a;
                if (v != null && (b = v.base) != v.sp &&
                    (q = v.queue) != null) {
                    int i = (q.length - 1) & b;
                    long u = (i << qShift) + qBase; // raw offset
                    int pid = poolIndex;
                    if ((t = q[i]) != null) {
                        if (!canSteal &&  // inline p.tryIncrementActiveCount
                            UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
                                                     a = p.runState, a + 1))
                            canSteal = active = true;
                        if (canSteal && v.base == b++ &&
                            UNSAFE.compareAndSwapObject(q, u, t, null)) {
                            v.base = b;
                            v.stealHint = pid;
                            UNSAFE.putOrderedObject(this,
                                                    currentStealOffset, t);
                            seed = r;
                            ++stealCount;
                            return t;
                        }
                    }
                    j = -n;
                    k = r;                // restart on contention
                }
                else if (++j <= 0)
                    k = r;
                else if (j <= n)
                    k += (n >>> 1) | 1;
                else
                    break;
            }
        }
        return null;
    }

    // Run State management

    // status check methods used mainly by ForkJoinPool
    final boolean isRunning()     { return runState == 0; }
    final boolean isTerminated()  { return (runState & TERMINATED) != 0; }
    final boolean isSuspended()   { return (runState & SUSPENDED) != 0; }
    final boolean isTrimmed()     { return (runState & TRIMMED) != 0; }

    final boolean isTerminating() {
        if ((runState & TERMINATING) != 0)
            return true;
        if (pool.isAtLeastTerminating()) { // propagate pool state
            shutdown();
            return true;
        }
        return false;
    }

    /**
     * Sets state to TERMINATING. Does NOT unpark or interrupt
     * to wake up if currently blocked. Callers must do so if desired.
     */
    final void shutdown() {
        for (;;) {
            int s = runState;
            if ((s & (TERMINATING|TERMINATED)) != 0)
                break;
            if ((s & SUSPENDED) != 0) { // kill and wakeup if suspended
                if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
                                             (s & ~SUSPENDED) |
                                             (TRIMMED|TERMINATING)))
                    break;
            }
            else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
                                              s | TERMINATING))
                break;
        }
    }

    /**
     * Sets state to TERMINATED. Called only by onTermination().
     */
    private void setTerminated() {
        int s;
        do {} while (!UNSAFE.compareAndSwapInt(this, runStateOffset,
                                               s = runState,
                                               s | (TERMINATING|TERMINATED)));
    }

    /**
     * If suspended, tries to set status to unsuspended.
     * Does NOT wake up if blocked.
     *
     * @return true if successful
     */
    final boolean tryUnsuspend() {
        int s;
        while (((s = runState) & SUSPENDED) != 0) {
            if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
                                         s & ~SUSPENDED))
                return true;
        }
        return false;
    }

    /**
     * Sets suspended status and blocks as spare until resumed
     * or shutdown.
     */
    final void suspendAsSpare() {
        for (;;) {                  // set suspended unless terminating
            int s = runState;
            if ((s & TERMINATING) != 0) { // must kill
                if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
                                             s | (TRIMMED | TERMINATING)))
                    return;
            }
            else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
                                              s | SUSPENDED))
                break;
        }
        ForkJoinPool p = pool;
        p.pushSpare(this);
        while ((runState & SUSPENDED) != 0) {
            if (p.tryAccumulateStealCount(this)) {
                interrupted();      // clear/ignore interrupts
                if ((runState & SUSPENDED) == 0)
                    break;
                LockSupport.park(this);
            }
        }
    }

    // Misc support methods for ForkJoinPool

    /**
     * Returns an estimate of the number of tasks in the queue.  Also
     * used by ForkJoinTask.
     */
    final int getQueueSize() {
        int n; // external calls must read base first
        return (n = -base + sp) <= 0 ? 0 : n;
    }

    /**
     * Removes and cancels all tasks in queue.  Can be called from any
     * thread.
     */
    final void cancelTasks() {
        ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks
        if (cj != null && cj.status >= 0) {
            cj.cancelIgnoringExceptions();
            try {
                this.interrupt(); // awaken wait
            } catch (SecurityException ignore) {
            }
        }
        ForkJoinTask<?> cs = currentSteal;
        if (cs != null && cs.status >= 0)
            cs.cancelIgnoringExceptions();
        while (base != sp) {
            ForkJoinTask<?> t = deqTask();
            if (t != null)
                t.cancelIgnoringExceptions();
        }
    }

    /**
     * Drains tasks to given collection c.
     *
     * @return the number of tasks drained
     */
    final int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
        int n = 0;
        while (base != sp) {
            ForkJoinTask<?> t = deqTask();
            if (t != null) {
                c.add(t);
                ++n;
            }
        }
        return n;
    }

    // Support methods for ForkJoinTask

    /**
     * Gets and removes a local task.
     *
     * @return a task, if available
     */
    final ForkJoinTask<?> pollLocalTask() {
        ForkJoinPool p = pool;
        while (sp != base) {
            int a; // inline p.tryIncrementActiveCount
            if (active ||
                (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
                                                   a = p.runState, a + 1)))
                return locallyFifo ? locallyDeqTask() : popTask();
        }
        return null;
    }

    /**
     * Gets and removes a local or stolen task.
     *
     * @return a task, if available
     */
    final ForkJoinTask<?> pollTask() {
        ForkJoinTask<?> t = pollLocalTask();
        if (t == null) {
            t = scan();
            // cannot retain/track/help steal
            UNSAFE.putOrderedObject(this, currentStealOffset, null);
        }
        return t;
    }

    /**
     * Possibly runs some tasks and/or blocks, until task is done.
     *
     * @param joinMe the task to join
     * @param timed true if use timed wait
     * @param nanos wait time if timed
     */
    final void joinTask(ForkJoinTask<?> joinMe, boolean timed, long nanos) {
        // currentJoin only written by this thread; only need ordered store
        ForkJoinTask<?> prevJoin = currentJoin;
        UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe);
        pool.awaitJoin(joinMe, this, timed, nanos);
        UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin);
    }

    /**
     * Tries to locate and help perform tasks for a stealer of the
     * given task, or in turn one of its stealers.
     * Traces
     * currentSteal->currentJoin links looking for a thread working on
     * a descendant of the given task and with a non-empty queue to
     * steal back and execute tasks from.
     *
     * The implementation is very branchy to cope with potential
     * inconsistencies or loops encountering chains that are stale,
     * unknown, or of length greater than MAX_HELP_DEPTH links.  All
     * of these cases are dealt with by just returning back to the
     * caller, who is expected to retry if other join mechanisms also
     * don't work out.
     *
     * @param joinMe the task to join
     * @param running if false, then must update pool count upon
     *  running a task
     * @return value of running on exit
     */
    final boolean helpJoinTask(ForkJoinTask<?> joinMe, boolean running) {
        /*
         * Initial checks to (1) abort if terminating; (2) clean out
         * old cancelled tasks from local queue; (3) if joinMe is next
         * task, run it; (4) omit scan if local queue nonempty (since
         * it may contain non-descendants of joinMe).
         */
        ForkJoinPool p = pool;
        for (;;) {
            ForkJoinTask<?>[] q;
            int s;
            if (joinMe.status < 0)
                return running;
            else if ((runState & TERMINATING) != 0) {
                joinMe.cancelIgnoringExceptions();
                return running;
            }
            else if ((s = sp) == base || (q = queue) == null)
                break;                            // queue empty
            else {
                int i = (q.length - 1) & --s;
                long u = (i << qShift) + qBase;   // raw offset
                ForkJoinTask<?> t = q[i];
                if (t == null)
                    break;                        // lost to a stealer
                else if (t != joinMe && t.status >= 0)
                    return running;               // cannot safely help
                else if ((running ||
                          (running = p.tryIncrementRunningCount())) &&
                         UNSAFE.compareAndSwapObject(q, u, t, null)) {
                    sp = s; // putOrderedInt may encourage more timely write
                    // UNSAFE.putOrderedInt(this, spOffset, s);
                    t.quietlyExec();
                }
            }
        }

        int n;                                    // worker array size
        ForkJoinWorkerThread[] ws = p.workers;
        if (ws != null && (n = ws.length) > 1) {  // need at least 2 workers
            ForkJoinTask<?> task = joinMe;        // base of chain
            ForkJoinWorkerThread thread = this;   // thread with stolen task

            outer:for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
                // Try to find v, the stealer of task, by first using hint
                ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
                if (v == null || v.currentSteal != task) {
                    for (int j = 0; ; ++j) {      // search array
                        if (j < n) {
                            ForkJoinTask<?> vs;
                            if ((v = ws[j]) != null &&
                                (vs = v.currentSteal) != null) {
                                if (joinMe.status < 0)
                                    break outer;
                                if (vs == task) {
                                    if (task.status < 0)
                                        break outer; // stale
                                    thread.stealHint = j;
                                    break;        // save hint for next time
                                }
                            }
                        }
                        else
                            break outer;          // no stealer
                    }
                }

                // Try to help v, using specialized form of deqTask
                for (;;) {
                    if (joinMe.status < 0)
                        break outer;
                    int b = v.base;
                    ForkJoinTask<?>[] q = v.queue;
                    if (b == v.sp || q == null)
                        break;                    // empty
                    int i = (q.length - 1) & b;
                    long u = (i << qShift) + qBase;
                    ForkJoinTask<?> t = q[i];
                    if (task.status < 0)
                        break outer;              // stale
                    if (t != null &&
                        (running ||
                         (running = p.tryIncrementRunningCount())) &&
                        v.base == b++ &&
                        UNSAFE.compareAndSwapObject(q, u, t, null)) {
                        if (t != joinMe && joinMe.status < 0) {
                            UNSAFE.putObjectVolatile(q, u, t);
                            break outer;          // joinMe cancelled; back out
                        }
                        v.base = b;
                        if (t.status >= 0) {
                            ForkJoinTask<?> ps = currentSteal;
                            int pid = poolIndex;
                            v.stealHint = pid;
                            UNSAFE.putOrderedObject(this,
                                                    currentStealOffset, t);
                            t.quietlyExec();
                            UNSAFE.putOrderedObject(this,
                                                    currentStealOffset, ps);
                        }
                    }
                    else if ((runState & TERMINATING) != 0) {
                        joinMe.cancelIgnoringExceptions();
                        break outer;
                    }
                }

                // Try to descend to find v's stealer
                ForkJoinTask<?> next = v.currentJoin;
                if (task.status < 0 || next == null || next == task ||
                    joinMe.status < 0)
                    break;         // done, stale, dead-end, or cyclic
                task = next;
                thread = v;
            }
        }
        return running;
    }

    /**
     * Implements ForkJoinTask.getSurplusQueuedTaskCount().
     * Returns an estimate of the number of tasks, offset by a
     * function of number of idle workers.
     *
     * This method provides a cheap heuristic guide for task
     * partitioning when programmers, frameworks, tools, or languages
     * have little or no idea about task granularity.  In essence by
     * offering this method, we ask users only about tradeoffs in
     * overhead vs expected throughput and its variance, rather than
     * how finely to partition tasks.
     *
     * In a steady state strict (tree-structured) computation, each
     * thread makes available for stealing enough tasks for other
     * threads to remain active. Inductively, if all threads play by
     * the same rules, each thread should make available only a
     * constant number of tasks.
     *
     * The minimum useful constant is just 1. But using a value of 1
     * would require immediate replenishment upon each steal to
     * maintain enough tasks, which is infeasible.  Further,
     * partitionings/granularities of offered tasks should minimize
     * steal rates, which in general means that threads nearer the top
     * of computation tree should generate more than those nearer the
     * bottom. In perfect steady state, each thread is at
     * approximately the same level of computation tree. However,
     * producing extra tasks amortizes the uncertainty of progress and
     * diffusion assumptions.
     *
     * So, users will want to use values larger, but not much larger
     * than 1 to both smooth over transient shortages and hedge
     * against uneven progress; as traded off against the cost of
     * extra task overhead. We leave the user to pick a threshold
     * value to compare with the results of this call to guide
     * decisions, but recommend values such as 3.
     *
     * When all threads are active, it is on average OK to estimate
     * surplus strictly locally. In steady-state, if one thread is
     * maintaining say 2 surplus tasks, then so are others. So we can
     * just use estimated queue length (although note that (sp - base)
     * can be an overestimate because of stealers lagging increments
     * of base).  However, this strategy alone leads to serious
     * mis-estimates in some non-steady-state conditions (ramp-up,
     * ramp-down, other stalls). We can detect many of these by
     * further considering the number of "idle" threads, that are
     * known to have zero queued tasks, so compensate by a factor of
     * (#idle/#active) threads.
     */
    final int getEstimatedSurplusTaskCount() {
        return sp - base - pool.idlePerActive();
    }

    /**
     * Runs tasks until {@code pool.isQuiescent()}.
     */
    final void helpQuiescePool() {
        ForkJoinTask<?> ps = currentSteal; // to restore below
        for (;;) {
            ForkJoinTask<?> t = pollLocalTask();
            if (t != null || (t = scan()) != null)
                t.quietlyExec();
            else {
                ForkJoinPool p = pool;
                int a; // to inline CASes
                if (active) {
                    if (!UNSAFE.compareAndSwapInt
                        (p, poolRunStateOffset, a = p.runState, a - 1))
                        continue;   // retry later
                    active = false; // inactivate
                    UNSAFE.putOrderedObject(this, currentStealOffset, ps);
                }
                if (p.isQuiescent()) {
                    active = true; // re-activate
                    do {} while (!UNSAFE.compareAndSwapInt
                                 (p, poolRunStateOffset, a = p.runState, a+1));
                    return;
                }
            }
        }
    }

    // Unsafe mechanics

    private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
    private static final long spOffset =
        objectFieldOffset("sp", ForkJoinWorkerThread.class);
    private static final long runStateOffset =
        objectFieldOffset("runState", ForkJoinWorkerThread.class);
    private static final long currentJoinOffset =
        objectFieldOffset("currentJoin", ForkJoinWorkerThread.class);
    private static final long currentStealOffset =
        objectFieldOffset("currentSteal", ForkJoinWorkerThread.class);
    private static final long qBase =
        UNSAFE.arrayBaseOffset(ForkJoinTask[].class);
    private static final long poolRunStateOffset = // to inline CAS
        objectFieldOffset("runState", ForkJoinPool.class);

    private static final int qShift;

    static {
        int s = UNSAFE.arrayIndexScale(ForkJoinTask[].class);
        if ((s & (s-1)) != 0)
            throw new Error("data type scale not a power of two");
        qShift = 31 - Integer.numberOfLeadingZeros(s);
        MAXIMUM_QUEUE_CAPACITY = 1 << (31 - qShift);
    }

    private static long objectFieldOffset(String field, Class<?> klazz) {
        try {
            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
        } catch (NoSuchFieldException e) {
            // Convert Exception to corresponding Error
            NoSuchFieldError error = new NoSuchFieldError(field);
            error.initCause(e);
            throw error;
        }
    }
}