/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */

package java.util.concurrent;

import java.util.Random;
import java.util.Collection;
import java.util.concurrent.locks.LockSupport;

/**
 * A thread managed by a {@link ForkJoinPool}. This class is
 * subclassable solely for the sake of adding functionality -- there
 * are no overridable methods dealing with scheduling or execution.
 * However, you can override initialization and termination methods
 * surrounding the main task processing loop. If you do create such a
 * subclass, you will also need to supply a custom {@link
 * ForkJoinPool.ForkJoinWorkerThreadFactory} to use it in a {@code
 * ForkJoinPool}.
 *
 * @since 1.7
 * @author Doug Lea
 */
public class ForkJoinWorkerThread extends Thread {
    /*
     * Overview:
     *
     * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
     * ForkJoinTasks. This class includes bookkeeping in support of
     * worker activation, suspension, and lifecycle control described
     * in more detail in the internal documentation of class
     * ForkJoinPool. And as described further below, this class also
     * includes special-cased support for some ForkJoinTask
     * methods. But the main mechanics involve work-stealing:
     *
     * Work-stealing queues are special forms of Deques that support
     * only three of the four possible end-operations -- push, pop,
     * and deq (aka steal), under the further constraints that push
     * and pop are called only from the owning thread, while deq may
     * be called from other threads. (If you are unfamiliar with
     * them, you probably want to read Herlihy and Shavit's book "The
     * Art of Multiprocessor Programming", chapter 16 describing these
     * in more detail before proceeding.)
     * The main work-stealing
     * queue design is roughly similar to those in the papers "Dynamic
     * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
     * (http://research.sun.com/scalable/pubs/index.html) and
     * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
     * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
     * The main differences ultimately stem from gc requirements that
     * we null out taken slots as soon as we can, to maintain as small
     * a footprint as possible even in programs generating huge
     * numbers of tasks. To accomplish this, we shift the CAS
     * arbitrating pop vs deq (steal) from being on the indices
     * ("base" and "sp") to the slots themselves (mainly via method
     * "casSlotNull()"). So, both a successful pop and deq mainly
     * entail a CAS of a slot from non-null to null. Because we rely
     * on CASes of references, we do not need tag bits on base or sp.
     * They are simple ints as used in any circular array-based queue
     * (see for example ArrayDeque). Updates to the indices must
     * still be ordered in a way that guarantees that sp == base means
     * the queue is empty, but otherwise may err on the side of
     * possibly making the queue appear nonempty when a push, pop, or
     * deq have not fully committed. Note that this means that the deq
     * operation, considered individually, is not wait-free. One thief
     * cannot successfully continue until another in-progress one (or,
     * if previously empty, a push) completes. However, in the
     * aggregate, we ensure at least probabilistic non-blockingness.
     * If an attempted steal fails, a thief always chooses a different
     * random victim target to try next. So, in order for one thief to
     * progress, it suffices for any in-progress deq or new push on
     * any empty queue to complete. One reason this works well here is
     * that apparently-nonempty often means soon-to-be-stealable,
     * which gives threads a chance to set activation status if
     * necessary before stealing.
     *
     * This approach also enables support for "async mode" where local
     * task processing is in FIFO, not LIFO order; simply by using a
     * version of deq rather than pop when locallyFifo is true (as set
     * by the ForkJoinPool). This allows use in message-passing
     * frameworks in which tasks are never joined.
     *
     * When a worker would otherwise be blocked waiting to join a
     * task, it first tries a form of linear helping: Each worker
     * records (in field currentSteal) the most recent task it stole
     * from some other worker. Plus, it records (in field currentJoin)
     * the task it is currently actively joining. Method joinTask uses
     * these markers to try to find a worker to help (i.e., steal back
     * a task from and execute it) that could hasten completion of the
     * actively joined task. In essence, the joiner executes a task
     * that would be on its own local deque had the to-be-joined task
     * not been stolen. This may be seen as a conservative variant of
     * the approach in Wagner & Calder "Leapfrogging: a portable
     * technique for implementing efficient futures" SIGPLAN Notices,
     * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
     * in that: (1) We only maintain dependency links across workers
     * upon steals, rather than use per-task bookkeeping.
     * This may
     * require a linear scan of workers array to locate stealers, but
     * usually doesn't because stealers leave hints (that may become
     * stale/wrong) of where to locate them. This isolates cost to
     * when it is needed, rather than adding to per-task overhead.
     * (2) It is "shallow", ignoring nesting and potentially cyclic
     * mutual steals. (3) It is intentionally racy: field currentJoin
     * is updated only while actively joining, which means that we
     * miss links in the chain during long-lived tasks, GC stalls etc
     * (which is OK since blocking in such cases is usually a good
     * idea). (4) We bound the number of attempts to find work (see
     * MAX_HELP_DEPTH) and fall back to suspending the worker and if
     * necessary replacing it with a spare (see
     * ForkJoinPool.awaitJoin).
     *
     * Efficient implementation of these algorithms currently relies
     * on an uncomfortable amount of "Unsafe" mechanics. To maintain
     * correct orderings, reads and writes of variable base require
     * volatile ordering. Variable sp does not require volatile
     * writes but still needs store-ordering, which we accomplish by
     * pre-incrementing sp before filling the slot with an ordered
     * store. (Pre-incrementing also enables backouts used in
     * joinTask.) Because they are protected by volatile base reads,
     * reads of the queue array and its slots by other threads do not
     * need volatile load semantics, but writes (in push) require
     * store order and CASes (in pop and deq) require (volatile) CAS
     * semantics. (Michael, Saraswat, and Vechev's algorithm has
     * similar properties, but without support for nulling slots.)
     * Since these combinations aren't supported using ordinary
     * volatiles, the only way to accomplish these efficiently is to
     * use direct Unsafe calls. (Using external AtomicIntegers and
     * AtomicReferenceArrays for the indices and array is
     * significantly slower because of memory locality and indirection
     * effects.)
     *
     * Further, performance on most platforms is very sensitive to
     * placement and sizing of the (resizable) queue array. Even
     * though these queues don't usually become all that big, the
     * initial size must be large enough to counteract cache
     * contention effects across multiple queues (especially in the
     * presence of GC cardmarking). Also, to improve thread-locality,
     * queues are initialized after starting. All together, these
     * low-level implementation choices produce as much as a factor of
     * 4 performance improvement compared to naive implementations,
     * and enable the processing of billions of tasks per second,
     * sometimes at the expense of ugliness.
     */
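
    /*
     * Illustrative sketch (not part of this class): the overview above
     * arbitrates pop vs deq (steal) with a CAS on the slot rather than
     * on the indices. The simplified, hypothetical deque below expresses
     * that idea with AtomicReferenceArray and AtomicInteger in place of
     * Unsafe, omitting growth, signalling, and the ordered-store
     * optimizations (here sp is simply volatile; the real code instead
     * pre-increments a plain sp and relies on base reads and ordered
     * slot writes).
     *
     * import java.util.concurrent.atomic.AtomicInteger;
     * import java.util.concurrent.atomic.AtomicReferenceArray;
     *
     * final class SimpleWorkStealingDeque<T> {
     *     private final AtomicReferenceArray<T> slots =
     *         new AtomicReferenceArray<T>(1 << 13);
     *     private final AtomicInteger base = new AtomicInteger(); // steal end
     *     private volatile int sp;                                // owner end
     *
     *     void push(T t) {                     // owner only
     *         int s = sp;
     *         slots.lazySet(s & (slots.length() - 1), t); // like putOrderedObject
     *         sp = s + 1;                      // volatile write publishes the slot
     *     }
     *
     *     T pop() {                            // owner only, LIFO
     *         int s;
     *         while ((s = sp) != base.get()) {
     *             int i = (s - 1) & (slots.length() - 1);
     *             T t = slots.get(i);
     *             if (t == null)               // lost to a stealer
     *                 break;
     *             if (slots.compareAndSet(i, t, null)) { // CAS on the slot
     *                 sp = s - 1;              // commit pop after winning the slot
     *                 return t;
     *             }
     *         }
     *         return null;
     *     }
     *
     *     T steal() {                          // any thread, FIFO
     *         int b = base.get();
     *         if (b == sp)
     *             return null;                 // appears empty
     *         int i = b & (slots.length() - 1);
     *         T t = slots.get(i);
     *         if (t != null && base.get() == b && slots.compareAndSet(i, t, null)) {
     *             base.set(b + 1);             // advance only after winning the slot
     *             return t;
     *         }
     *         return null;                     // contended; caller tries another victim
     *     }
     * }
     */
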
    /**
     * Generator for initial random seeds for random victim
     * selection. This is used only to create initial seeds. Random
     * steals use a cheaper xorshift generator per steal attempt. We
     * expect only rare contention on seedGenerator, so just use a
     * plain Random.
     */
    private static final Random seedGenerator = new Random();

    /**
     * The maximum stolen->joining link depth allowed in helpJoinTask.
     * Depths for legitimate chains are unbounded, but we use a fixed
     * constant to avoid (otherwise unchecked) cycles and bound
     * staleness of traversal parameters at the expense of sometimes
     * blocking when we could be helping.
     */
    private static final int MAX_HELP_DEPTH = 8;

    /**
     * Capacity of work-stealing queue array upon initialization.
     * Must be a power of two. Initial size must be at least 4, but is
     * padded to minimize cache effects.
     */
    private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;

    /**
     * Maximum work-stealing queue array size. Must be less than or
     * equal to 1 << (31 - width of array entry) to ensure lack of
     * index wraparound. The value is set in the static block
     * at the end of this file after obtaining width.
     */
    private static final int MAXIMUM_QUEUE_CAPACITY;

    /**
     * The pool this thread works in. Accessed directly by ForkJoinTask.
     */
    final ForkJoinPool pool;

    /**
     * The work-stealing queue array. Size must be a power of two.
     * Initialized in onStart, to improve memory locality.
     */
    private ForkJoinTask<?>[] queue;

    /**
     * Index (mod queue.length) of least valid queue slot, which is
     * always the next position to steal from if nonempty.
     */
    private volatile int base;

    /**
     * Index (mod queue.length) of next queue slot to push to or pop
     * from. It is written only by owner thread, and accessed by other
     * threads only after reading (volatile) base. Both sp and base
     * are allowed to wrap around on overflow, but (sp - base) still
     * estimates size.
     */
    private int sp;

    /**
     * The index of most recent stealer, used as a hint to avoid
     * traversal in method helpJoinTask. This is only a hint because a
     * worker might have had multiple steals and this only holds one
     * of them (usually the most current). Declared non-volatile,
     * relying on other prevailing sync to keep reasonably current.
     */
    private int stealHint;

    /**
     * Run state of this worker. In addition to the usual run levels,
     * tracks if this worker is suspended as a spare, and if it was
     * killed (trimmed) while suspended. However, "active" status is
     * maintained separately and modified only in conjunction with
     * CASes of the pool's runState (which are currently sadly
     * manually inlined for performance.) Accessed directly by pool
     * to simplify checks for normal (zero) status.
     */
    volatile int runState;

    private static final int TERMINATING = 0x01;
    private static final int TERMINATED  = 0x02;
    private static final int SUSPENDED   = 0x04; // inactive spare
    private static final int TRIMMED     = 0x08; // killed while suspended

    /**
     * Number of steals. Directly accessed (and reset) by
     * pool.tryAccumulateStealCount when idle.
     */
    int stealCount;

    /**
     * Seed for random number generator for choosing steal victims.
     * Uses Marsaglia xorshift. Must be initialized as nonzero.
     */
    private int seed;

    /**
     * Activity status. When true, this worker is considered active.
     * Accessed directly by pool. Must be false upon construction.
     */
    boolean active;

    /**
     * True if using local FIFO, not the default LIFO, for local
     * polling. Shadows value from ForkJoinPool.
     */
    private final boolean locallyFifo;

    /**
     * Index of this worker in pool array. Set once by pool before
     * running, and accessed directly by pool to locate this worker in
     * its workers array.
     */
    int poolIndex;

    /**
     * The last pool event waited for. Accessed only by pool in
     * callback methods invoked within this thread.
     */
    int lastEventCount;

    /**
     * Encoded index and event count of next event waiter. Accessed
     * only by ForkJoinPool for managing event waiters.
     */
    volatile long nextWaiter;

    /**
     * Number of times this thread suspended as spare. Accessed only
     * by pool.
     */
    int spareCount;

    /**
     * Encoded index and count of next spare waiter. Accessed only
     * by ForkJoinPool for managing spares.
     */
    volatile int nextSpare;

    /**
     * The task currently being joined, set only when actively trying
     * to help other stealers in helpJoinTask. Written only by this
     * thread, but read by others.
     */
    private volatile ForkJoinTask<?> currentJoin;

    /**
     * The task most recently stolen from another worker (or
     * submission queue). Written only by this thread, but read by
     * others.
     */
    private volatile ForkJoinTask<?> currentSteal;

    /**
     * Creates a ForkJoinWorkerThread operating in the given pool.
     *
     * @param pool the pool this thread works in
     * @throws NullPointerException if pool is null
     */
    protected ForkJoinWorkerThread(ForkJoinPool pool) {
        this.pool = pool;
        this.locallyFifo = pool.locallyFifo;
        setDaemon(true);
        // To avoid exposing construction details to subclasses,
        // remaining initialization is in start() and onStart()
    }

    /**
     * Performs additional initialization and starts this thread.
     */
    final void start(int poolIndex, UncaughtExceptionHandler ueh) {
        this.poolIndex = poolIndex;
        if (ueh != null)
            setUncaughtExceptionHandler(ueh);
        start();
    }

    // Public/protected methods

    /**
     * Returns the pool hosting this thread.
     *
     * @return the pool
     */
    public ForkJoinPool getPool() {
        return pool;
    }

    /**
     * Returns the index number of this thread in its pool. The
     * returned value ranges from zero to the maximum number of
     * threads (minus one) that have ever been created in the pool.
     * This method may be useful for applications that track status or
     * collect results per-worker rather than per-task.
     *
     * @return the index number
     */
    public int getPoolIndex() {
        return poolIndex;
    }

    /**
     * Initializes internal state after construction but before
     * processing any tasks. If you override this method, you must
     * invoke {@code super.onStart()} at the beginning of the method.
     * Initialization requires care: Most fields must have legal
     * default values, to ensure that attempted accesses from other
     * threads work correctly even before this thread starts
     * processing tasks.
     */
    protected void onStart() {
        int rs = seedGenerator.nextInt();
        seed = (rs == 0) ? 1 : rs; // seed must be nonzero

        // Allocate name string and arrays in this thread
        String pid = Integer.toString(pool.getPoolNumber());
        String wid = Integer.toString(poolIndex);
        setName("ForkJoinPool-" + pid + "-worker-" + wid);

        queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
    }

    /**
     * Performs cleanup associated with termination of this worker
     * thread. If you override this method, you must invoke
     * {@code super.onTermination} at the end of the overridden method.
     *
     * @param exception the exception causing this thread to abort due
     * to an unrecoverable error, or {@code null} if completed normally
     */
    protected void onTermination(Throwable exception) {
        try {
            ForkJoinPool p = pool;
            if (active) {
                int a; // inline p.tryDecrementActiveCount
                active = false;
                do {} while (!UNSAFE.compareAndSwapInt
                             (p, poolRunStateOffset, a = p.runState, a - 1));
            }
            cancelTasks();
            setTerminated();
            p.workerTerminated(this);
        } catch (Throwable ex) {   // Shouldn't ever happen
            if (exception == null) // but if so, at least rethrow it
                exception = ex;
        } finally {
            if (exception != null)
                UNSAFE.throwException(exception);
        }
    }

    /**
     * This method is required to be public, but should never be
     * called explicitly. It performs the main run loop to execute
     * ForkJoinTasks.
     */
    public void run() {
        Throwable exception = null;
        try {
            onStart();
            mainLoop();
        } catch (Throwable ex) {
            exception = ex;
        } finally {
            onTermination(exception);
        }
    }

    // helpers for run()

    /**
     * Finds and executes tasks, and checks status while running.
     */
    private void mainLoop() {
        boolean ran = false; // true if ran a task on last step
        ForkJoinPool p = pool;
        for (;;) {
            p.preStep(this, ran);
            if (runState != 0)
                break;
            ran = tryExecSteal() || tryExecSubmission();
        }
    }

    /**
     * Tries to steal a task and execute it.
     *
     * @return true if ran a task
     */
    private boolean tryExecSteal() {
        ForkJoinTask<?> t;
        if ((t = scan()) != null) {
            t.quietlyExec();
            UNSAFE.putOrderedObject(this, currentStealOffset, null);
            if (sp != base)
                execLocalTasks();
            return true;
        }
        return false;
    }

    /**
     * If a submission exists, try to activate and run it.
     *
     * @return true if ran a task
     */
    private boolean tryExecSubmission() {
        ForkJoinPool p = pool;
        // This loop is needed in case attempt to activate fails, in
        // which case we only retry if there still appears to be a
        // submission.
        while (p.hasQueuedSubmissions()) {
            ForkJoinTask<?> t; int a;
            if (active || // inline p.tryIncrementActiveCount
                (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
                                                   a = p.runState, a + 1))) {
                if ((t = p.pollSubmission()) != null) {
                    UNSAFE.putOrderedObject(this, currentStealOffset, t);
                    t.quietlyExec();
                    UNSAFE.putOrderedObject(this, currentStealOffset, null);
                    if (sp != base)
                        execLocalTasks();
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * Runs local tasks until queue is empty or shut down. Call only
     * while active.
     */
    private void execLocalTasks() {
        while (runState == 0) {
            ForkJoinTask<?> t = locallyFifo ? locallyDeqTask() : popTask();
            if (t != null)
                t.quietlyExec();
            else if (sp == base)
                break;
        }
    }

    /*
     * Intrinsics-based atomic writes for queue slots. These are
     * basically the same as methods in AtomicReferenceArray, but
     * specialized for (1) ForkJoinTask elements (2) requirement that
     * nullness and bounds checks have already been performed by
     * callers and (3) effective offsets are known not to overflow
     * from int to long (because of MAXIMUM_QUEUE_CAPACITY).
     * We don't
     * need corresponding version for reads: plain array reads are OK
     * because they are protected by other volatile reads and are
     * confirmed by CASes.
     *
     * Most uses don't actually call these methods, but instead contain
     * inlined forms that enable more predictable optimization. We
     * don't define the version of write used in pushTask at all, but
     * instead inline there a store-fenced array slot write.
     */

    /**
     * CASes slot i of array q from t to null. Caller must ensure q is
     * non-null and index is in range.
     */
    private static final boolean casSlotNull(ForkJoinTask<?>[] q, int i,
                                             ForkJoinTask<?> t) {
        return UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null);
    }

    /**
     * Performs a volatile write of the given task at given slot of
     * array q. Caller must ensure q is non-null and index is in
     * range. This method is used only during resets and backouts.
     */
    private static final void writeSlot(ForkJoinTask<?>[] q, int i,
                                        ForkJoinTask<?> t) {
        UNSAFE.putObjectVolatile(q, (i << qShift) + qBase, t);
    }

    // queue methods

    /**
     * Pushes a task. Call only from this thread.
     *
     * @param t the task. Caller must ensure non-null.
     */
    final void pushTask(ForkJoinTask<?> t) {
        ForkJoinTask<?>[] q = queue;
        int mask = q.length - 1; // implicit assert q != null
        int s = sp++;            // ok to increment sp before slot write
        UNSAFE.putOrderedObject(q, ((s & mask) << qShift) + qBase, t);
        if ((s -= base) == 0)
            pool.signalWork();   // was empty
        else if (s == mask)
            growQueue();         // is full
    }

    /**
     * Tries to take a task from the base of the queue, failing if
     * empty or contended. Note: Specializations of this code appear
     * in locallyDeqTask and elsewhere.
     *
     * @return a task, or null if none or contended
     */
    final ForkJoinTask<?> deqTask() {
        ForkJoinTask<?> t;
        ForkJoinTask<?>[] q;
        int b, i;
        if (sp != (b = base) &&
            (q = queue) != null && // must read q after b
            (t = q[i = (q.length - 1) & b]) != null && base == b &&
            UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) {
            base = b + 1;
            return t;
        }
        return null;
    }

    /**
     * Tries to take a task from the base of own queue. Assumes active
     * status. Called only by this thread.
     *
     * @return a task, or null if none
     */
    final ForkJoinTask<?> locallyDeqTask() {
        ForkJoinTask<?>[] q = queue;
        if (q != null) {
            ForkJoinTask<?> t;
            int b, i;
            while (sp != (b = base)) {
                if ((t = q[i = (q.length - 1) & b]) != null && base == b &&
                    UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase,
                                                t, null)) {
                    base = b + 1;
                    return t;
                }
            }
        }
        return null;
    }

    /**
     * Returns a popped task, or null if empty. Assumes active status.
     * Called only by this thread.
     */
    private ForkJoinTask<?> popTask() {
        ForkJoinTask<?>[] q = queue;
        if (q != null) {
            int s;
            while ((s = sp) != base) {
                int i = (q.length - 1) & --s;
                long u = (i << qShift) + qBase; // raw offset
                ForkJoinTask<?> t = q[i];
                if (t == null) // lost to stealer
                    break;
                if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
                    sp = s; // putOrderedInt may encourage more timely write
                    // UNSAFE.putOrderedInt(this, spOffset, s);
                    return t;
                }
            }
        }
        return null;
    }

    /**
     * Specialized version of popTask to pop only if topmost element
     * is the given task. Called only by this thread while active.
     *
     * @param t the task. Caller must ensure non-null.
     */
    final boolean unpushTask(ForkJoinTask<?> t) {
        int s;
        ForkJoinTask<?>[] q = queue;
        if ((s = sp) != base && q != null &&
            UNSAFE.compareAndSwapObject
            (q, (((q.length - 1) & --s) << qShift) + qBase, t, null)) {
            sp = s; // putOrderedInt may encourage more timely write
            // UNSAFE.putOrderedInt(this, spOffset, s);
            return true;
        }
        return false;
    }

    /**
     * Returns next task, or null if empty or contended.
     */
    final ForkJoinTask<?> peekTask() {
        ForkJoinTask<?>[] q = queue;
        if (q == null)
            return null;
        int mask = q.length - 1;
        int i = locallyFifo ? base : (sp - 1);
        return q[i & mask];
    }

    /**
     * Doubles queue array size. Transfers elements by emulating
     * steals (deqs) from old array and placing, oldest first, into
     * new array.
     */
    private void growQueue() {
        ForkJoinTask<?>[] oldQ = queue;
        int oldSize = oldQ.length;
        int newSize = oldSize << 1;
        if (newSize > MAXIMUM_QUEUE_CAPACITY)
            throw new RejectedExecutionException("Queue capacity exceeded");
        ForkJoinTask<?>[] newQ = queue = new ForkJoinTask<?>[newSize];

        int b = base;
        int bf = b + oldSize;
        int oldMask = oldSize - 1;
        int newMask = newSize - 1;
        do {
            int oldIndex = b & oldMask;
            ForkJoinTask<?> t = oldQ[oldIndex];
            if (t != null && !casSlotNull(oldQ, oldIndex, t))
                t = null;
            writeSlot(newQ, b & newMask, t);
        } while (++b != bf);
        pool.signalWork();
    }

    /**
     * Computes next value for random victim probe in scan(). Scans
     * don't require a very high quality generator, but also not a
     * crummy one. Marsaglia xor-shift is cheap and works well enough.
     * Note: This is manually inlined in scan().
     */
    private static final int xorShift(int r) {
        r ^= r << 13;
        r ^= r >>> 17;
        return r ^ (r << 5);
    }

    /**
     * Tries to steal a task from another worker. Starts at a random
     * index of workers array, and probes workers until finding one
     * with non-empty queue or finding that all are empty. It
     * randomly selects the first n probes. If these are empty, it
     * resorts to a circular sweep, which is necessary to accurately
     * set active status. (The circular sweep uses steps of
     * approximately half the array size plus 1, to avoid bias
     * stemming from leftmost packing of the array in ForkJoinPool.)
     *
     * This method must be both fast and quiet -- usually avoiding
     * memory accesses that could disrupt cache sharing etc other than
     * those needed to check for and take tasks (or to activate if not
     * already active). This accounts for, among other things,
     * updating random seed in place without storing it until exit.
     *
     * @return a task, or null if none found
     */
    private ForkJoinTask<?> scan() {
        ForkJoinPool p = pool;
        ForkJoinWorkerThread[] ws;     // worker array
        int n;                         // upper bound of #workers
        if ((ws = p.workers) != null && (n = ws.length) > 1) {
            boolean canSteal = active; // shadow active status
            int r = seed;              // extract seed once
            int mask = n - 1;
            int j = -n;                // loop counter
            int k = r;                 // worker index, random if j < 0
            for (;;) {
                ForkJoinWorkerThread v = ws[k & mask];
                r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // inline xorshift
                ForkJoinTask<?>[] q; ForkJoinTask<?> t; int b, a;
                if (v != null && (b = v.base) != v.sp &&
                    (q = v.queue) != null) {
                    int i = (q.length - 1) & b;
                    long u = (i << qShift) + qBase; // raw offset
                    int pid = poolIndex;
                    if ((t = q[i]) != null) {
                        if (!canSteal && // inline p.tryIncrementActiveCount
                            UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
                                                     a = p.runState, a + 1))
                            canSteal = active = true;
                        if (canSteal && v.base == b++ &&
                            UNSAFE.compareAndSwapObject(q, u, t, null)) {
                            v.base = b;
                            v.stealHint = pid;
                            UNSAFE.putOrderedObject(this,
                                                    currentStealOffset, t);
                            seed = r;
                            ++stealCount;
                            return t;
                        }
                    }
                    j = -n;
                    k = r;             // restart on contention
                }
                else if (++j <= 0)
                    k = r;
                else if (j <= n)
                    k += (n >>> 1) | 1;
                else
                    break;
            }
        }
        return null;
    }

    // Run State management

    // status check methods used mainly by ForkJoinPool
    final boolean isRunning()    { return runState == 0; }
    final boolean isTerminated() { return (runState & TERMINATED) != 0; }
    final boolean isSuspended()  { return (runState & SUSPENDED) != 0; }
    final boolean isTrimmed()    { return (runState & TRIMMED) != 0; }

    final boolean isTerminating() {
        if ((runState & TERMINATING) != 0)
            return true;
        if (pool.isAtLeastTerminating()) { // propagate pool state
            shutdown();
            return true;
        }
        return false;
    }

    /**
     * Sets state to TERMINATING. Does NOT unpark or interrupt
     * to wake up if currently blocked. Callers must do so if desired.
     */
    final void shutdown() {
        for (;;) {
            int s = runState;
            if ((s & (TERMINATING|TERMINATED)) != 0)
                break;
            if ((s & SUSPENDED) != 0) { // kill and wakeup if suspended
                if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
                                             (s & ~SUSPENDED) |
                                             (TRIMMED|TERMINATING)))
                    break;
            }
            else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
                                              s | TERMINATING))
                break;
        }
    }

    /**
     * Sets state to TERMINATED. Called only by onTermination().
     */
    private void setTerminated() {
        int s;
        do {} while (!UNSAFE.compareAndSwapInt(this, runStateOffset,
                                               s = runState,
                                               s | (TERMINATING|TERMINATED)));
    }

    /**
     * If suspended, tries to set status to unsuspended.
     * Does NOT wake up if blocked.
     *
     * @return true if successful
     */
    final boolean tryUnsuspend() {
        int s;
        while (((s = runState) & SUSPENDED) != 0) {
            if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
                                         s & ~SUSPENDED))
                return true;
        }
        return false;
    }

    /**
     * Sets suspended status and blocks as spare until resumed
     * or shutdown.
     */
    final void suspendAsSpare() {
        for (;;) {                    // set suspended unless terminating
            int s = runState;
            if ((s & TERMINATING) != 0) { // must kill
                if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
                                             s | (TRIMMED | TERMINATING)))
                    return;
            }
            else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
                                              s | SUSPENDED))
                break;
        }
        ForkJoinPool p = pool;
        p.pushSpare(this);
        while ((runState & SUSPENDED) != 0) {
            if (p.tryAccumulateStealCount(this)) {
                interrupted();        // clear/ignore interrupts
                if ((runState & SUSPENDED) == 0)
                    break;
                LockSupport.park(this);
            }
        }
    }

    // Misc support methods for ForkJoinPool

    /**
     * Returns an estimate of the number of tasks in the queue. Also
     * used by ForkJoinTask.
     */
    final int getQueueSize() {
        int n; // external calls must read base first
        return (n = -base + sp) <= 0 ? 0 : n;
    }

    /**
     * Removes and cancels all tasks in queue. Can be called from any
     * thread.
     */
    final void cancelTasks() {
        ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks
        if (cj != null) {
            currentJoin = null;
            cj.cancelIgnoringExceptions();
            try {
                this.interrupt(); // awaken wait
            } catch (SecurityException ignore) {
            }
        }
        ForkJoinTask<?> cs = currentSteal;
        if (cs != null) {
            currentSteal = null;
            cs.cancelIgnoringExceptions();
        }
        while (base != sp) {
            ForkJoinTask<?> t = deqTask();
            if (t != null)
                t.cancelIgnoringExceptions();
        }
    }

    /**
     * Drains tasks to given collection c.
     *
     * @return the number of tasks drained
     */
    final int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
        int n = 0;
        while (base != sp) {
            ForkJoinTask<?> t = deqTask();
            if (t != null) {
                c.add(t);
                ++n;
            }
        }
        return n;
    }

    // Support methods for ForkJoinTask

    /**
     * Gets and removes a local task.
     *
     * @return a task, if available
     */
    final ForkJoinTask<?> pollLocalTask() {
        ForkJoinPool p = pool;
        while (sp != base) {
            int a; // inline p.tryIncrementActiveCount
            if (active ||
                (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
                                                   a = p.runState, a + 1)))
                return locallyFifo ? locallyDeqTask() : popTask();
        }
        return null;
    }

    /**
     * Gets and removes a local or stolen task.
     *
     * @return a task, if available
     */
    final ForkJoinTask<?> pollTask() {
        ForkJoinTask<?> t = pollLocalTask();
        if (t == null) {
            t = scan();
            // cannot retain/track/help steal
            UNSAFE.putOrderedObject(this, currentStealOffset, null);
        }
        return t;
    }

    /**
     * Possibly runs some tasks and/or blocks, until task is done.
     *
     * @param joinMe the task to join
     */
    final void joinTask(ForkJoinTask<?> joinMe) {
        // currentJoin only written by this thread; only need ordered store
        ForkJoinTask<?> prevJoin = currentJoin;
        UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe);
        if (sp != base)
            localHelpJoinTask(joinMe);
        if (joinMe.status >= 0)
            pool.awaitJoin(joinMe, this);
        UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin);
    }

    /**
     * Runs tasks in local queue until given task is done.
     *
     * @param joinMe the task to join
     */
    private void localHelpJoinTask(ForkJoinTask<?> joinMe) {
        int s;
        ForkJoinTask<?>[] q;
        while (joinMe.status >= 0 && (s = sp) != base && (q = queue) != null) {
            int i = (q.length - 1) & --s;
            long u = (i << qShift) + qBase; // raw offset
            ForkJoinTask<?> t = q[i];
            if (t == null) // lost to a stealer
                break;
            if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
                /*
                 * This recheck (and similarly in helpJoinTask)
                 * handles cases where joinMe is independently
                 * cancelled or forced even though there is other work
                 * available. Back out of the pop by putting t back
                 * into slot before we commit by writing sp.
                 */
                if (joinMe.status < 0) {
                    UNSAFE.putObjectVolatile(q, u, t);
                    break;
                }
                sp = s;
                // UNSAFE.putOrderedInt(this, spOffset, s);
                t.quietlyExec();
            }
        }
    }

    /**
     * Unless terminating, tries to locate and help perform tasks for
     * a stealer of the given task, or in turn one of its stealers.
     * Traces currentSteal->currentJoin links looking for a thread
     * working on a descendant of the given task and with a non-empty
     * queue to steal back and execute tasks from.
     *
     * The implementation is very branchy to cope with potential
     * inconsistencies or loops encountering chains that are stale,
     * unknown, or of length greater than MAX_HELP_DEPTH links. All
     * of these cases are dealt with by just returning back to the
     * caller, who is expected to retry if other join mechanisms also
     * don't work out.
     *
     * @param joinMe the task to join
     */
    final void helpJoinTask(ForkJoinTask<?> joinMe) {
        ForkJoinWorkerThread[] ws;
        int n;
        if (joinMe.status < 0)               // already done
            return;
        if ((runState & TERMINATING) != 0) { // cancel if shutting down
            joinMe.cancelIgnoringExceptions();
            return;
        }
        if ((ws = pool.workers) == null || (n = ws.length) <= 1)
            return;                          // need at least 2 workers

        ForkJoinTask<?> task = joinMe;       // base of chain
        ForkJoinWorkerThread thread = this;  // thread with stolen task
        for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
            // Try to find v, the stealer of task, by first using hint
            ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
            if (v == null || v.currentSteal != task) {
                for (int j = 0; ; ++j) {     // search array
                    if (j < n) {
                        ForkJoinTask<?> vs;
                        if ((v = ws[j]) != null &&
                            (vs = v.currentSteal) != null) {
                            if (joinMe.status < 0 || task.status < 0)
                                return;      // stale or done
                            if (vs == task) {
                                thread.stealHint = j;
                                break;       // save hint for next time
                            }
                        }
                    }
                    else
                        return;              // no stealer
                }
            }
            for (;;) { // Try to help v, using specialized form of deqTask
                if (joinMe.status < 0)
                    return;
                int b = v.base;
                ForkJoinTask<?>[] q = v.queue;
                if (b == v.sp || q == null)
                    break;
                int i = (q.length - 1) & b;
                long u = (i << qShift) + qBase;
                ForkJoinTask<?> t = q[i];
                int pid = poolIndex;
                ForkJoinTask<?> ps = currentSteal;
                if (task.status < 0)
                    return;                  // stale or done
                if (t != null && v.base == b++ &&
                    UNSAFE.compareAndSwapObject(q, u, t, null)) {
                    if (joinMe.status < 0) {
                        UNSAFE.putObjectVolatile(q, u, t);
                        return;              // back out on cancel
                    }
                    v.base = b;
                    v.stealHint = pid;
                    UNSAFE.putOrderedObject(this, currentStealOffset, t);
                    t.quietlyExec();
                    UNSAFE.putOrderedObject(this, currentStealOffset, ps);
                }
            }
            // Try to descend to find v's stealer
            ForkJoinTask<?> next = v.currentJoin;
            if (task.status < 0 || next == null || next == task ||
                joinMe.status < 0)
                return;
            task = next;
            thread = v;
        }
    }

    /**
     * Implements ForkJoinTask.getSurplusQueuedTaskCount().
     * Returns an estimate of the number of tasks, offset by a
     * function of number of idle workers.
     *
     * This method provides a cheap heuristic guide for task
     * partitioning when programmers, frameworks, tools, or languages
     * have little or no idea about task granularity. In essence by
     * offering this method, we ask users only about tradeoffs in
     * overhead vs expected throughput and its variance, rather than
     * how finely to partition tasks.
     *
     * In a steady state strict (tree-structured) computation, each
     * thread makes available for stealing enough tasks for other
     * threads to remain active. Inductively, if all threads play by
     * the same rules, each thread should make available only a
     * constant number of tasks.
     *
     * The minimum useful constant is just 1. But using a value of 1
     * would require immediate replenishment upon each steal to
     * maintain enough tasks, which is infeasible. Further,
     * partitionings/granularities of offered tasks should minimize
     * steal rates, which in general means that threads nearer the top
     * of computation tree should generate more than those nearer the
     * bottom. In perfect steady state, each thread is at
     * approximately the same level of computation tree. However,
     * producing extra tasks amortizes the uncertainty of progress and
     * diffusion assumptions.
     *
     * So, users will want to use values larger, but not much larger
     * than 1 to both smooth over transient shortages and hedge
     * against uneven progress; as traded off against the cost of
     * extra task overhead. We leave the user to pick a threshold
     * value to compare with the results of this call to guide
     * decisions, but recommend values such as 3.
     *
     * When all threads are active, it is on average OK to estimate
     * surplus strictly locally. In steady-state, if one thread is
     * maintaining say 2 surplus tasks, then so are others. So we can
     * just use estimated queue length (although note that (sp - base)
     * can be an overestimate because of stealers lagging increments
     * of base). However, this strategy alone leads to serious
     * mis-estimates in some non-steady-state conditions (ramp-up,
     * ramp-down, other stalls). We can detect many of these by
     * further considering the number of "idle" threads, that are
     * known to have zero queued tasks, so compensate by a factor of
     * (#idle/#active) threads.
     */
    final int getEstimatedSurplusTaskCount() {
        return sp - base - pool.idlePerActive();
    }
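
    /*
     * Illustrative sketch (not part of this class): a typical way a user
     * task might consume the surplus estimate above, via the public
     * ForkJoinTask.getSurplusQueuedTaskCount(), is to stop forking once
     * enough local tasks are already queued. The task class, sizes, and
     * threshold below are hypothetical; the threshold of 3 follows the
     * recommendation in the comment above.
     *
     * class SumTask extends RecursiveTask<Long> {
     *     final long[] a; final int lo, hi;
     *     SumTask(long[] a, int lo, int hi) { this.a = a; this.lo = lo; this.hi = hi; }
     *     protected Long compute() {
     *         if (hi - lo <= 1024 || getSurplusQueuedTaskCount() >= 3) {
     *             long s = 0;                  // enough queued work; just compute
     *             for (int i = lo; i < hi; ++i)
     *                 s += a[i];
     *             return s;
     *         }
     *         int mid = (lo + hi) >>> 1;
     *         SumTask left = new SumTask(a, lo, mid);
     *         left.fork();                     // make work available for stealing
     *         long right = new SumTask(a, mid, hi).compute();
     *         return right + left.join();
     *     }
     * }
     */
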
    /**
     * Runs tasks until {@code pool.isQuiescent()}.
     */
    final void helpQuiescePool() {
        ForkJoinTask<?> ps = currentSteal; // to restore below
        for (;;) {
            ForkJoinTask<?> t = pollLocalTask();
            if (t != null || (t = scan()) != null)
                t.quietlyExec();
            else {
                ForkJoinPool p = pool;
                int a; // to inline CASes
                if (active) {
                    if (!UNSAFE.compareAndSwapInt
                        (p, poolRunStateOffset, a = p.runState, a - 1))
                        continue;   // retry later
                    active = false; // inactivate
                    UNSAFE.putOrderedObject(this, currentStealOffset, ps);
                }
                if (p.isQuiescent()) {
                    active = true; // re-activate
                    do {} while (!UNSAFE.compareAndSwapInt
                                 (p, poolRunStateOffset, a = p.runState, a+1));
                    return;
                }
            }
        }
    }

    // Unsafe mechanics

    private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
    private static final long spOffset =
        objectFieldOffset("sp", ForkJoinWorkerThread.class);
    private static final long runStateOffset =
        objectFieldOffset("runState", ForkJoinWorkerThread.class);
    private static final long currentJoinOffset =
        objectFieldOffset("currentJoin", ForkJoinWorkerThread.class);
    private static final long currentStealOffset =
        objectFieldOffset("currentSteal", ForkJoinWorkerThread.class);
    private static final long qBase =
        UNSAFE.arrayBaseOffset(ForkJoinTask[].class);
    private static final long poolRunStateOffset = // to inline CAS
        objectFieldOffset("runState", ForkJoinPool.class);

    private static final int qShift;

    static {
        int s = UNSAFE.arrayIndexScale(ForkJoinTask[].class);
        if ((s & (s-1)) != 0)
            throw new Error("data type scale not a power of two");
        qShift = 31 - Integer.numberOfLeadingZeros(s);
        MAXIMUM_QUEUE_CAPACITY = 1 << (31 - qShift);
    }

    private static long objectFieldOffset(String field, Class<?> klazz) {
        try {
            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
        } catch (NoSuchFieldException e) {
            // Convert Exception to corresponding Error
            NoSuchFieldError error = new NoSuchFieldError(field);
            error.initCause(e);
            throw error;
        }
    }
}
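
/*
 * Illustrative sketch (not part of this file): as noted in the class
 * javadoc, a subclass that only adds functionality must be paired with a
 * custom ForkJoinPool.ForkJoinWorkerThreadFactory. The class names below
 * are hypothetical; the pool is constructed with the ForkJoinPool
 * constructor that accepts a factory.
 *
 * class TaggedWorkerThread extends ForkJoinWorkerThread {
 *     TaggedWorkerThread(ForkJoinPool pool) { super(pool); }
 *     protected void onStart() {
 *         super.onStart();                    // required at the beginning
 *         // per-thread setup here
 *     }
 *     protected void onTermination(Throwable exception) {
 *         try {
 *             // per-thread cleanup here
 *         } finally {
 *             super.onTermination(exception); // required at the end
 *         }
 *     }
 * }
 *
 * class TaggedThreadFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory {
 *     public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
 *         return new TaggedWorkerThread(pool);
 *     }
 * }
 *
 * // usage:
 * // ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors(),
 * //                                      new TaggedThreadFactory(), null, false);
 */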