1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.util.Collection;
  39 import java.util.concurrent.RejectedExecutionException;
  40 
  41 /**
  42  * A thread managed by a {@link ForkJoinPool}, which executes
  43  * {@link ForkJoinTask}s.
  44  * This class is subclassable solely for the sake of adding
  45  * functionality -- there are no overridable methods dealing with
  46  * scheduling or execution.  However, you can override initialization
  47  * and termination methods surrounding the main task processing loop.
  48  * If you do create such a subclass, you will also need to supply a
  49  * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to use it
  50  * in a {@code ForkJoinPool}.
  51  *
  52  * @since 1.7
  53  * @author Doug Lea
  54  */
  55 public class ForkJoinWorkerThread extends Thread {
  56     /*
  57      * Overview:
  58      *
  59      * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
  60      * ForkJoinTasks. This class includes bookkeeping in support of
  61      * worker activation, suspension, and lifecycle control described
  62      * in more detail in the internal documentation of class
  63      * ForkJoinPool. And as described further below, this class also
  64      * includes special-cased support for some ForkJoinTask
  65      * methods. But the main mechanics involve work-stealing:
  66      *
  67      * Work-stealing queues are special forms of Deques that support
  68      * only three of the four possible end-operations -- push, pop,
  69      * and deq (aka steal), under the further constraints that push
  70      * and pop are called only from the owning thread, while deq may
  71      * be called from other threads.  (If you are unfamiliar with
  72      * them, you probably want to read Herlihy and Shavit's book "The
  73      * Art of Multiprocessor programming", chapter 16 describing these
  74      * in more detail before proceeding.)  The main work-stealing
  75      * queue design is roughly similar to those in the papers "Dynamic
  76      * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
  77      * (http://research.sun.com/scalable/pubs/index.html) and
  78      * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
  79      * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
  80      * The main differences ultimately stem from gc requirements that
  81      * we null out taken slots as soon as we can, to maintain as small
  82      * a footprint as possible even in programs generating huge
  83      * numbers of tasks. To accomplish this, we shift the CAS
  84      * arbitrating pop vs deq (steal) from being on the indices
  85      * ("queueBase" and "queueTop") to the slots themselves (mainly
  86      * via method "casSlotNull()"). So, both a successful pop and deq
  87      * mainly entail a CAS of a slot from non-null to null.  Because
  88      * we rely on CASes of references, we do not need tag bits on
  89      * queueBase or queueTop.  They are simple ints as used in any
  90      * circular array-based queue (see for example ArrayDeque).
  91      * Updates to the indices must still be ordered in a way that
  92      * guarantees that queueTop == queueBase means the queue is empty,
  93      * but otherwise may err on the side of possibly making the queue
  94      * appear nonempty when a push, pop, or deq have not fully
  95      * committed. Note that this means that the deq operation,
  96      * considered individually, is not wait-free. One thief cannot
  97      * successfully continue until another in-progress one (or, if
  98      * previously empty, a push) completes.  However, in the
  99      * aggregate, we ensure at least probabilistic non-blockingness.
 100      * If an attempted steal fails, a thief always chooses a different
 101      * random victim target to try next. So, in order for one thief to
 102      * progress, it suffices for any in-progress deq or new push on
 103      * any empty queue to complete.
 104      *
 105      * This approach also enables support for "async mode" where local
 106      * task processing is in FIFO, not LIFO order; simply by using a
 107      * version of deq rather than pop when locallyFifo is true (as set
 108      * by the ForkJoinPool).  This allows use in message-passing
 109      * frameworks in which tasks are never joined.  However neither
 110      * mode considers affinities, loads, cache localities, etc, so
 111      * rarely provide the best possible performance on a given
 112      * machine, but portably provide good throughput by averaging over
 113      * these factors.  (Further, even if we did try to use such
 114      * information, we do not usually have a basis for exploiting
 115      * it. For example, some sets of tasks profit from cache
 116      * affinities, but others are harmed by cache pollution effects.)
 117      *
 118      * When a worker would otherwise be blocked waiting to join a
 119      * task, it first tries a form of linear helping: Each worker
 120      * records (in field currentSteal) the most recent task it stole
 121      * from some other worker. Plus, it records (in field currentJoin)
 122      * the task it is currently actively joining. Method joinTask uses
 123      * these markers to try to find a worker to help (i.e., steal back
 124      * a task from and execute it) that could hasten completion of the
 125      * actively joined task. In essence, the joiner executes a task
 126      * that would be on its own local deque had the to-be-joined task
 127      * not been stolen. This may be seen as a conservative variant of
 128      * the approach in Wagner & Calder "Leapfrogging: a portable
 129      * technique for implementing efficient futures" SIGPLAN Notices,
 130      * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
 131      * in that: (1) We only maintain dependency links across workers
 132      * upon steals, rather than use per-task bookkeeping.  This may
 133      * require a linear scan of workers array to locate stealers, but
 134      * usually doesn't because stealers leave hints (that may become
 135      * stale/wrong) of where to locate them. This isolates cost to
 136      * when it is needed, rather than adding to per-task overhead.
 137      * (2) It is "shallow", ignoring nesting and potentially cyclic
 138      * mutual steals.  (3) It is intentionally racy: field currentJoin
 139      * is updated only while actively joining, which means that we
 140      * miss links in the chain during long-lived tasks, GC stalls etc
 141      * (which is OK since blocking in such cases is usually a good
 142      * idea).  (4) We bound the number of attempts to find work (see
 143      * MAX_HELP) and fall back to suspending the worker and if
 144      * necessary replacing it with another.
 145      *
 146      * Efficient implementation of these algorithms currently relies
 147      * on an uncomfortable amount of "Unsafe" mechanics. To maintain
 148      * correct orderings, reads and writes of variable queueBase
 149      * require volatile ordering.  Variable queueTop need not be
 150      * volatile because non-local reads always follow those of
 151      * queueBase.  Similarly, because they are protected by volatile
 152      * queueBase reads, reads of the queue array and its slots by
 153      * other threads do not need volatile load semantics, but writes
 154      * (in push) require store order and CASes (in pop and deq)
 155      * require (volatile) CAS semantics.  (Michael, Saraswat, and
 156      * Vechev's algorithm has similar properties, but without support
 157      * for nulling slots.)  Since these combinations aren't supported
 158      * using ordinary volatiles, the only way to accomplish these
 159      * efficiently is to use direct Unsafe calls. (Using external
 160      * AtomicIntegers and AtomicReferenceArrays for the indices and
 161      * array is significantly slower because of memory locality and
 162      * indirection effects.)
 163      *
 164      * Further, performance on most platforms is very sensitive to
 165      * placement and sizing of the (resizable) queue array.  Even
 166      * though these queues don't usually become all that big, the
 167      * initial size must be large enough to counteract cache
 168      * contention effects across multiple queues (especially in the
 169      * presence of GC cardmarking). Also, to improve thread-locality,
 170      * queues are initialized after starting.
 171      */
 172 
 173     /**
 174      * Mask for pool indices encoded as shorts
 175      */
 176     private static final int  SMASK  = 0xffff;
 177 
 178     /**
 179      * Capacity of work-stealing queue array upon initialization.
 180      * Must be a power of two. Initial size must be at least 4, but is
 181      * padded to minimize cache effects.
 182      */
 183     private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
 184 
 185     /**
 186      * Maximum size for queue array. Must be a power of two
 187      * less than or equal to 1 << (31 - width of array entry) to
 188      * ensure lack of index wraparound, but is capped at a lower
 189      * value to help users trap runaway computations.
 190      */
 191     private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 24; // 16M
 192 
 193     /**
 194      * The work-stealing queue array. Size must be a power of two.
 195      * Initialized when started (as opposed to when constructed), to
 196      * improve memory locality.
 197      */
 198     ForkJoinTask<?>[] queue;
 199 
 200     /**
 201      * The pool this thread works in. Accessed directly by ForkJoinTask.
 202      */
 203     final ForkJoinPool pool;
 204 
 205     /**
 206      * Index (mod queue.length) of next queue slot to push to or pop
 207      * from. It is written only by owner thread, and accessed by other
 208      * threads only after reading (volatile) queueBase.  Both queueTop
 209      * and queueBase are allowed to wrap around on overflow, but
 210      * (queueTop - queueBase) still estimates size.
 211      */
 212     int queueTop;
 213 
 214     /**
 215      * Index (mod queue.length) of least valid queue slot, which is
 216      * always the next position to steal from if nonempty.
 217      */
 218     volatile int queueBase;
 219 
 220     /**
 221      * The index of most recent stealer, used as a hint to avoid
 222      * traversal in method helpJoinTask. This is only a hint because a
 223      * worker might have had multiple steals and this only holds one
 224      * of them (usually the most current). Declared non-volatile,
 225      * relying on other prevailing sync to keep reasonably current.
 226      */
 227     int stealHint;
 228 
 229     /**
 230      * Index of this worker in pool array. Set once by pool before
 231      * running, and accessed directly by pool to locate this worker in
 232      * its workers array.
 233      */
 234     final int poolIndex;
 235 
 236     /**
 237      * Encoded record for pool task waits. Usages are always
 238      * surrounded by volatile reads/writes
 239      */
 240     int nextWait;
 241 
 242     /**
 243      * Complement of poolIndex, offset by count of entries of task
 244      * waits. Accessed by ForkJoinPool to manage event waiters.
 245      */
 246     volatile int eventCount;
 247 
 248     /**
 249      * Seed for random number generator for choosing steal victims.
 250      * Uses Marsaglia xorshift. Must be initialized as nonzero.
 251      */
 252     int seed;
 253 
 254     /**
 255      * Number of steals. Directly accessed (and reset) by pool when
 256      * idle.
 257      */
 258     int stealCount;
 259 
 260     /**
 261      * True if this worker should or did terminate
 262      */
 263     volatile boolean terminate;
 264 
 265     /**
 266      * Set to true before LockSupport.park; false on return
 267      */
 268     volatile boolean parked;
 269 
 270     /**
 271      * True if use local fifo, not default lifo, for local polling.
 272      * Shadows value from ForkJoinPool.
 273      */
 274     final boolean locallyFifo;
 275 
 276     /**
 277      * The task most recently stolen from another worker (or
 278      * submission queue).  All uses are surrounded by enough volatile
 279      * reads/writes to maintain as non-volatile.
 280      */
 281     ForkJoinTask<?> currentSteal;
 282 
 283     /**
 284      * The task currently being joined, set only when actively trying
 285      * to help other stealers in helpJoinTask. All uses are surrounded
 286      * by enough volatile reads/writes to maintain as non-volatile.
 287      */
 288     ForkJoinTask<?> currentJoin;
 289 
 290     /**
 291      * Creates a ForkJoinWorkerThread operating in the given pool.
 292      *
 293      * @param pool the pool this thread works in
 294      * @throws NullPointerException if pool is null
 295      */
 296     protected ForkJoinWorkerThread(ForkJoinPool pool) {
 297         super(pool.nextWorkerName());
 298         this.pool = pool;
 299         int k = pool.registerWorker(this);
 300         poolIndex = k;
 301         eventCount = ~k & SMASK; // clear wait count
 302         locallyFifo = pool.locallyFifo;
 303         Thread.UncaughtExceptionHandler ueh = pool.ueh;
 304         if (ueh != null)
 305             setUncaughtExceptionHandler(ueh);
 306         setDaemon(true);
 307     }
 308 
 309     // Public methods
 310 
 311     /**
 312      * Returns the pool hosting this thread.
 313      *
 314      * @return the pool
 315      */
 316     public ForkJoinPool getPool() {
 317         return pool;
 318     }
 319 
 320     /**
 321      * Returns the index number of this thread in its pool.  The
 322      * returned value ranges from zero to the maximum number of
 323      * threads (minus one) that have ever been created in the pool.
 324      * This method may be useful for applications that track status or
 325      * collect results per-worker rather than per-task.
 326      *
 327      * @return the index number
 328      */
 329     public int getPoolIndex() {
 330         return poolIndex;
 331     }
 332 
 333     // Randomization
 334 
 335     /**
 336      * Computes next value for random victim probes and backoffs.
 337      * Scans don't require a very high quality generator, but also not
 338      * a crummy one.  Marsaglia xor-shift is cheap and works well
 339      * enough.  Note: This is manually inlined in FJP.scan() to avoid
 340      * writes inside busy loops.
 341      */
 342     private int nextSeed() {
 343         int r = seed;
 344         r ^= r << 13;
 345         r ^= r >>> 17;
 346         r ^= r << 5;
 347         return seed = r;
 348     }
 349 
 350     // Run State management
 351 
 352     /**
 353      * Initializes internal state after construction but before
 354      * processing any tasks. If you override this method, you must
 355      * invoke {@code super.onStart()} at the beginning of the method.
 356      * Initialization requires care: Most fields must have legal
 357      * default values, to ensure that attempted accesses from other
 358      * threads work correctly even before this thread starts
 359      * processing tasks.
 360      */
 361     protected void onStart() {
 362         queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
 363         int r = ForkJoinPool.workerSeedGenerator.nextInt();
 364         seed = (r == 0) ? 1 : r; //  must be nonzero
 365     }
 366 
 367     /**
 368      * Performs cleanup associated with termination of this worker
 369      * thread.  If you override this method, you must invoke
 370      * {@code super.onTermination} at the end of the overridden method.
 371      *
 372      * @param exception the exception causing this thread to abort due
 373      * to an unrecoverable error, or {@code null} if completed normally
 374      */
 375     protected void onTermination(Throwable exception) {
 376         try {
 377             terminate = true;
 378             cancelTasks();
 379             pool.deregisterWorker(this, exception);
 380         } catch (Throwable ex) {        // Shouldn't ever happen
 381             if (exception == null)      // but if so, at least rethrown
 382                 exception = ex;
 383         } finally {
 384             if (exception != null)
 385                 UNSAFE.throwException(exception);
 386         }
 387     }
 388 
 389     /**
 390      * This method is required to be public, but should never be
 391      * called explicitly. It performs the main run loop to execute
 392      * {@link ForkJoinTask}s.
 393      */
 394     public void run() {
 395         Throwable exception = null;
 396         try {
 397             onStart();
 398             pool.work(this);
 399         } catch (Throwable ex) {
 400             exception = ex;
 401         } finally {
 402             onTermination(exception);
 403         }
 404     }
 405 
 406     /*
 407      * Intrinsics-based atomic writes for queue slots. These are
 408      * basically the same as methods in AtomicReferenceArray, but
 409      * specialized for (1) ForkJoinTask elements (2) requirement that
 410      * nullness and bounds checks have already been performed by
 411      * callers and (3) effective offsets are known not to overflow
 412      * from int to long (because of MAXIMUM_QUEUE_CAPACITY). We don't
 413      * need corresponding version for reads: plain array reads are OK
 414      * because they are protected by other volatile reads and are
 415      * confirmed by CASes.
 416      *
 417      * Most uses don't actually call these methods, but instead
 418      * contain inlined forms that enable more predictable
 419      * optimization.  We don't define the version of write used in
 420      * pushTask at all, but instead inline there a store-fenced array
 421      * slot write.
 422      *
 423      * Also in most methods, as a performance (not correctness) issue,
 424      * we'd like to encourage compilers not to arbitrarily postpone
 425      * setting queueTop after writing slot.  Currently there is no
 426      * intrinsic for arranging this, but using Unsafe putOrderedInt
 427      * may be a preferable strategy on some compilers even though its
 428      * main effect is a pre-, not post- fence. To simplify possible
 429      * changes, the option is left in comments next to the associated
 430      * assignments.
 431      */
 432 
 433     /**
 434      * CASes slot i of array q from t to null. Caller must ensure q is
 435      * non-null and index is in range.
 436      */
 437     private static final boolean casSlotNull(ForkJoinTask<?>[] q, int i,
 438                                              ForkJoinTask<?> t) {
 439         return UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE, t, null);
 440     }
 441 
 442     /**
 443      * Performs a volatile write of the given task at given slot of
 444      * array q.  Caller must ensure q is non-null and index is in
 445      * range. This method is used only during resets and backouts.
 446      */
 447     private static final void writeSlot(ForkJoinTask<?>[] q, int i,
 448                                         ForkJoinTask<?> t) {
 449         UNSAFE.putObjectVolatile(q, (i << ASHIFT) + ABASE, t);
 450     }
 451 
 452     // queue methods
 453 
 454     /**
 455      * Pushes a task. Call only from this thread.
 456      *
 457      * @param t the task. Caller must ensure non-null.
 458      */
 459     final void pushTask(ForkJoinTask<?> t) {
 460         ForkJoinTask<?>[] q; int s, m;
 461         if ((q = queue) != null) {    // ignore if queue removed
 462             long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
 463             UNSAFE.putOrderedObject(q, u, t);
 464             queueTop = s + 1;         // or use putOrderedInt
 465             if ((s -= queueBase) <= 2)
 466                 pool.signalWork();
 467             else if (s == m)
 468                 growQueue();
 469         }
 470     }
 471 
 472     /**
 473      * Creates or doubles queue array.  Transfers elements by
 474      * emulating steals (deqs) from old array and placing, oldest
 475      * first, into new array.
 476      */
 477     private void growQueue() {
 478         ForkJoinTask<?>[] oldQ = queue;
 479         int size = oldQ != null ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY;
 480         if (size > MAXIMUM_QUEUE_CAPACITY)
 481             throw new RejectedExecutionException("Queue capacity exceeded");
 482         if (size < INITIAL_QUEUE_CAPACITY)
 483             size = INITIAL_QUEUE_CAPACITY;
 484         ForkJoinTask<?>[] q = queue = new ForkJoinTask<?>[size];
 485         int mask = size - 1;
 486         int top = queueTop;
 487         int oldMask;
 488         if (oldQ != null && (oldMask = oldQ.length - 1) >= 0) {
 489             for (int b = queueBase; b != top; ++b) {
 490                 long u = ((b & oldMask) << ASHIFT) + ABASE;
 491                 Object x = UNSAFE.getObjectVolatile(oldQ, u);
 492                 if (x != null && UNSAFE.compareAndSwapObject(oldQ, u, x, null))
 493                     UNSAFE.putObjectVolatile
 494                         (q, ((b & mask) << ASHIFT) + ABASE, x);
 495             }
 496         }
 497     }
 498 
 499     /**
 500      * Tries to take a task from the base of the queue, failing if
 501      * empty or contended. Note: Specializations of this code appear
 502      * in locallyDeqTask and elsewhere.
 503      *
 504      * @return a task, or null if none or contended
 505      */
 506     final ForkJoinTask<?> deqTask() {
 507         ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
 508         if (queueTop != (b = queueBase) &&
 509             (q = queue) != null && // must read q after b
 510             (i = (q.length - 1) & b) >= 0 &&
 511             (t = q[i]) != null && queueBase == b &&
 512             UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE, t, null)) {
 513             queueBase = b + 1;
 514             return t;
 515         }
 516         return null;
 517     }
 518 
 519     /**
 520      * Tries to take a task from the base of own queue.  Called only
 521      * by this thread.
 522      *
 523      * @return a task, or null if none
 524      */
 525     final ForkJoinTask<?> locallyDeqTask() {
 526         ForkJoinTask<?> t; int m, b, i;
 527         ForkJoinTask<?>[] q = queue;
 528         if (q != null && (m = q.length - 1) >= 0) {
 529             while (queueTop != (b = queueBase)) {
 530                 if ((t = q[i = m & b]) != null &&
 531                     queueBase == b &&
 532                     UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE,
 533                                                 t, null)) {
 534                     queueBase = b + 1;
 535                     return t;
 536                 }
 537             }
 538         }
 539         return null;
 540     }
 541 
 542     /**
 543      * Returns a popped task, or null if empty.
 544      * Called only by this thread.
 545      */
 546     private ForkJoinTask<?> popTask() {
 547         int m;
 548         ForkJoinTask<?>[] q = queue;
 549         if (q != null && (m = q.length - 1) >= 0) {
 550             for (int s; (s = queueTop) != queueBase;) {
 551                 int i = m & --s;
 552                 long u = (i << ASHIFT) + ABASE; // raw offset
 553                 ForkJoinTask<?> t = q[i];
 554                 if (t == null)   // lost to stealer
 555                     break;
 556                 if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
 557                     queueTop = s; // or putOrderedInt
 558                     return t;
 559                 }
 560             }
 561         }
 562         return null;
 563     }
 564 
 565     /**
 566      * Specialized version of popTask to pop only if topmost element
 567      * is the given task. Called only by this thread.
 568      *
 569      * @param t the task. Caller must ensure non-null.
 570      */
 571     final boolean unpushTask(ForkJoinTask<?> t) {
 572         ForkJoinTask<?>[] q;
 573         int s;
 574         if ((q = queue) != null && (s = queueTop) != queueBase &&
 575             UNSAFE.compareAndSwapObject
 576             (q, (((q.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
 577             queueTop = s; // or putOrderedInt
 578             return true;
 579         }
 580         return false;
 581     }
 582 
 583     /**
 584      * Returns next task, or null if empty or contended.
 585      */
 586     final ForkJoinTask<?> peekTask() {
 587         int m;
 588         ForkJoinTask<?>[] q = queue;
 589         if (q == null || (m = q.length - 1) < 0)
 590             return null;
 591         int i = locallyFifo ? queueBase : (queueTop - 1);
 592         return q[i & m];
 593     }
 594 
 595     // Support methods for ForkJoinPool
 596 
 597     /**
 598      * Runs the given task, plus any local tasks until queue is empty
 599      */
 600     final void execTask(ForkJoinTask<?> t) {
 601         currentSteal = t;
 602         for (;;) {
 603             if (t != null)
 604                 t.doExec();
 605             if (queueTop == queueBase)
 606                 break;
 607             t = locallyFifo ? locallyDeqTask() : popTask();
 608         }
 609         ++stealCount;
 610         currentSteal = null;
 611     }
 612 
 613     /**
 614      * Removes and cancels all tasks in queue.  Can be called from any
 615      * thread.
 616      */
 617     final void cancelTasks() {
 618         ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks
 619         if (cj != null && cj.status >= 0)
 620             cj.cancelIgnoringExceptions();
 621         ForkJoinTask<?> cs = currentSteal;
 622         if (cs != null && cs.status >= 0)
 623             cs.cancelIgnoringExceptions();
 624         while (queueBase != queueTop) {
 625             ForkJoinTask<?> t = deqTask();
 626             if (t != null)
 627                 t.cancelIgnoringExceptions();
 628         }
 629     }
 630 
 631     /**
 632      * Drains tasks to given collection c.
 633      *
 634      * @return the number of tasks drained
 635      */
 636     final int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
 637         int n = 0;
 638         while (queueBase != queueTop) {
 639             ForkJoinTask<?> t = deqTask();
 640             if (t != null) {
 641                 c.add(t);
 642                 ++n;
 643             }
 644         }
 645         return n;
 646     }
 647 
 648     // Support methods for ForkJoinTask
 649 
 650     /**
 651      * Returns an estimate of the number of tasks in the queue.
 652      */
 653     final int getQueueSize() {
 654         return queueTop - queueBase;
 655     }
 656 
 657     /**
 658      * Gets and removes a local task.
 659      *
 660      * @return a task, if available
 661      */
 662     final ForkJoinTask<?> pollLocalTask() {
 663         return locallyFifo ? locallyDeqTask() : popTask();
 664     }
 665 
 666     /**
 667      * Gets and removes a local or stolen task.
 668      *
 669      * @return a task, if available
 670      */
 671     final ForkJoinTask<?> pollTask() {
 672         ForkJoinWorkerThread[] ws;
 673         ForkJoinTask<?> t = pollLocalTask();
 674         if (t != null || (ws = pool.workers) == null)
 675             return t;
 676         int n = ws.length; // cheap version of FJP.scan
 677         int steps = n << 1;
 678         int r = nextSeed();
 679         int i = 0;
 680         while (i < steps) {
 681             ForkJoinWorkerThread w = ws[(i++ + r) & (n - 1)];
 682             if (w != null && w.queueBase != w.queueTop && w.queue != null) {
 683                 if ((t = w.deqTask()) != null)
 684                     return t;
 685                 i = 0;
 686             }
 687         }
 688         return null;
 689     }
 690 
 691     /**
 692      * The maximum stolen->joining link depth allowed in helpJoinTask,
 693      * as well as the maximum number of retries (allowing on average
 694      * one staleness retry per level) per attempt to instead try
 695      * compensation.  Depths for legitimate chains are unbounded, but
 696      * we use a fixed constant to avoid (otherwise unchecked) cycles
 697      * and bound staleness of traversal parameters at the expense of
 698      * sometimes blocking when we could be helping.
 699      */
 700     private static final int MAX_HELP = 16;
 701 
 702     /**
 703      * Possibly runs some tasks and/or blocks, until joinMe is done.
 704      *
 705      * @param joinMe the task to join
 706      * @return completion status on exit
 707      */
 708     final int joinTask(ForkJoinTask<?> joinMe) {
 709         ForkJoinTask<?> prevJoin = currentJoin;
 710         currentJoin = joinMe;
 711         for (int s, retries = MAX_HELP;;) {
 712             if ((s = joinMe.status) < 0) {
 713                 currentJoin = prevJoin;
 714                 return s;
 715             }
 716             if (retries > 0) {
 717                 if (queueTop != queueBase) {
 718                     if (!localHelpJoinTask(joinMe))
 719                         retries = 0;           // cannot help
 720                 }
 721                 else if (retries == MAX_HELP >>> 1) {
 722                     --retries;                 // check uncommon case
 723                     if (tryDeqAndExec(joinMe) >= 0)
 724                         Thread.yield();        // for politeness
 725                 }
 726                 else
 727                     retries = helpJoinTask(joinMe) ? MAX_HELP : retries - 1;
 728             }
 729             else {
 730                 retries = MAX_HELP;           // restart if not done
 731                 pool.tryAwaitJoin(joinMe);
 732             }
 733         }
 734     }
 735 
 736     /**
 737      * If present, pops and executes the given task, or any other
 738      * cancelled task
 739      *
 740      * @return false if any other non-cancelled task exists in local queue
 741      */
 742     private boolean localHelpJoinTask(ForkJoinTask<?> joinMe) {
 743         int s, i; ForkJoinTask<?>[] q; ForkJoinTask<?> t;
 744         if ((s = queueTop) != queueBase && (q = queue) != null &&
 745             (i = (q.length - 1) & --s) >= 0 &&
 746             (t = q[i]) != null) {
 747             if (t != joinMe && t.status >= 0)
 748                 return false;
 749             if (UNSAFE.compareAndSwapObject
 750                 (q, (i << ASHIFT) + ABASE, t, null)) {
 751                 queueTop = s;           // or putOrderedInt
 752                 t.doExec();
 753             }
 754         }
 755         return true;
 756     }
 757 
 758     /**
 759      * Tries to locate and execute tasks for a stealer of the given
 760      * task, or in turn one of its stealers, Traces
 761      * currentSteal->currentJoin links looking for a thread working on
 762      * a descendant of the given task and with a non-empty queue to
 763      * steal back and execute tasks from.  The implementation is very
 764      * branchy to cope with potential inconsistencies or loops
 765      * encountering chains that are stale, unknown, or of length
 766      * greater than MAX_HELP links.  All of these cases are dealt with
 767      * by just retrying by caller.
 768      *
 769      * @param joinMe the task to join
 770      * @param canSteal true if local queue is empty
 771      * @return true if ran a task
 772      */
 773     private boolean helpJoinTask(ForkJoinTask<?> joinMe) {
 774         boolean helped = false;
 775         int m = pool.scanGuard & SMASK;
 776         ForkJoinWorkerThread[] ws = pool.workers;
 777         if (ws != null && ws.length > m && joinMe.status >= 0) {
 778             int levels = MAX_HELP;              // remaining chain length
 779             ForkJoinTask<?> task = joinMe;      // base of chain
 780             outer:for (ForkJoinWorkerThread thread = this;;) {
 781                 // Try to find v, the stealer of task, by first using hint
 782                 ForkJoinWorkerThread v = ws[thread.stealHint & m];
 783                 if (v == null || v.currentSteal != task) {
 784                     for (int j = 0; ;) {        // search array
 785                         if ((v = ws[j]) != null && v.currentSteal == task) {
 786                             thread.stealHint = j;
 787                             break;              // save hint for next time
 788                         }
 789                         if (++j > m)
 790                             break outer;        // can't find stealer
 791                     }
 792                 }
 793                 // Try to help v, using specialized form of deqTask
 794                 for (;;) {
 795                     ForkJoinTask<?>[] q; int b, i;
 796                     if (joinMe.status < 0)
 797                         break outer;
 798                     if ((b = v.queueBase) == v.queueTop ||
 799                         (q = v.queue) == null ||
 800                         (i = (q.length-1) & b) < 0)
 801                         break;                  // empty
 802                     long u = (i << ASHIFT) + ABASE;
 803                     ForkJoinTask<?> t = q[i];
 804                     if (task.status < 0)
 805                         break outer;            // stale
 806                     if (t != null && v.queueBase == b &&
 807                         UNSAFE.compareAndSwapObject(q, u, t, null)) {
 808                         v.queueBase = b + 1;
 809                         v.stealHint = poolIndex;
 810                         ForkJoinTask<?> ps = currentSteal;
 811                         currentSteal = t;
 812                         t.doExec();
 813                         currentSteal = ps;
 814                         helped = true;
 815                     }
 816                 }
 817                 // Try to descend to find v's stealer
 818                 ForkJoinTask<?> next = v.currentJoin;
 819                 if (--levels > 0 && task.status >= 0 &&
 820                     next != null && next != task) {
 821                     task = next;
 822                     thread = v;
 823                 }
 824                 else
 825                     break;  // max levels, stale, dead-end, or cyclic
 826             }
 827         }
 828         return helped;
 829     }
 830 
 831     /**
 832      * Performs an uncommon case for joinTask: If task t is at base of
 833      * some workers queue, steals and executes it.
 834      *
 835      * @param t the task
 836      * @return t's status
 837      */
 838     private int tryDeqAndExec(ForkJoinTask<?> t) {
 839         int m = pool.scanGuard & SMASK;
 840         ForkJoinWorkerThread[] ws = pool.workers;
 841         if (ws != null && ws.length > m && t.status >= 0) {
 842             for (int j = 0; j <= m; ++j) {
 843                 ForkJoinTask<?>[] q; int b, i;
 844                 ForkJoinWorkerThread v = ws[j];
 845                 if (v != null &&
 846                     (b = v.queueBase) != v.queueTop &&
 847                     (q = v.queue) != null &&
 848                     (i = (q.length - 1) & b) >= 0 &&
 849                     q[i] == t) {
 850                     long u = (i << ASHIFT) + ABASE;
 851                     if (v.queueBase == b &&
 852                         UNSAFE.compareAndSwapObject(q, u, t, null)) {
 853                         v.queueBase = b + 1;
 854                         v.stealHint = poolIndex;
 855                         ForkJoinTask<?> ps = currentSteal;
 856                         currentSteal = t;
 857                         t.doExec();
 858                         currentSteal = ps;
 859                     }
 860                     break;
 861                 }
 862             }
 863         }
 864         return t.status;
 865     }
 866 
 867     /**
 868      * Implements ForkJoinTask.getSurplusQueuedTaskCount().  Returns
 869      * an estimate of the number of tasks, offset by a function of
 870      * number of idle workers.
 871      *
 872      * This method provides a cheap heuristic guide for task
 873      * partitioning when programmers, frameworks, tools, or languages
 874      * have little or no idea about task granularity.  In essence by
 875      * offering this method, we ask users only about tradeoffs in
 876      * overhead vs expected throughput and its variance, rather than
 877      * how finely to partition tasks.
 878      *
 879      * In a steady state strict (tree-structured) computation, each
 880      * thread makes available for stealing enough tasks for other
 881      * threads to remain active. Inductively, if all threads play by
 882      * the same rules, each thread should make available only a
 883      * constant number of tasks.
 884      *
 885      * The minimum useful constant is just 1. But using a value of 1
 886      * would require immediate replenishment upon each steal to
 887      * maintain enough tasks, which is infeasible.  Further,
 888      * partitionings/granularities of offered tasks should minimize
 889      * steal rates, which in general means that threads nearer the top
 890      * of computation tree should generate more than those nearer the
 891      * bottom. In perfect steady state, each thread is at
 892      * approximately the same level of computation tree. However,
 893      * producing extra tasks amortizes the uncertainty of progress and
 894      * diffusion assumptions.
 895      *
 896      * So, users will want to use values larger, but not much larger
 897      * than 1 to both smooth over transient shortages and hedge
 898      * against uneven progress; as traded off against the cost of
 899      * extra task overhead. We leave the user to pick a threshold
 900      * value to compare with the results of this call to guide
 901      * decisions, but recommend values such as 3.
 902      *
 903      * When all threads are active, it is on average OK to estimate
 904      * surplus strictly locally. In steady-state, if one thread is
 905      * maintaining say 2 surplus tasks, then so are others. So we can
 906      * just use estimated queue length (although note that (queueTop -
 907      * queueBase) can be an overestimate because of stealers lagging
 908      * increments of queueBase).  However, this strategy alone leads
 909      * to serious mis-estimates in some non-steady-state conditions
 910      * (ramp-up, ramp-down, other stalls). We can detect many of these
 911      * by further considering the number of "idle" threads, that are
 912      * known to have zero queued tasks, so compensate by a factor of
 913      * (#idle/#active) threads.
 914      */
 915     final int getEstimatedSurplusTaskCount() {
 916         return queueTop - queueBase - pool.idlePerActive();
 917     }
 918 
 919     /**
 920      * Runs tasks until {@code pool.isQuiescent()}. We piggyback on
 921      * pool's active count ctl maintenance, but rather than blocking
 922      * when tasks cannot be found, we rescan until all others cannot
 923      * find tasks either. The bracketing by pool quiescerCounts
 924      * updates suppresses pool auto-shutdown mechanics that could
 925      * otherwise prematurely terminate the pool because all threads
 926      * appear to be inactive.
 927      */
 928     final void helpQuiescePool() {
 929         boolean active = true;
 930         ForkJoinTask<?> ps = currentSteal; // to restore below
 931         ForkJoinPool p = pool;
 932         p.addQuiescerCount(1);
 933         for (;;) {
 934             ForkJoinWorkerThread[] ws = p.workers;
 935             ForkJoinWorkerThread v = null;
 936             int n;
 937             if (queueTop != queueBase)
 938                 v = this;
 939             else if (ws != null && (n = ws.length) > 1) {
 940                 ForkJoinWorkerThread w;
 941                 int r = nextSeed(); // cheap version of FJP.scan
 942                 int steps = n << 1;
 943                 for (int i = 0; i < steps; ++i) {
 944                     if ((w = ws[(i + r) & (n - 1)]) != null &&
 945                         w.queueBase != w.queueTop) {
 946                         v = w;
 947                         break;
 948                     }
 949                 }
 950             }
 951             if (v != null) {
 952                 ForkJoinTask<?> t;
 953                 if (!active) {
 954                     active = true;
 955                     p.addActiveCount(1);
 956                 }
 957                 if ((t = (v != this) ? v.deqTask() :
 958                      locallyFifo ? locallyDeqTask() : popTask()) != null) {
 959                     currentSteal = t;
 960                     t.doExec();
 961                     currentSteal = ps;
 962                 }
 963             }
 964             else {
 965                 if (active) {
 966                     active = false;
 967                     p.addActiveCount(-1);
 968                 }
 969                 if (p.isQuiescent()) {
 970                     p.addActiveCount(1);
 971                     p.addQuiescerCount(-1);
 972                     break;
 973                 }
 974             }
 975         }
 976     }
 977 
 978     // Unsafe mechanics
 979     private static final sun.misc.Unsafe UNSAFE;
 980     private static final long ABASE;
 981     private static final int ASHIFT;
 982 
 983     static {
 984         int s;
 985         try {
 986             UNSAFE = sun.misc.Unsafe.getUnsafe();
 987             Class<?> a = ForkJoinTask[].class;
 988             ABASE = UNSAFE.arrayBaseOffset(a);
 989             s = UNSAFE.arrayIndexScale(a);
 990         } catch (Exception e) {
 991             throw new Error(e);
 992         }
 993         if ((s & (s-1)) != 0)
 994             throw new Error("data type scale not a power of two");
 995         ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
 996     }
 997 
 998 }