1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/licenses/publicdomain
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.util.Random;
  39 import java.util.Collection;
  40 import java.util.concurrent.locks.LockSupport;
  41 import java.util.concurrent.RejectedExecutionException;
  42 
  43 /**
  44  * A thread managed by a {@link ForkJoinPool}, which executes
  45  * {@link ForkJoinTask}s.
  46  * This class is subclassable solely for the sake of adding
  47  * functionality -- there are no overridable methods dealing with
  48  * scheduling or execution.  However, you can override initialization
  49  * and termination methods surrounding the main task processing loop.
  50  * If you do create such a subclass, you will also need to supply a
  51  * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to use it
  52  * in a {@code ForkJoinPool}.
  53  *
  54  * @since 1.7
  55  * @author Doug Lea
  56  */
  57 public class ForkJoinWorkerThread extends Thread {
  58     /*
  59      * Overview:
  60      *
  61      * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
  62      * ForkJoinTasks. This class includes bookkeeping in support of
  63      * worker activation, suspension, and lifecycle control described
  64      * in more detail in the internal documentation of class
  65      * ForkJoinPool. And as described further below, this class also
  66      * includes special-cased support for some ForkJoinTask
  67      * methods. But the main mechanics involve work-stealing:
  68      *
  69      * Work-stealing queues are special forms of Deques that support
  70      * only three of the four possible end-operations -- push, pop,
  71      * and deq (aka steal), under the further constraints that push
  72      * and pop are called only from the owning thread, while deq may
  73      * be called from other threads.  (If you are unfamiliar with
  74      * them, you probably want to read Herlihy and Shavit's book "The
     * Art of Multiprocessor Programming", chapter 16 describing these
  76      * in more detail before proceeding.)  The main work-stealing
  77      * queue design is roughly similar to those in the papers "Dynamic
  78      * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
  79      * (http://research.sun.com/scalable/pubs/index.html) and
  80      * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
  81      * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
  82      * The main differences ultimately stem from gc requirements that
  83      * we null out taken slots as soon as we can, to maintain as small
  84      * a footprint as possible even in programs generating huge
  85      * numbers of tasks. To accomplish this, we shift the CAS
  86      * arbitrating pop vs deq (steal) from being on the indices
  87      * ("base" and "sp") to the slots themselves (mainly via method
  88      * "casSlotNull()"). So, both a successful pop and deq mainly
  89      * entail a CAS of a slot from non-null to null.  Because we rely
  90      * on CASes of references, we do not need tag bits on base or sp.
  91      * They are simple ints as used in any circular array-based queue
  92      * (see for example ArrayDeque).  Updates to the indices must
  93      * still be ordered in a way that guarantees that sp == base means
  94      * the queue is empty, but otherwise may err on the side of
  95      * possibly making the queue appear nonempty when a push, pop, or
  96      * deq have not fully committed. Note that this means that the deq
  97      * operation, considered individually, is not wait-free. One thief
  98      * cannot successfully continue until another in-progress one (or,
  99      * if previously empty, a push) completes.  However, in the
 100      * aggregate, we ensure at least probabilistic non-blockingness.
 101      * If an attempted steal fails, a thief always chooses a different
 102      * random victim target to try next. So, in order for one thief to
 103      * progress, it suffices for any in-progress deq or new push on
 104      * any empty queue to complete. One reason this works well here is
 105      * that apparently-nonempty often means soon-to-be-stealable,
 106      * which gives threads a chance to set activation status if
 107      * necessary before stealing.
 108      *
 109      * This approach also enables support for "async mode" where local
 110      * task processing is in FIFO, not LIFO order; simply by using a
 111      * version of deq rather than pop when locallyFifo is true (as set
 112      * by the ForkJoinPool).  This allows use in message-passing
 113      * frameworks in which tasks are never joined.
 114      *
 115      * When a worker would otherwise be blocked waiting to join a
 116      * task, it first tries a form of linear helping: Each worker
 117      * records (in field currentSteal) the most recent task it stole
 118      * from some other worker. Plus, it records (in field currentJoin)
 119      * the task it is currently actively joining. Method joinTask uses
 120      * these markers to try to find a worker to help (i.e., steal back
 121      * a task from and execute it) that could hasten completion of the
 122      * actively joined task. In essence, the joiner executes a task
 123      * that would be on its own local deque had the to-be-joined task
 124      * not been stolen. This may be seen as a conservative variant of
 125      * the approach in Wagner & Calder "Leapfrogging: a portable
 126      * technique for implementing efficient futures" SIGPLAN Notices,
 127      * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
 128      * in that: (1) We only maintain dependency links across workers
 129      * upon steals, rather than use per-task bookkeeping.  This may
 130      * require a linear scan of workers array to locate stealers, but
 131      * usually doesn't because stealers leave hints (that may become
 132      * stale/wrong) of where to locate them. This isolates cost to
 133      * when it is needed, rather than adding to per-task overhead.
 134      * (2) It is "shallow", ignoring nesting and potentially cyclic
 135      * mutual steals.  (3) It is intentionally racy: field currentJoin
 136      * is updated only while actively joining, which means that we
 137      * miss links in the chain during long-lived tasks, GC stalls etc
 138      * (which is OK since blocking in such cases is usually a good
 139      * idea).  (4) We bound the number of attempts to find work (see
 140      * MAX_HELP_DEPTH) and fall back to suspending the worker and if
 141      * necessary replacing it with a spare (see
 142      * ForkJoinPool.awaitJoin).
 143      *
 144      * Efficient implementation of these algorithms currently relies
 145      * on an uncomfortable amount of "Unsafe" mechanics. To maintain
 146      * correct orderings, reads and writes of variable base require
 147      * volatile ordering.  Variable sp does not require volatile
 148      * writes but still needs store-ordering, which we accomplish by
 149      * pre-incrementing sp before filling the slot with an ordered
 150      * store.  (Pre-incrementing also enables backouts used in
 151      * joinTask.)  Because they are protected by volatile base reads,
 152      * reads of the queue array and its slots by other threads do not
 153      * need volatile load semantics, but writes (in push) require
 154      * store order and CASes (in pop and deq) require (volatile) CAS
 155      * semantics.  (Michael, Saraswat, and Vechev's algorithm has
 156      * similar properties, but without support for nulling slots.)
 157      * Since these combinations aren't supported using ordinary
 158      * volatiles, the only way to accomplish these efficiently is to
 159      * use direct Unsafe calls. (Using external AtomicIntegers and
 160      * AtomicReferenceArrays for the indices and array is
 161      * significantly slower because of memory locality and indirection
 162      * effects.)
 163      *
 164      * Further, performance on most platforms is very sensitive to
 165      * placement and sizing of the (resizable) queue array.  Even
 166      * though these queues don't usually become all that big, the
 167      * initial size must be large enough to counteract cache
 168      * contention effects across multiple queues (especially in the
 169      * presence of GC cardmarking). Also, to improve thread-locality,
 170      * queues are initialized after starting.  All together, these
 171      * low-level implementation choices produce as much as a factor of
 172      * 4 performance improvement compared to naive implementations,
 173      * and enable the processing of billions of tasks per second,
 174      * sometimes at the expense of ugliness.
 175      */
 176 
 177     /**
 178      * Generator for initial random seeds for random victim
 179      * selection. This is used only to create initial seeds. Random
 180      * steals use a cheaper xorshift generator per steal attempt. We
 181      * expect only rare contention on seedGenerator, so just use a
 182      * plain Random.
 183      */
 184     private static final Random seedGenerator = new Random();
 185 
 186     /**
 187      * The maximum stolen->joining link depth allowed in helpJoinTask.
 188      * Depths for legitimate chains are unbounded, but we use a fixed
 189      * constant to avoid (otherwise unchecked) cycles and bound
 190      * staleness of traversal parameters at the expense of sometimes
 191      * blocking when we could be helping.
 192      */
 193     private static final int MAX_HELP_DEPTH = 8;
 194 
 195     /**
 196      * Capacity of work-stealing queue array upon initialization.
 197      * Must be a power of two. Initial size must be at least 4, but is
 198      * padded to minimize cache effects.
 199      */
 200     private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
 201 
 202     /**
 203      * Maximum work-stealing queue array size.  Must be less than or
 204      * equal to 1 << (31 - width of array entry) to ensure lack of
 205      * index wraparound. The value is set in the static block
 206      * at the end of this file after obtaining width.
 207      */
 208     private static final int MAXIMUM_QUEUE_CAPACITY;
 209 
 210     /**
 211      * The pool this thread works in. Accessed directly by ForkJoinTask.
 212      */
 213     final ForkJoinPool pool;
 214 
 215     /**
 216      * The work-stealing queue array. Size must be a power of two.
 217      * Initialized in onStart, to improve memory locality.
 218      */
 219     private ForkJoinTask<?>[] queue;
 220 
 221     /**
 222      * Index (mod queue.length) of least valid queue slot, which is
 223      * always the next position to steal from if nonempty.
 224      */
 225     private volatile int base;
 226 
 227     /**
 228      * Index (mod queue.length) of next queue slot to push to or pop
 229      * from. It is written only by owner thread, and accessed by other
 230      * threads only after reading (volatile) base.  Both sp and base
 231      * are allowed to wrap around on overflow, but (sp - base) still
 232      * estimates size.
 233      */
 234     private int sp;
 235 
 236     /**
 237      * The index of most recent stealer, used as a hint to avoid
 238      * traversal in method helpJoinTask. This is only a hint because a
 239      * worker might have had multiple steals and this only holds one
 240      * of them (usually the most current). Declared non-volatile,
 241      * relying on other prevailing sync to keep reasonably current.
 242      */
 243     private int stealHint;
 244 
 245     /**
 246      * Run state of this worker. In addition to the usual run levels,
 247      * tracks if this worker is suspended as a spare, and if it was
 248      * killed (trimmed) while suspended. However, "active" status is
 249      * maintained separately and modified only in conjunction with
 250      * CASes of the pool's runState (which are currently sadly
 251      * manually inlined for performance.)  Accessed directly by pool
 252      * to simplify checks for normal (zero) status.
 253      */
 254     volatile int runState;
 255 
    private static final int TERMINATING = 0x01; // in process of terminating
    private static final int TERMINATED  = 0x02; // termination complete
    private static final int SUSPENDED   = 0x04; // inactive spare
    private static final int TRIMMED     = 0x08; // killed while suspended
 260 
 261     /**
 262      * Number of steals. Directly accessed (and reset) by
 263      * pool.tryAccumulateStealCount when idle.
 264      */
 265     int stealCount;
 266 
 267     /**
 268      * Seed for random number generator for choosing steal victims.
 269      * Uses Marsaglia xorshift. Must be initialized as nonzero.
 270      */
 271     private int seed;
 272 
 273     /**
 274      * Activity status. When true, this worker is considered active.
 275      * Accessed directly by pool.  Must be false upon construction.
 276      */
 277     boolean active;
 278 
 279     /**
 280      * True if use local fifo, not default lifo, for local polling.
 281      * Shadows value from ForkJoinPool.
 282      */
 283     private final boolean locallyFifo;
 284 
 285     /**
 286      * Index of this worker in pool array. Set once by pool before
 287      * running, and accessed directly by pool to locate this worker in
 288      * its workers array.
 289      */
 290     int poolIndex;
 291 
 292     /**
 293      * The last pool event waited for. Accessed only by pool in
 294      * callback methods invoked within this thread.
 295      */
 296     int lastEventCount;
 297 
 298     /**
 299      * Encoded index and event count of next event waiter. Accessed
 300      * only by ForkJoinPool for managing event waiters.
 301      */
 302     volatile long nextWaiter;
 303 
 304     /**
 305      * Number of times this thread suspended as spare. Accessed only
 306      * by pool.
 307      */
 308     int spareCount;
 309 
 310     /**
 311      * Encoded index and count of next spare waiter. Accessed only
 312      * by ForkJoinPool for managing spares.
 313      */
 314     volatile int nextSpare;
 315 
 316     /**
 317      * The task currently being joined, set only when actively trying
 318      * to help other stealers in helpJoinTask. Written only by this
 319      * thread, but read by others.
 320      */
 321     private volatile ForkJoinTask<?> currentJoin;
 322 
 323     /**
 324      * The task most recently stolen from another worker (or
 325      * submission queue).  Written only by this thread, but read by
 326      * others.
 327      */
 328     private volatile ForkJoinTask<?> currentSteal;
 329 
 330     /**
 331      * Creates a ForkJoinWorkerThread operating in the given pool.
 332      *
 333      * @param pool the pool this thread works in
 334      * @throws NullPointerException if pool is null
 335      */
 336     protected ForkJoinWorkerThread(ForkJoinPool pool) {
 337         this.pool = pool;
 338         this.locallyFifo = pool.locallyFifo;
 339         setDaemon(true);
 340         // To avoid exposing construction details to subclasses,
 341         // remaining initialization is in start() and onStart()
 342     }
 343 
 344     /**
 345      * Performs additional initialization and starts this thread.
 346      */
 347     final void start(int poolIndex, UncaughtExceptionHandler ueh) {
 348         this.poolIndex = poolIndex;
 349         if (ueh != null)
 350             setUncaughtExceptionHandler(ueh);
 351         start();
 352     }
 353 
 354     // Public/protected methods
 355 
 356     /**
 357      * Returns the pool hosting this thread.
 358      *
 359      * @return the pool
 360      */
 361     public ForkJoinPool getPool() {
 362         return pool;
 363     }
 364 
 365     /**
 366      * Returns the index number of this thread in its pool.  The
 367      * returned value ranges from zero to the maximum number of
 368      * threads (minus one) that have ever been created in the pool.
 369      * This method may be useful for applications that track status or
 370      * collect results per-worker rather than per-task.
 371      *
 372      * @return the index number
 373      */
 374     public int getPoolIndex() {
 375         return poolIndex;
 376     }
 377 
 378     /**
 379      * Initializes internal state after construction but before
 380      * processing any tasks. If you override this method, you must
 381      * invoke {@code super.onStart()} at the beginning of the method.
 382      * Initialization requires care: Most fields must have legal
 383      * default values, to ensure that attempted accesses from other
 384      * threads work correctly even before this thread starts
 385      * processing tasks.
 386      */
 387     protected void onStart() {
 388         int rs = seedGenerator.nextInt();
 389         seed = (rs == 0) ? 1 : rs; // seed must be nonzero
 390 
 391         // Allocate name string and arrays in this thread
 392         String pid = Integer.toString(pool.getPoolNumber());
 393         String wid = Integer.toString(poolIndex);
 394         setName("ForkJoinPool-" + pid + "-worker-" + wid);
 395 
 396         queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
 397     }
 398 
 399     /**
 400      * Performs cleanup associated with termination of this worker
 401      * thread.  If you override this method, you must invoke
 402      * {@code super.onTermination} at the end of the overridden method.
 403      *
 404      * @param exception the exception causing this thread to abort due
 405      * to an unrecoverable error, or {@code null} if completed normally
 406      */
 407     protected void onTermination(Throwable exception) {
 408         try {
 409             ForkJoinPool p = pool;
 410             if (active) {
 411                 int a; // inline p.tryDecrementActiveCount
 412                 active = false;
 413                 do {} while (!UNSAFE.compareAndSwapInt
 414                              (p, poolRunStateOffset, a = p.runState, a - 1));
 415             }
 416             cancelTasks();
 417             setTerminated();
 418             p.workerTerminated(this);
 419         } catch (Throwable ex) {        // Shouldn't ever happen
 420             if (exception == null)      // but if so, at least rethrown
 421                 exception = ex;
 422         } finally {
 423             if (exception != null)
 424                 UNSAFE.throwException(exception);
 425         }
 426     }
 427 
 428     /**
 429      * This method is required to be public, but should never be
 430      * called explicitly. It performs the main run loop to execute
 431      * {@link ForkJoinTask}s.
 432      */
 433     public void run() {
 434         Throwable exception = null;
 435         try {
 436             onStart();
 437             mainLoop();
 438         } catch (Throwable ex) {
 439             exception = ex;
 440         } finally {
 441             onTermination(exception);
 442         }
 443     }
 444 
 445     // helpers for run()
 446 
 447     /**
 448      * Finds and executes tasks, and checks status while running.
 449      */
 450     private void mainLoop() {
 451         boolean ran = false; // true if ran a task on last step
 452         ForkJoinPool p = pool;
 453         for (;;) {
 454             p.preStep(this, ran);
 455             if (runState != 0)
 456                 break;
 457             ran = tryExecSteal() || tryExecSubmission();
 458         }
 459     }
 460 
 461     /**
 462      * Tries to steal a task and execute it.
 463      *
 464      * @return true if ran a task
 465      */
 466     private boolean tryExecSteal() {
 467         ForkJoinTask<?> t;
 468         if ((t = scan()) != null) {
 469             t.quietlyExec();
 470             UNSAFE.putOrderedObject(this, currentStealOffset, null);
 471             if (sp != base)
 472                 execLocalTasks();
 473             return true;
 474         }
 475         return false;
 476     }
 477 
 478     /**
 479      * If a submission exists, try to activate and run it.
 480      *
 481      * @return true if ran a task
 482      */
 483     private boolean tryExecSubmission() {
 484         ForkJoinPool p = pool;
 485         // This loop is needed in case attempt to activate fails, in
 486         // which case we only retry if there still appears to be a
 487         // submission.
 488         while (p.hasQueuedSubmissions()) {
 489             ForkJoinTask<?> t; int a;
 490             if (active || // inline p.tryIncrementActiveCount
 491                 (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
 492                                                    a = p.runState, a + 1))) {
 493                 if ((t = p.pollSubmission()) != null) {
 494                     UNSAFE.putOrderedObject(this, currentStealOffset, t);
 495                     t.quietlyExec();
 496                     UNSAFE.putOrderedObject(this, currentStealOffset, null);
 497                     if (sp != base)
 498                         execLocalTasks();
 499                     return true;
 500                 }
 501             }
 502         }
 503         return false;
 504     }
 505 
 506     /**
 507      * Runs local tasks until queue is empty or shut down.  Call only
 508      * while active.
 509      */
 510     private void execLocalTasks() {
 511         while (runState == 0) {
 512             ForkJoinTask<?> t = locallyFifo ? locallyDeqTask() : popTask();
 513             if (t != null)
 514                 t.quietlyExec();
 515             else if (sp == base)
 516                 break;
 517         }
 518     }
 519 
 520     /*
 521      * Intrinsics-based atomic writes for queue slots. These are
 522      * basically the same as methods in AtomicReferenceArray, but
 523      * specialized for (1) ForkJoinTask elements (2) requirement that
 524      * nullness and bounds checks have already been performed by
 525      * callers and (3) effective offsets are known not to overflow
 526      * from int to long (because of MAXIMUM_QUEUE_CAPACITY). We don't
 527      * need corresponding version for reads: plain array reads are OK
 528      * because they are protected by other volatile reads and are
 529      * confirmed by CASes.
 530      *
 531      * Most uses don't actually call these methods, but instead contain
 532      * inlined forms that enable more predictable optimization.  We
 533      * don't define the version of write used in pushTask at all, but
 534      * instead inline there a store-fenced array slot write.
 535      */
 536 
 537     /**
 538      * CASes slot i of array q from t to null. Caller must ensure q is
 539      * non-null and index is in range.
 540      */
 541     private static final boolean casSlotNull(ForkJoinTask<?>[] q, int i,
 542                                              ForkJoinTask<?> t) {
 543         return UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null);
 544     }
 545 
 546     /**
 547      * Performs a volatile write of the given task at given slot of
 548      * array q.  Caller must ensure q is non-null and index is in
 549      * range. This method is used only during resets and backouts.
 550      */
 551     private static final void writeSlot(ForkJoinTask<?>[] q, int i,
 552                                         ForkJoinTask<?> t) {
 553         UNSAFE.putObjectVolatile(q, (i << qShift) + qBase, t);
 554     }
 555 
 556     // queue methods
 557 
 558     /**
 559      * Pushes a task. Call only from this thread.
 560      *
 561      * @param t the task. Caller must ensure non-null.
 562      */
 563     final void pushTask(ForkJoinTask<?> t) {
 564         ForkJoinTask<?>[] q = queue;
 565         int mask = q.length - 1; // implicit assert q != null
 566         int s = sp++;            // ok to increment sp before slot write
 567         UNSAFE.putOrderedObject(q, ((s & mask) << qShift) + qBase, t);
 568         if ((s -= base) == 0)
 569             pool.signalWork();   // was empty
 570         else if (s == mask)
 571             growQueue();         // is full
 572     }
 573 
 574     /**
 575      * Tries to take a task from the base of the queue, failing if
 576      * empty or contended. Note: Specializations of this code appear
 577      * in locallyDeqTask and elsewhere.
 578      *
 579      * @return a task, or null if none or contended
 580      */
 581     final ForkJoinTask<?> deqTask() {
 582         ForkJoinTask<?> t;
 583         ForkJoinTask<?>[] q;
 584         int b, i;
 585         if (sp != (b = base) &&
 586             (q = queue) != null && // must read q after b
 587             (t = q[i = (q.length - 1) & b]) != null && base == b &&
 588             UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) {
 589             base = b + 1;
 590             return t;
 591         }
 592         return null;
 593     }
 594 
 595     /**
 596      * Tries to take a task from the base of own queue. Assumes active
 597      * status.  Called only by this thread.
 598      *
 599      * @return a task, or null if none
 600      */
 601     final ForkJoinTask<?> locallyDeqTask() {
 602         ForkJoinTask<?>[] q = queue;
 603         if (q != null) {
 604             ForkJoinTask<?> t;
 605             int b, i;
 606             while (sp != (b = base)) {
 607                 if ((t = q[i = (q.length - 1) & b]) != null && base == b &&
 608                     UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase,
 609                                                 t, null)) {
 610                     base = b + 1;
 611                     return t;
 612                 }
 613             }
 614         }
 615         return null;
 616     }
 617 
 618     /**
 619      * Returns a popped task, or null if empty. Assumes active status.
 620      * Called only by this thread.
 621      */
 622     private ForkJoinTask<?> popTask() {
 623         ForkJoinTask<?>[] q = queue;
 624         if (q != null) {
 625             int s;
 626             while ((s = sp) != base) {
 627                 int i = (q.length - 1) & --s;
 628                 long u = (i << qShift) + qBase; // raw offset
 629                 ForkJoinTask<?> t = q[i];
 630                 if (t == null)   // lost to stealer
 631                     break;
 632                 if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
 633                     /*
 634                      * Note: here and in related methods, as a
 635                      * performance (not correctness) issue, we'd like
 636                      * to encourage compiler not to arbitrarily
 637                      * postpone setting sp after successful CAS.
 638                      * Currently there is no intrinsic for arranging
 639                      * this, but using Unsafe putOrderedInt may be a
 640                      * preferable strategy on some compilers even
 641                      * though its main effect is a pre-, not post-
 642                      * fence. To simplify possible changes, the option
 643                      * is left in comments next to the associated
 644                      * assignments.
 645                      */
 646                     sp = s; // putOrderedInt may encourage more timely write
 647                     // UNSAFE.putOrderedInt(this, spOffset, s);
 648                     return t;
 649                 }
 650             }
 651         }
 652         return null;
 653     }
 654 
 655     /**
 656      * Specialized version of popTask to pop only if topmost element
 657      * is the given task. Called only by this thread while active.
 658      *
 659      * @param t the task. Caller must ensure non-null.
 660      */
 661     final boolean unpushTask(ForkJoinTask<?> t) {
 662         int s;
 663         ForkJoinTask<?>[] q = queue;
 664         if ((s = sp) != base && q != null &&
 665             UNSAFE.compareAndSwapObject
 666             (q, (((q.length - 1) & --s) << qShift) + qBase, t, null)) {
 667             sp = s; // putOrderedInt may encourage more timely write
 668             // UNSAFE.putOrderedInt(this, spOffset, s);
 669             return true;
 670         }
 671         return false;
 672     }
 673 
 674     /**
 675      * Returns next task, or null if empty or contended.
 676      */
 677     final ForkJoinTask<?> peekTask() {
 678         ForkJoinTask<?>[] q = queue;
 679         if (q == null)
 680             return null;
 681         int mask = q.length - 1;
 682         int i = locallyFifo ? base : (sp - 1);
 683         return q[i & mask];
 684     }
 685 
 686     /**
 687      * Doubles queue array size. Transfers elements by emulating
 688      * steals (deqs) from old array and placing, oldest first, into
 689      * new array.
 690      */
 691     private void growQueue() {
 692         ForkJoinTask<?>[] oldQ = queue;
 693         int oldSize = oldQ.length;
 694         int newSize = oldSize << 1;
 695         if (newSize > MAXIMUM_QUEUE_CAPACITY)
 696             throw new RejectedExecutionException("Queue capacity exceeded");
 697         ForkJoinTask<?>[] newQ = queue = new ForkJoinTask<?>[newSize];
 698 
 699         int b = base;
 700         int bf = b + oldSize;
 701         int oldMask = oldSize - 1;
 702         int newMask = newSize - 1;
 703         do {
 704             int oldIndex = b & oldMask;
 705             ForkJoinTask<?> t = oldQ[oldIndex];
 706             if (t != null && !casSlotNull(oldQ, oldIndex, t))
 707                 t = null;
 708             writeSlot(newQ, b & newMask, t);
 709         } while (++b != bf);
 710         pool.signalWork();
 711     }
 712 
 713     /**
 714      * Computes next value for random victim probe in scan().  Scans
 715      * don't require a very high quality generator, but also not a
 716      * crummy one.  Marsaglia xor-shift is cheap and works well enough.
 717      * Note: This is manually inlined in scan().
 718      */
 719     private static final int xorShift(int r) {
 720         r ^= r << 13;
 721         r ^= r >>> 17;
 722         return r ^ (r << 5);
 723     }
 724 
 725     /**
 726      * Tries to steal a task from another worker. Starts at a random
 727      * index of workers array, and probes workers until finding one
 728      * with non-empty queue or finding that all are empty.  It
 729      * randomly selects the first n probes. If these are empty, it
 730      * resorts to a circular sweep, which is necessary to accurately
 731      * set active status. (The circular sweep uses steps of
 732      * approximately half the array size plus 1, to avoid bias
 733      * stemming from leftmost packing of the array in ForkJoinPool.)
 734      *
 735      * This method must be both fast and quiet -- usually avoiding
 736      * memory accesses that could disrupt cache sharing etc other than
 737      * those needed to check for and take tasks (or to activate if not
 738      * already active). This accounts for, among other things,
 739      * updating random seed in place without storing it until exit.
 740      *
 741      * @return a task, or null if none found
 742      */
 743     private ForkJoinTask<?> scan() {
 744         ForkJoinPool p = pool;
 745         ForkJoinWorkerThread[] ws;        // worker array
 746         int n;                            // upper bound of #workers
 747         if ((ws = p.workers) != null && (n = ws.length) > 1) {
 748             boolean canSteal = active;    // shadow active status
 749             int r = seed;                 // extract seed once
 750             int mask = n - 1;
 751             int j = -n;                   // loop counter
 752             int k = r;                    // worker index, random if j < 0
 753             for (;;) {
 754                 ForkJoinWorkerThread v = ws[k & mask];
 755                 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // inline xorshift
 756                 ForkJoinTask<?>[] q; ForkJoinTask<?> t; int b, a;
 757                 if (v != null && (b = v.base) != v.sp &&
 758                     (q = v.queue) != null) {
 759                     int i = (q.length - 1) & b;
 760                     long u = (i << qShift) + qBase; // raw offset
 761                     int pid = poolIndex;
 762                     if ((t = q[i]) != null) {
 763                         if (!canSteal &&  // inline p.tryIncrementActiveCount
 764                             UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
 765                                                      a = p.runState, a + 1))
 766                             canSteal = active = true;
 767                         if (canSteal && v.base == b++ &&
 768                             UNSAFE.compareAndSwapObject(q, u, t, null)) {
 769                             v.base = b;
 770                             v.stealHint = pid;
 771                             UNSAFE.putOrderedObject(this,
 772                                                     currentStealOffset, t);
 773                             seed = r;
 774                             ++stealCount;
 775                             return t;
 776                         }
 777                     }
 778                     j = -n;
 779                     k = r;                // restart on contention
 780                 }
 781                 else if (++j <= 0)
 782                     k = r;
 783                 else if (j <= n)
 784                     k += (n >>> 1) | 1;
 785                 else
 786                     break;
 787             }
 788         }
 789         return null;
 790     }
 791 
    // Run State management

    // status check methods used mainly by ForkJoinPool
    final boolean isRunning()    { return runState == 0; } // no state bits set
    final boolean isTerminated() { return (runState & TERMINATED) != 0; }
    final boolean isSuspended()  { return (runState & SUSPENDED) != 0; }
    final boolean isTrimmed()    { return (runState & TRIMMED) != 0; }
 799 
    /**
     * Returns true if this worker is terminating, additionally
     * propagating pool-level termination to this worker's own
     * runState as a side effect.
     */
    final boolean isTerminating() {
        if ((runState & TERMINATING) != 0)
            return true;
        if (pool.isAtLeastTerminating()) { // propagate pool state
            shutdown();  // marks this worker TERMINATING as well
            return true;
        }
        return false;
    }
 809 
 810     /**
 811      * Sets state to TERMINATING. Does NOT unpark or interrupt
 812      * to wake up if currently blocked. Callers must do so if desired.
 813      */
 814     final void shutdown() {
 815         for (;;) {
 816             int s = runState;
 817             if ((s & (TERMINATING|TERMINATED)) != 0)
 818                 break;
 819             if ((s & SUSPENDED) != 0) { // kill and wakeup if suspended
 820                 if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
 821                                              (s & ~SUSPENDED) |
 822                                              (TRIMMED|TERMINATING)))
 823                     break;
 824             }
 825             else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
 826                                               s | TERMINATING))
 827                 break;
 828         }
 829     }
 830 
 831     /**
 832      * Sets state to TERMINATED. Called only by onTermination().
 833      */
 834     private void setTerminated() {
 835         int s;
 836         do {} while (!UNSAFE.compareAndSwapInt(this, runStateOffset,
 837                                                s = runState,
 838                                                s | (TERMINATING|TERMINATED)));
 839     }
 840 
 841     /**
 842      * If suspended, tries to set status to unsuspended.
 843      * Does NOT wake up if blocked.
 844      *
 845      * @return true if successful
 846      */
 847     final boolean tryUnsuspend() {
 848         int s;
 849         while (((s = runState) & SUSPENDED) != 0) {
 850             if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
 851                                          s & ~SUSPENDED))
 852                 return true;
 853         }
 854         return false;
 855     }
 856 
 857     /**
 858      * Sets suspended status and blocks as spare until resumed
 859      * or shutdown.
 860      */
 861     final void suspendAsSpare() {
 862         for (;;) {                  // set suspended unless terminating
 863             int s = runState;
 864             if ((s & TERMINATING) != 0) { // must kill
 865                 if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
 866                                              s | (TRIMMED | TERMINATING)))
 867                     return;
 868             }
 869             else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
 870                                               s | SUSPENDED))
 871                 break;
 872         }
 873         ForkJoinPool p = pool;
 874         p.pushSpare(this);
 875         while ((runState & SUSPENDED) != 0) {
 876             if (p.tryAccumulateStealCount(this)) {
 877                 interrupted();          // clear/ignore interrupts
 878                 if ((runState & SUSPENDED) == 0)
 879                     break;
 880                 LockSupport.park(this);
 881             }
 882         }
 883     }
 884 
    // Misc support methods for ForkJoinPool

    /**
     * Returns an estimate of the number of tasks in the queue.  Also
     * used by ForkJoinTask.
     */
    final int getQueueSize() {
        int n; // external calls must read base first
        // Written as (-base + sp), not (sp - base): left-to-right
        // evaluation forces the read of base before sp, as required
        // by the constraint noted above. Negative estimates clamp to 0.
        return (n = -base + sp) <= 0 ? 0 : n;
    }
 895 
 896     /**
 897      * Removes and cancels all tasks in queue.  Can be called from any
 898      * thread.
 899      */
 900     final void cancelTasks() {
 901         ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks
 902         if (cj != null && cj.status >= 0) {
 903             cj.cancelIgnoringExceptions();
 904             try {
 905                 this.interrupt(); // awaken wait
 906             } catch (SecurityException ignore) {
 907             }
 908         }
 909         ForkJoinTask<?> cs = currentSteal;
 910         if (cs != null && cs.status >= 0)
 911             cs.cancelIgnoringExceptions();
 912         while (base != sp) {
 913             ForkJoinTask<?> t = deqTask();
 914             if (t != null)
 915                 t.cancelIgnoringExceptions();
 916         }
 917     }
 918 
 919     /**
 920      * Drains tasks to given collection c.
 921      *
 922      * @return the number of tasks drained
 923      */
 924     final int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
 925         int n = 0;
 926         while (base != sp) {
 927             ForkJoinTask<?> t = deqTask();
 928             if (t != null) {
 929                 c.add(t);
 930                 ++n;
 931             }
 932         }
 933         return n;
 934     }
 935 
    // Support methods for ForkJoinTask

    /**
     * Gets and removes a local task.
     *
     * @return a task, if available
     */
    final ForkJoinTask<?> pollLocalTask() {
        ForkJoinPool p = pool;
        while (sp != base) {
            int a; // inline p.tryIncrementActiveCount
            // Must be (or become) active before taking a task so the
            // pool's active count stays consistent; retry loop covers
            // a failed activation CAS while the queue is still nonempty
            if (active ||
                (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
                                                   a = p.runState, a + 1)))
                return locallyFifo ? locallyDeqTask() : popTask();
        }
        return null;
    }
 954 
 955     /**
 956      * Gets and removes a local or stolen task.
 957      *
 958      * @return a task, if available
 959      */
 960     final ForkJoinTask<?> pollTask() {
 961         ForkJoinTask<?> t = pollLocalTask();
 962         if (t == null) {
 963             t = scan();
 964             // cannot retain/track/help steal
 965             UNSAFE.putOrderedObject(this, currentStealOffset, null);
 966         }
 967         return t;
 968     }
 969 
 970     /**
 971      * Possibly runs some tasks and/or blocks, until task is done.
 972      *
 973      * @param joinMe the task to join
 974      * @param timed true if use timed wait
 975      * @param nanos wait time if timed
 976      */
 977     final void joinTask(ForkJoinTask<?> joinMe, boolean timed, long nanos) {
 978         // currentJoin only written by this thread; only need ordered store
 979         ForkJoinTask<?> prevJoin = currentJoin;
 980         UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe);
 981         pool.awaitJoin(joinMe, this, timed, nanos);
 982         UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin);
 983     }
 984 
 985     /**
 986      * Tries to locate and help perform tasks for a stealer of the
 987      * given task, or in turn one of its stealers.  Traces
 988      * currentSteal->currentJoin links looking for a thread working on
 989      * a descendant of the given task and with a non-empty queue to
 990      * steal back and execute tasks from.
 991      *
 992      * The implementation is very branchy to cope with potential
 993      * inconsistencies or loops encountering chains that are stale,
 994      * unknown, or of length greater than MAX_HELP_DEPTH links.  All
 995      * of these cases are dealt with by just returning back to the
 996      * caller, who is expected to retry if other join mechanisms also
 997      * don't work out.
 998      *
 999      * @param joinMe the task to join
1000      * @param running if false, then must update pool count upon
1001      *  running a task
1002      * @return value of running on exit
1003      */
1004     final boolean helpJoinTask(ForkJoinTask<?> joinMe, boolean running) {
1005         /*
1006          * Initial checks to (1) abort if terminating; (2) clean out
1007          * old cancelled tasks from local queue; (3) if joinMe is next
1008          * task, run it; (4) omit scan if local queue nonempty (since
1009          * it may contain non-descendents of joinMe).
1010          */
1011         ForkJoinPool p = pool;
1012         for (;;) {
1013             ForkJoinTask<?>[] q;
1014             int s;
1015             if (joinMe.status < 0)
1016                 return running;
1017             else if ((runState & TERMINATING) != 0) {
1018                 joinMe.cancelIgnoringExceptions();
1019                 return running;
1020             }
1021             else if ((s = sp) == base || (q = queue) == null)
1022                 break;                            // queue empty
1023             else {
1024                 int i = (q.length - 1) & --s;
1025                 long u = (i << qShift) + qBase;   // raw offset
1026                 ForkJoinTask<?> t = q[i];
1027                 if (t == null)
1028                     break;                        // lost to a stealer
1029                 else if (t != joinMe && t.status >= 0)
1030                     return running;               // cannot safely help
1031                 else if ((running ||
1032                           (running = p.tryIncrementRunningCount())) &&
1033                          UNSAFE.compareAndSwapObject(q, u, t, null)) {
1034                     sp = s; // putOrderedInt may encourage more timely write
1035                     // UNSAFE.putOrderedInt(this, spOffset, s);
1036                     t.quietlyExec();
1037                 }
1038             }
1039         }
1040 
1041         int n;                                    // worker array size
1042         ForkJoinWorkerThread[] ws = p.workers;
1043         if (ws != null && (n = ws.length) > 1) {  // need at least 2 workers
1044             ForkJoinTask<?> task = joinMe;        // base of chain
1045             ForkJoinWorkerThread thread = this;   // thread with stolen task
1046 
1047             outer:for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
1048                 // Try to find v, the stealer of task, by first using hint
1049                 ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
1050                 if (v == null || v.currentSteal != task) {
1051                     for (int j = 0; ; ++j) {      // search array
1052                         if (j < n) {
1053                             ForkJoinTask<?> vs;
1054                             if ((v = ws[j]) != null &&
1055                                 (vs = v.currentSteal) != null) {
1056                                 if (joinMe.status < 0)
1057                                     break outer;
1058                                 if (vs == task) {
1059                                     if (task.status < 0)
1060                                         break outer; // stale
1061                                     thread.stealHint = j;
1062                                     break;        // save hint for next time
1063                                 }
1064                             }
1065                         }
1066                         else
1067                             break outer;          // no stealer
1068                     }
1069                 }
1070 
1071                 // Try to help v, using specialized form of deqTask
1072                 for (;;) {
1073                     if (joinMe.status < 0)
1074                         break outer;
1075                     int b = v.base;
1076                     ForkJoinTask<?>[] q = v.queue;
1077                     if (b == v.sp || q == null)
1078                         break;                    // empty
1079                     int i = (q.length - 1) & b;
1080                     long u = (i << qShift) + qBase;
1081                     ForkJoinTask<?> t = q[i];
1082                     if (task.status < 0)
1083                         break outer;              // stale
1084                     if (t != null &&
1085                         (running ||
1086                          (running = p.tryIncrementRunningCount())) &&
1087                         v.base == b++ &&
1088                         UNSAFE.compareAndSwapObject(q, u, t, null)) {
1089                         if (t != joinMe && joinMe.status < 0) {
1090                             UNSAFE.putObjectVolatile(q, u, t);
1091                             break outer;          // joinMe cancelled; back out
1092                         }
1093                         v.base = b;
1094                         if (t.status >= 0) {
1095                             ForkJoinTask<?> ps = currentSteal;
1096                             int pid = poolIndex;
1097                             v.stealHint = pid;
1098                             UNSAFE.putOrderedObject(this,
1099                                                     currentStealOffset, t);
1100                             t.quietlyExec();
1101                             UNSAFE.putOrderedObject(this,
1102                                                     currentStealOffset, ps);
1103                         }
1104                     }
1105                     else if ((runState & TERMINATING) != 0) {
1106                         joinMe.cancelIgnoringExceptions();
1107                         break outer;
1108                     }
1109                 }
1110 
1111                 // Try to descend to find v's stealer
1112                 ForkJoinTask<?> next = v.currentJoin;
1113                 if (task.status < 0 || next == null || next == task ||
1114                     joinMe.status < 0)
1115                     break;                 // done, stale, dead-end, or cyclic
1116                 task = next;
1117                 thread = v;
1118             }
1119         }
1120         return running;
1121     }
1122 
1123     /**
1124      * Implements ForkJoinTask.getSurplusQueuedTaskCount().
1125      * Returns an estimate of the number of tasks, offset by a
1126      * function of number of idle workers.
1127      *
1128      * This method provides a cheap heuristic guide for task
1129      * partitioning when programmers, frameworks, tools, or languages
1130      * have little or no idea about task granularity.  In essence by
1131      * offering this method, we ask users only about tradeoffs in
1132      * overhead vs expected throughput and its variance, rather than
1133      * how finely to partition tasks.
1134      *
1135      * In a steady state strict (tree-structured) computation, each
1136      * thread makes available for stealing enough tasks for other
1137      * threads to remain active. Inductively, if all threads play by
1138      * the same rules, each thread should make available only a
1139      * constant number of tasks.
1140      *
1141      * The minimum useful constant is just 1. But using a value of 1
1142      * would require immediate replenishment upon each steal to
1143      * maintain enough tasks, which is infeasible.  Further,
1144      * partitionings/granularities of offered tasks should minimize
1145      * steal rates, which in general means that threads nearer the top
1146      * of computation tree should generate more than those nearer the
1147      * bottom. In perfect steady state, each thread is at
1148      * approximately the same level of computation tree. However,
1149      * producing extra tasks amortizes the uncertainty of progress and
1150      * diffusion assumptions.
1151      *
1152      * So, users will want to use values larger, but not much larger
1153      * than 1 to both smooth over transient shortages and hedge
1154      * against uneven progress; as traded off against the cost of
1155      * extra task overhead. We leave the user to pick a threshold
1156      * value to compare with the results of this call to guide
1157      * decisions, but recommend values such as 3.
1158      *
1159      * When all threads are active, it is on average OK to estimate
1160      * surplus strictly locally. In steady-state, if one thread is
1161      * maintaining say 2 surplus tasks, then so are others. So we can
1162      * just use estimated queue length (although note that (sp - base)
1163      * can be an overestimate because of stealers lagging increments
1164      * of base).  However, this strategy alone leads to serious
1165      * mis-estimates in some non-steady-state conditions (ramp-up,
1166      * ramp-down, other stalls). We can detect many of these by
1167      * further considering the number of "idle" threads, that are
1168      * known to have zero queued tasks, so compensate by a factor of
1169      * (#idle/#active) threads.
1170      */
    final int getEstimatedSurplusTaskCount() {
        // Estimated queue length minus the idle/active compensation
        // factor; see the method comment above for rationale
        return sp - base - pool.idlePerActive();
    }
1174 
1175     /**
1176      * Runs tasks until {@code pool.isQuiescent()}.
1177      */
1178     final void helpQuiescePool() {
1179         ForkJoinTask<?> ps = currentSteal; // to restore below
1180         for (;;) {
1181             ForkJoinTask<?> t = pollLocalTask();
1182             if (t != null || (t = scan()) != null)
1183                 t.quietlyExec();
1184             else {
1185                 ForkJoinPool p = pool;
1186                 int a; // to inline CASes
1187                 if (active) {
1188                     if (!UNSAFE.compareAndSwapInt
1189                         (p, poolRunStateOffset, a = p.runState, a - 1))
1190                         continue;   // retry later
1191                     active = false; // inactivate
1192                     UNSAFE.putOrderedObject(this, currentStealOffset, ps);
1193                 }
1194                 if (p.isQuiescent()) {
1195                     active = true; // re-activate
1196                     do {} while (!UNSAFE.compareAndSwapInt
1197                                  (p, poolRunStateOffset, a = p.runState, a+1));
1198                     return;
1199                 }
1200             }
1201         }
1202     }
1203 
    // Unsafe mechanics

    private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
    // Raw field offsets used for CAS and ordered writes above
    private static final long spOffset =
        objectFieldOffset("sp", ForkJoinWorkerThread.class);
    private static final long runStateOffset =
        objectFieldOffset("runState", ForkJoinWorkerThread.class);
    private static final long currentJoinOffset =
        objectFieldOffset("currentJoin", ForkJoinWorkerThread.class);
    private static final long currentStealOffset =
        objectFieldOffset("currentSteal", ForkJoinWorkerThread.class);
    private static final long qBase = // base offset of queue array storage
        UNSAFE.arrayBaseOffset(ForkJoinTask[].class);
    private static final long poolRunStateOffset = // to inline CAS
        objectFieldOffset("runState", ForkJoinPool.class);

    // log2 of array element size; slot i's raw offset is
    // (i << qShift) + qBase, as used throughout this class
    private static final int qShift;

    static {
        int s = UNSAFE.arrayIndexScale(ForkJoinTask[].class);
        if ((s & (s-1)) != 0)
            throw new Error("data type scale not a power of two");
        qShift = 31 - Integer.numberOfLeadingZeros(s); // log2(scale)
        // Bounds capacity so (index << qShift) cannot overflow int
        MAXIMUM_QUEUE_CAPACITY = 1 << (31 - qShift);
    }
1229 
1230     private static long objectFieldOffset(String field, Class<?> klazz) {
1231         try {
1232             return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
1233         } catch (NoSuchFieldException e) {
1234             // Convert Exception to corresponding Error
1235             NoSuchFieldError error = new NoSuchFieldError(field);
1236             error.initCause(e);
1237             throw error;
1238         }
1239     }
1240 }