/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */

package java.util.concurrent;

import java.util.Random;
import java.util.Collection;
import java.util.concurrent.locks.LockSupport;

/**
 * A thread managed by a {@link ForkJoinPool}.  This class is
 * subclassable solely for the sake of adding functionality -- there
 * are no overridable methods dealing with scheduling or execution.
 * However, you can override initialization and termination methods
 * surrounding the main task processing loop.  If you do create such a
 * subclass, you will also need to supply a custom {@link
 * ForkJoinPool.ForkJoinWorkerThreadFactory} to use it in a {@code
 * ForkJoinPool}.
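 *
 * <p>For example, here is a minimal sketch of such a subclass and a
 * matching factory (the names {@code MyWorkerThread} and
 * {@code MyFactory} are illustrative, not part of this API):
 *
 * <pre> {@code
 * class MyWorkerThread extends ForkJoinWorkerThread {
 *   MyWorkerThread(ForkJoinPool pool) { super(pool); }
 *   // add functionality here; there are no scheduling or
 *   // execution methods to override
 * }
 *
 * class MyFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory {
 *   public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
 *     return new MyWorkerThread(pool);
 *   }
 * }}</pre>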
 *
 * @since 1.7
 * @author Doug Lea
 */
public class ForkJoinWorkerThread extends Thread {
    /*
     * Overview:
     *
     * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
     * ForkJoinTasks. This class includes bookkeeping in support of
     * worker activation, suspension, and lifecycle control described
     * in more detail in the internal documentation of class
     * ForkJoinPool. And as described further below, this class also
     * includes special-cased support for some ForkJoinTask
     * methods. But the main mechanics involve work-stealing:
     *
     * Work-stealing queues are special forms of Deques that support
     * only three of the four possible end-operations -- push, pop,
     * and deq (aka steal), under the further constraints that push
     * and pop are called only from the owning thread, while deq may
     * be called from other threads.  (If you are unfamiliar with
     * them, you probably want to read Herlihy and Shavit's book "The
     * Art of Multiprocessor Programming", chapter 16, describing
     * these in more detail before proceeding.)  The main
     * work-stealing queue design is roughly similar to those in the
     * papers "Dynamic Circular Work-Stealing Deque" by Chase and
     * Lev, SPAA 2005 (http://research.sun.com/scalable/pubs/index.html)
     * and "Idempotent work stealing" by Michael, Saraswat, and Vechev,
     * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
     * The main differences ultimately stem from GC requirements that
     * we null out taken slots as soon as we can, to maintain as small
     * a footprint as possible even in programs generating huge
     * numbers of tasks. To accomplish this, we shift the CAS
     * arbitrating pop vs deq (steal) from being on the indices
     * ("base" and "sp") to the slots themselves (mainly via method
     * "casSlotNull()"). So, both a successful pop and deq mainly
     * entail a CAS of a slot from non-null to null.  Because we rely
     * on CASes of references, we do not need tag bits on base or sp.
     * They are simple ints as used in any circular array-based queue
     * (see for example ArrayDeque).  Updates to the indices must
     * still be ordered in a way that guarantees that sp == base means
     * the queue is empty, but otherwise may err on the side of
     * possibly making the queue appear nonempty when a push, pop, or
     * deq has not fully committed. Note that this means that the deq
     * operation, considered individually, is not wait-free. One thief
     * cannot successfully continue until another in-progress one (or,
     * if previously empty, a push) completes.  However, in the
     * aggregate, we ensure at least probabilistic non-blockingness.
     * If an attempted steal fails, a thief always chooses a different
     * random victim target to try next. So, in order for one thief to
     * progress, it suffices for any in-progress deq or new push on
     * any empty queue to complete. One reason this works well here is
     * that apparently-nonempty often means soon-to-be-stealable,
     * which gives threads a chance to set activation status if
     * necessary before stealing.
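     *
     * In sketch form (a hypothetical outline only; the actual code
     * below inlines and specializes these steps), both operations
     * commit via the slot CAS and only then publish the index update:
     *
     *   deq (steal): t = q[base & mask];
     *                if (t != null && CAS(slot, t, null)) ++base;
     *   pop:         t = q[(sp - 1) & mask];
     *                if (t != null && CAS(slot, t, null)) --sp;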
     *
     * This approach also enables support for "async mode" where local
     * task processing is in FIFO, not LIFO order, simply by using a
     * version of deq rather than pop when locallyFifo is true (as set
     * by the ForkJoinPool).  This allows use in message-passing
     * frameworks in which tasks are never joined.
     *
     * When a worker would otherwise be blocked waiting to join a
     * task, it first tries a form of linear helping: Each worker
     * records (in field currentSteal) the most recent task it stole
     * from some other worker. Plus, it records (in field currentJoin)
     * the task it is currently actively joining. Method joinTask uses
     * these markers to try to find a worker to help (i.e., steal back
     * a task from and execute it) that could hasten completion of the
     * actively joined task. In essence, the joiner executes a task
     * that would be on its own local deque had the to-be-joined task
     * not been stolen. This may be seen as a conservative variant of
     * the approach in Wagner & Calder "Leapfrogging: a portable
     * technique for implementing efficient futures" SIGPLAN Notices,
     * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
     * in that: (1) We only maintain dependency links across workers
     * upon steals, rather than use per-task bookkeeping.  This may
     * require a linear scan of the workers array to locate stealers,
     * but usually doesn't because stealers leave hints (that may
     * become stale/wrong) of where to locate them. This isolates cost
     * to when it is needed, rather than adding to per-task overhead.
     * (2) It is "shallow", ignoring nesting and potentially cyclic
     * mutual steals.  (3) It is intentionally racy: field currentJoin
     * is updated only while actively joining, which means that we
     * miss links in the chain during long-lived tasks, GC stalls,
     * etc. (which is OK since blocking in such cases is usually a
     * good idea).  (4) We bound the number of attempts to find work
     * (see MAX_HELP_DEPTH) and fall back to suspending the worker
     * and, if necessary, replacing it with a spare (see
     * ForkJoinPool.awaitJoin).
     *
     * Efficient implementation of these algorithms currently relies
     * on an uncomfortable amount of "Unsafe" mechanics. To maintain
     * correct orderings, reads and writes of variable base require
     * volatile ordering.  Variable sp does not require volatile
     * writes but still needs store-ordering, which we accomplish by
     * pre-incrementing sp before filling the slot with an ordered
     * store.  (Pre-incrementing also enables backouts used in
     * joinTask.)  Because they are protected by volatile base reads,
     * reads of the queue array and its slots by other threads do not
     * need volatile load semantics, but writes (in push) require
     * store order and CASes (in pop and deq) require (volatile) CAS
     * semantics.  (Michael, Saraswat, and Vechev's algorithm has
     * similar properties, but without support for nulling slots.)
     * Since these combinations aren't supported using ordinary
     * volatiles, the only way to accomplish these efficiently is to
     * use direct Unsafe calls. (Using external AtomicIntegers and
     * AtomicReferenceArrays for the indices and array is
     * significantly slower because of memory locality and indirection
     * effects.)
     *
     * Further, performance on most platforms is very sensitive to
     * placement and sizing of the (resizable) queue array.  Even
     * though these queues don't usually become all that big, the
     * initial size must be large enough to counteract cache
     * contention effects across multiple queues (especially in the
     * presence of GC cardmarking). Also, to improve thread-locality,
     * queues are initialized after starting.  Altogether, these
     * low-level implementation choices produce as much as a factor of
     * 4 performance improvement compared to naive implementations,
     * and enable the processing of billions of tasks per second,
     * sometimes at the expense of ugliness.
     */

    /**
     * Generator for initial random seeds for random victim
     * selection. This is used only to create initial seeds. Random
     * steals use a cheaper xorshift generator per steal attempt. We
     * expect only rare contention on seedGenerator, so just use a
     * plain Random.
     */
    private static final Random seedGenerator = new Random();

    /**
     * The maximum stolen->joining link depth allowed in helpJoinTask.
     * Depths for legitimate chains are unbounded, but we use a fixed
     * constant to avoid (otherwise unchecked) cycles and bound
     * staleness of traversal parameters at the expense of sometimes
     * blocking when we could be helping.
     */
    private static final int MAX_HELP_DEPTH = 8;

    /**
     * Capacity of work-stealing queue array upon initialization.
     * Must be a power of two. Initial size must be at least 4, but is
     * padded to minimize cache effects.
     */
    private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;

    /**
     * Maximum work-stealing queue array size.  Must be less than or
     * equal to 1 << (31 - width of array entry) to ensure lack of
     * index wraparound. The value is set in the static block
     * at the end of this file after obtaining width.
     */
    private static final int MAXIMUM_QUEUE_CAPACITY;

    /**
     * The pool this thread works in. Accessed directly by ForkJoinTask.
     */
    final ForkJoinPool pool;

    /**
     * The work-stealing queue array. Size must be a power of two.
     * Initialized in onStart, to improve memory locality.
     */
    private ForkJoinTask<?>[] queue;

    /**
     * Index (mod queue.length) of least valid queue slot, which is
     * always the next position to steal from if nonempty.
     */
    private volatile int base;

    /**
     * Index (mod queue.length) of next queue slot to push to or pop
     * from. It is written only by owner thread, and accessed by other
     * threads only after reading (volatile) base.  Both sp and base
     * are allowed to wrap around on overflow, but (sp - base) still
     * estimates size.
     */
    private int sp;

    /**
     * The index of most recent stealer, used as a hint to avoid
     * traversal in method helpJoinTask. This is only a hint because a
     * worker might have had multiple steals and this only holds one
     * of them (usually the most current). Declared non-volatile,
     * relying on other prevailing sync to keep reasonably current.
     */
    private int stealHint;

    /**
     * Run state of this worker. In addition to the usual run levels,
     * tracks if this worker is suspended as a spare, and if it was
     * killed (trimmed) while suspended. However, "active" status is
     * maintained separately and modified only in conjunction with
     * CASes of the pool's runState (which are currently sadly
     * manually inlined for performance).  Accessed directly by pool
     * to simplify checks for normal (zero) status.
     */
    volatile int runState;

    private static final int TERMINATING = 0x01;
    private static final int TERMINATED  = 0x02;
    private static final int SUSPENDED   = 0x04; // inactive spare
    private static final int TRIMMED     = 0x08; // killed while suspended

    /**
     * Number of steals. Directly accessed (and reset) by
     * pool.tryAccumulateStealCount when idle.
     */
    int stealCount;

    /**
     * Seed for random number generator for choosing steal victims.
     * Uses Marsaglia xorshift. Must be initialized as nonzero.
     */
    private int seed;

    /**
     * Activity status. When true, this worker is considered active.
     * Accessed directly by pool.  Must be false upon construction.
     */
    boolean active;

    /**
     * True if using FIFO rather than the default LIFO order for
     * local polling. Shadows the value from ForkJoinPool.
     */
    private final boolean locallyFifo;

    /**
     * Index of this worker in pool array. Set once by pool before
     * running, and accessed directly by pool to locate this worker in
     * its workers array.
     */
    int poolIndex;

    /**
     * The last pool event waited for. Accessed only by pool in
     * callback methods invoked within this thread.
     */
    int lastEventCount;

    /**
     * Encoded index and event count of next event waiter. Accessed
     * only by ForkJoinPool for managing event waiters.
     */
    volatile long nextWaiter;

    /**
     * Number of times this thread suspended as spare. Accessed only
     * by pool.
     */
    int spareCount;

    /**
     * Encoded index and count of next spare waiter. Accessed only
     * by ForkJoinPool for managing spares.
     */
    volatile int nextSpare;

    /**
     * The task currently being joined, set only when actively trying
     * to help other stealers in helpJoinTask. Written only by this
     * thread, but read by others.
     */
    private volatile ForkJoinTask<?> currentJoin;

    /**
     * The task most recently stolen from another worker (or
     * submission queue).  Written only by this thread, but read by
     * others.
     */
    private volatile ForkJoinTask<?> currentSteal;

    /**
     * Creates a ForkJoinWorkerThread operating in the given pool.
     *
     * @param pool the pool this thread works in
     * @throws NullPointerException if pool is null
     */
    protected ForkJoinWorkerThread(ForkJoinPool pool) {
        this.pool = pool;
        this.locallyFifo = pool.locallyFifo;
        setDaemon(true);
        // To avoid exposing construction details to subclasses,
        // remaining initialization is in start() and onStart()
    }

    /**
     * Performs additional initialization and starts this thread.
     */
    final void start(int poolIndex, UncaughtExceptionHandler ueh) {
        this.poolIndex = poolIndex;
        if (ueh != null)
            setUncaughtExceptionHandler(ueh);
        start();
    }

    // Public/protected methods

    /**
     * Returns the pool hosting this thread.
     *
     * @return the pool
     */
    public ForkJoinPool getPool() {
        return pool;
    }

    /**
     * Returns the index number of this thread in its pool.  The
     * returned value ranges from zero to the maximum number of
     * threads (minus one) that have ever been created in the pool.
     * This method may be useful for applications that track status or
     * collect results per-worker rather than per-task.
     *
     * @return the index number
     */
    public int getPoolIndex() {
        return poolIndex;
    }

    /**
     * Initializes internal state after construction but before
     * processing any tasks. If you override this method, you must
     * invoke {@code super.onStart()} at the beginning of the method.
     * Initialization requires care: Most fields must have legal
     * default values, to ensure that attempted accesses from other
     * threads work correctly even before this thread starts
     * processing tasks.
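     *
     * <p>A minimal sketch of an override (the field
     * {@code perThreadScratch} is illustrative only, not part of
     * this class):
     *
     * <pre> {@code
     * protected void onStart() {
     *   super.onStart();                    // must come first
     *   perThreadScratch = new byte[1024];  // then subclass setup
     * }}</pre>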
     */
    protected void onStart() {
        int rs = seedGenerator.nextInt();
        seed = (rs == 0) ? 1 : rs; // seed must be nonzero

        // Allocate name string and arrays in this thread
        String pid = Integer.toString(pool.getPoolNumber());
        String wid = Integer.toString(poolIndex);
        setName("ForkJoinPool-" + pid + "-worker-" + wid);

        queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
    }

    /**
     * Performs cleanup associated with termination of this worker
     * thread.  If you override this method, you must invoke
     * {@code super.onTermination} at the end of the overridden method.
     *
     * @param exception the exception causing this thread to abort due
     * to an unrecoverable error, or {@code null} if completed normally
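     *
     * <p>A minimal sketch of an override (the method
     * {@code logWorkerExit} is illustrative only):
     *
     * <pre> {@code
     * protected void onTermination(Throwable exception) {
     *   try {
     *     logWorkerExit(exception);        // subclass cleanup
     *   } finally {
     *     super.onTermination(exception);  // must come last
     *   }
     * }}</pre>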
     */
    protected void onTermination(Throwable exception) {
        try {
            ForkJoinPool p = pool;
            if (active) {
                int a; // inline p.tryDecrementActiveCount
                active = false;
                do {} while (!UNSAFE.compareAndSwapInt
                             (p, poolRunStateOffset, a = p.runState, a - 1));
            }
            cancelTasks();
            setTerminated();
            p.workerTerminated(this);
        } catch (Throwable ex) {        // Shouldn't ever happen
            if (exception == null)      // but if so, at least rethrow
                exception = ex;
        } finally {
            if (exception != null)
                UNSAFE.throwException(exception);
        }
    }

    /**
     * This method is required to be public, but should never be
     * called explicitly. It performs the main run loop to execute
     * ForkJoinTasks.
     */
    public void run() {
        Throwable exception = null;
        try {
            onStart();
            mainLoop();
        } catch (Throwable ex) {
            exception = ex;
        } finally {
            onTermination(exception);
        }
    }

    // helpers for run()

    /**
     * Finds and executes tasks, and checks status while running.
     */
    private void mainLoop() {
        boolean ran = false; // true if ran a task on last step
        ForkJoinPool p = pool;
        for (;;) {
            p.preStep(this, ran);
            if (runState != 0)
                break;
            ran = tryExecSteal() || tryExecSubmission();
        }
    }

    /**
     * Tries to steal a task and execute it.
     *
     * @return true if ran a task
     */
    private boolean tryExecSteal() {
        ForkJoinTask<?> t;
        if ((t = scan()) != null) {
            t.quietlyExec();
            UNSAFE.putOrderedObject(this, currentStealOffset, null);
            if (sp != base)
                execLocalTasks();
            return true;
        }
        return false;
    }

    /**
     * If a submission exists, try to activate and run it.
     *
     * @return true if ran a task
     */
    private boolean tryExecSubmission() {
        ForkJoinPool p = pool;
        // This loop is needed in case the attempt to activate fails,
        // in which case we only retry if there still appears to be a
        // submission.
        while (p.hasQueuedSubmissions()) {
            ForkJoinTask<?> t; int a;
            if (active || // inline p.tryIncrementActiveCount
                (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
                                                   a = p.runState, a + 1))) {
                if ((t = p.pollSubmission()) != null) {
                    UNSAFE.putOrderedObject(this, currentStealOffset, t);
                    t.quietlyExec();
                    UNSAFE.putOrderedObject(this, currentStealOffset, null);
                    if (sp != base)
                        execLocalTasks();
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * Runs local tasks until queue is empty or shut down.  Call only
     * while active.
     */
    private void execLocalTasks() {
        while (runState == 0) {
            ForkJoinTask<?> t = locallyFifo ? locallyDeqTask() : popTask();
            if (t != null)
                t.quietlyExec();
            else if (sp == base)
                break;
        }
    }

    /*
     * Intrinsics-based atomic writes for queue slots. These are
     * basically the same as methods in AtomicReferenceArray, but
     * specialized for (1) ForkJoinTask elements (2) requirement that
     * nullness and bounds checks have already been performed by
     * callers and (3) effective offsets are known not to overflow
     * from int to long (because of MAXIMUM_QUEUE_CAPACITY). We don't
     * need a corresponding version for reads: plain array reads are
     * OK because they are protected by other volatile reads and are
     * confirmed by CASes.
     *
     * Most uses don't actually call these methods, but instead contain
     * inlined forms that enable more predictable optimization.  We
     * don't define the version of write used in pushTask at all, but
     * instead inline there a store-fenced array slot write.
     */

    /**
     * CASes slot i of array q from t to null. Caller must ensure q is
     * non-null and index is in range.
     */
    private static final boolean casSlotNull(ForkJoinTask<?>[] q, int i,
                                             ForkJoinTask<?> t) {
        return UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null);
    }

    /**
     * Performs a volatile write of the given task at given slot of
     * array q.  Caller must ensure q is non-null and index is in
     * range. This method is used only during resets and backouts.
     */
    private static final void writeSlot(ForkJoinTask<?>[] q, int i,
                                        ForkJoinTask<?> t) {
        UNSAFE.putObjectVolatile(q, (i << qShift) + qBase, t);
    }

    // queue methods

    /**
     * Pushes a task. Call only from this thread.
     *
     * @param t the task. Caller must ensure non-null.
     */
    final void pushTask(ForkJoinTask<?> t) {
        ForkJoinTask<?>[] q = queue;
        int mask = q.length - 1; // implicit assert q != null
        int s = sp++;            // ok to increment sp before slot write
        UNSAFE.putOrderedObject(q, ((s & mask) << qShift) + qBase, t);
        if ((s -= base) == 0)
            pool.signalWork();   // was empty
        else if (s == mask)
            growQueue();         // is full
    }

    /**
     * Tries to take a task from the base of the queue, failing if
     * empty or contended. Note: Specializations of this code appear
     * in locallyDeqTask and elsewhere.
     *
     * @return a task, or null if none or contended
     */
    final ForkJoinTask<?> deqTask() {
        ForkJoinTask<?> t;
        ForkJoinTask<?>[] q;
        int b, i;
        if (sp != (b = base) &&
            (q = queue) != null && // must read q after b
            (t = q[i = (q.length - 1) & b]) != null && base == b &&
            UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) {
            base = b + 1;
            return t;
        }
        return null;
    }

    /**
     * Tries to take a task from the base of own queue. Assumes active
     * status.  Called only by this thread.
     *
     * @return a task, or null if none
     */
    final ForkJoinTask<?> locallyDeqTask() {
        ForkJoinTask<?>[] q = queue;
        if (q != null) {
            ForkJoinTask<?> t;
            int b, i;
            while (sp != (b = base)) {
                if ((t = q[i = (q.length - 1) & b]) != null && base == b &&
                    UNSAFE.compareAndSwapObject(q, (i << qShift) + qBase,
                                                t, null)) {
                    base = b + 1;
                    return t;
                }
            }
        }
        return null;
    }

    /**
     * Returns a popped task, or null if empty. Assumes active status.
     * Called only by this thread.
     */
    private ForkJoinTask<?> popTask() {
        ForkJoinTask<?>[] q = queue;
        if (q != null) {
            int s;
            while ((s = sp) != base) {
                int i = (q.length - 1) & --s;
                long u = (i << qShift) + qBase; // raw offset
                ForkJoinTask<?> t = q[i];
                if (t == null)   // lost to stealer
                    break;
                if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
                    sp = s; // putOrderedInt may encourage more timely write
                    // UNSAFE.putOrderedInt(this, spOffset, s);
                    return t;
                }
            }
        }
        return null;
    }

    /**
     * Specialized version of popTask to pop only if topmost element
     * is the given task. Called only by this thread while active.
     *
     * @param t the task. Caller must ensure non-null.
     */
    final boolean unpushTask(ForkJoinTask<?> t) {
        int s;
        ForkJoinTask<?>[] q = queue;
        if ((s = sp) != base && q != null &&
            UNSAFE.compareAndSwapObject
            (q, (((q.length - 1) & --s) << qShift) + qBase, t, null)) {
            sp = s; // putOrderedInt may encourage more timely write
            // UNSAFE.putOrderedInt(this, spOffset, s);
            return true;
        }
        return false;
    }

    /**
     * Returns next task, or null if empty or contended.
     */
    final ForkJoinTask<?> peekTask() {
        ForkJoinTask<?>[] q = queue;
        if (q == null)
            return null;
        int mask = q.length - 1;
        int i = locallyFifo ? base : (sp - 1);
        return q[i & mask];
    }

    /**
     * Doubles queue array size. Transfers elements by emulating
     * steals (deqs) from old array and placing, oldest first, into
     * new array.
     */
    private void growQueue() {
        ForkJoinTask<?>[] oldQ = queue;
        int oldSize = oldQ.length;
        int newSize = oldSize << 1;
        if (newSize > MAXIMUM_QUEUE_CAPACITY)
            throw new RejectedExecutionException("Queue capacity exceeded");
        ForkJoinTask<?>[] newQ = queue = new ForkJoinTask<?>[newSize];

        int b = base;
        int bf = b + oldSize;
        int oldMask = oldSize - 1;
        int newMask = newSize - 1;
        do {
            int oldIndex = b & oldMask;
            ForkJoinTask<?> t = oldQ[oldIndex];
            if (t != null && !casSlotNull(oldQ, oldIndex, t))
                t = null;
            writeSlot(newQ, b & newMask, t);
        } while (++b != bf);
        pool.signalWork();
    }

    /**
     * Computes next value for random victim probe in scan().  Scans
     * don't require a very high quality generator, but also not a
     * crummy one.  Marsaglia xor-shift is cheap and works well enough.
     * Note: This is manually inlined in scan().
     */
    private static final int xorShift(int r) {
        r ^= r << 13;
        r ^= r >>> 17;
        return r ^ (r << 5);
    }

    /**
     * Tries to steal a task from another worker. Starts at a random
     * index of the workers array, and probes workers until finding
     * one with a non-empty queue or finding that all are empty.  It
     * randomly selects the first n probes. If these are empty, it
     * resorts to a circular sweep, which is necessary to accurately
     * set active status. (The circular sweep uses steps of
     * approximately half the array size plus 1, to avoid bias
     * stemming from leftmost packing of the array in ForkJoinPool.)
     *
     * This method must be both fast and quiet -- usually avoiding
     * memory accesses that could disrupt cache sharing etc. other
     * than those needed to check for and take tasks (or to activate
     * if not already active). This accounts for, among other things,
     * updating the random seed in place without storing it until exit.
     *
     * @return a task, or null if none found
     */
    private ForkJoinTask<?> scan() {
        ForkJoinPool p = pool;
        ForkJoinWorkerThread[] ws;        // worker array
        int n;                            // upper bound of #workers
        if ((ws = p.workers) != null && (n = ws.length) > 1) {
            boolean canSteal = active;    // shadow active status
            int r = seed;                 // extract seed once
            int mask = n - 1;
            int j = -n;                   // loop counter
            int k = r;                    // worker index, random if j < 0
            for (;;) {
                ForkJoinWorkerThread v = ws[k & mask];
                r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // inline xorshift
                ForkJoinTask<?>[] q; ForkJoinTask<?> t; int b, a;
                if (v != null && (b = v.base) != v.sp &&
                    (q = v.queue) != null) {
                    int i = (q.length - 1) & b;
                    long u = (i << qShift) + qBase; // raw offset
                    int pid = poolIndex;
                    if ((t = q[i]) != null) {
                        if (!canSteal &&  // inline p.tryIncrementActiveCount
                            UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
                                                     a = p.runState, a + 1))
                            canSteal = active = true;
                        if (canSteal && v.base == b++ &&
                            UNSAFE.compareAndSwapObject(q, u, t, null)) {
                            v.base = b;
                            v.stealHint = pid;
                            UNSAFE.putOrderedObject(this,
                                                    currentStealOffset, t);
                            seed = r;
                            ++stealCount;
                            return t;
                        }
                    }
                    j = -n;
                    k = r;                // restart on contention
                }
                else if (++j <= 0)
                    k = r;
                else if (j <= n)
                    k += (n >>> 1) | 1;
                else
                    break;
            }
        }
        return null;
    }

    // Run State management

    // status check methods used mainly by ForkJoinPool
    final boolean isRunning()     { return runState == 0; }
    final boolean isTerminated()  { return (runState & TERMINATED) != 0; }
    final boolean isSuspended()   { return (runState & SUSPENDED) != 0; }
    final boolean isTrimmed()     { return (runState & TRIMMED) != 0; }

    final boolean isTerminating() {
        if ((runState & TERMINATING) != 0)
            return true;
        if (pool.isAtLeastTerminating()) { // propagate pool state
            shutdown();
            return true;
        }
        return false;
    }

    /**
     * Sets state to TERMINATING. Does NOT unpark or interrupt
     * to wake up if currently blocked. Callers must do so if desired.
     */
    final void shutdown() {
        for (;;) {
            int s = runState;
            if ((s & (TERMINATING|TERMINATED)) != 0)
                break;
            if ((s & SUSPENDED) != 0) { // kill and wakeup if suspended
                if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
                                             (s & ~SUSPENDED) |
                                             (TRIMMED|TERMINATING)))
                    break;
            }
            else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
                                              s | TERMINATING))
                break;
        }
    }

    /**
     * Sets state to TERMINATED. Called only by onTermination().
     */
    private void setTerminated() {
        int s;
        do {} while (!UNSAFE.compareAndSwapInt(this, runStateOffset,
                                               s = runState,
                                               s | (TERMINATING|TERMINATED)));
    }

    /**
     * If suspended, tries to set status to unsuspended.
     * Does NOT wake up if blocked.
     *
     * @return true if successful
     */
    final boolean tryUnsuspend() {
        int s;
        while (((s = runState) & SUSPENDED) != 0) {
            if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
                                         s & ~SUSPENDED))
                return true;
        }
        return false;
    }

    /**
     * Sets suspended status and blocks as spare until resumed
     * or shutdown.
     */
    final void suspendAsSpare() {
        for (;;) {                  // set suspended unless terminating
            int s = runState;
            if ((s & TERMINATING) != 0) { // must kill
                if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
                                             s | (TRIMMED | TERMINATING)))
                    return;
            }
            else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s,
                                              s | SUSPENDED))
                break;
        }
        ForkJoinPool p = pool;
        p.pushSpare(this);
        while ((runState & SUSPENDED) != 0) {
            if (p.tryAccumulateStealCount(this)) {
                interrupted();          // clear/ignore interrupts
                if ((runState & SUSPENDED) == 0)
                    break;
                LockSupport.park(this);
            }
        }
    }

    // Misc support methods for ForkJoinPool

    /**
     * Returns an estimate of the number of tasks in the queue.  Also
     * used by ForkJoinTask.
     */
    final int getQueueSize() {
        int n; // external calls must read base first
        return (n = -base + sp) <= 0 ? 0 : n;
    }

    /**
     * Removes and cancels all tasks in queue.  Can be called from any
     * thread.
     */
    final void cancelTasks() {
        ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks
        if (cj != null) {
            currentJoin = null;
            cj.cancelIgnoringExceptions();
            try {
                this.interrupt(); // awaken wait
            } catch (SecurityException ignore) {
            }
        }
        ForkJoinTask<?> cs = currentSteal;
        if (cs != null) {
            currentSteal = null;
            cs.cancelIgnoringExceptions();
        }
        while (base != sp) {
            ForkJoinTask<?> t = deqTask();
            if (t != null)
                t.cancelIgnoringExceptions();
        }
    }

    /**
     * Drains tasks to given collection c.
     *
     * @return the number of tasks drained
     */
    final int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
        int n = 0;
        while (base != sp) {
            ForkJoinTask<?> t = deqTask();
            if (t != null) {
                c.add(t);
                ++n;
            }
        }
        return n;
    }

    // Support methods for ForkJoinTask

    /**
     * Gets and removes a local task.
     *
     * @return a task, if available
     */
    final ForkJoinTask<?> pollLocalTask() {
        ForkJoinPool p = pool;
        while (sp != base) {
            int a; // inline p.tryIncrementActiveCount
            if (active ||
                (active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset,
                                                   a = p.runState, a + 1)))
                return locallyFifo ? locallyDeqTask() : popTask();
        }
        return null;
    }

    /**
     * Gets and removes a local or stolen task.
     *
     * @return a task, if available
     */
    final ForkJoinTask<?> pollTask() {
        ForkJoinTask<?> t = pollLocalTask();
        if (t == null) {
            t = scan();
            // cannot retain/track/help steal
            UNSAFE.putOrderedObject(this, currentStealOffset, null);
        }
        return t;
    }

    /**
     * Possibly runs some tasks and/or blocks, until task is done.
     *
     * @param joinMe the task to join
     */
    final void joinTask(ForkJoinTask<?> joinMe) {
        // currentJoin only written by this thread; only need ordered store
        ForkJoinTask<?> prevJoin = currentJoin;
        UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe);
        if (sp != base)
            localHelpJoinTask(joinMe);
        if (joinMe.status >= 0)
            pool.awaitJoin(joinMe, this);
        UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin);
    }

    /**
     * Runs tasks in local queue until given task is done.
     *
     * @param joinMe the task to join
     */
    private void localHelpJoinTask(ForkJoinTask<?> joinMe) {
        int s;
        ForkJoinTask<?>[] q;
        while (joinMe.status >= 0 && (s = sp) != base && (q = queue) != null) {
            int i = (q.length - 1) & --s;
            long u = (i << qShift) + qBase; // raw offset
            ForkJoinTask<?> t = q[i];
            if (t == null)  // lost to a stealer
                break;
            if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
                /*
                 * This recheck (and similarly in helpJoinTask)
                 * handles cases where joinMe is independently
                 * cancelled or forced even though there is other work
                 * available. Back out of the pop by putting t back
                 * into slot before we commit by writing sp.
                 */
                if (joinMe.status < 0) {
                    UNSAFE.putObjectVolatile(q, u, t);
                    break;
                }
                sp = s;
                // UNSAFE.putOrderedInt(this, spOffset, s);
                t.quietlyExec();
            }
        }
    }

    /**
     * Unless terminating, tries to locate and help perform tasks for
     * a stealer of the given task, or in turn one of its stealers.
     * Traces currentSteal->currentJoin links looking for a thread
     * working on a descendant of the given task and with a non-empty
     * queue to steal back and execute tasks from.
     *
     * The implementation is very branchy, to cope with potential
     * inconsistencies or loops encountering chains that are stale,
     * unknown, or of length greater than MAX_HELP_DEPTH links.  All
     * of these cases are dealt with by just returning to the caller,
     * who is expected to retry if other join mechanisms also don't
     * work out.
     *
     * @param joinMe the task to join
     */
    final void helpJoinTask(ForkJoinTask<?> joinMe) {
        ForkJoinWorkerThread[] ws;
        int n;
        if (joinMe.status < 0)                // already done
            return;
        if ((runState & TERMINATING) != 0) {  // cancel if shutting down
            joinMe.cancelIgnoringExceptions();
            return;
        }
        if ((ws = pool.workers) == null || (n = ws.length) <= 1)
            return;                           // need at least 2 workers

        ForkJoinTask<?> task = joinMe;        // base of chain
        ForkJoinWorkerThread thread = this;   // thread with stolen task
        for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
            // Try to find v, the stealer of task, by first using hint
            ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
            if (v == null || v.currentSteal != task) {
                for (int j = 0; ; ++j) {      // search array
                    if (j < n) {
                        ForkJoinTask<?> vs;
                        if ((v = ws[j]) != null &&
                            (vs = v.currentSteal) != null) {
                            if (joinMe.status < 0 || task.status < 0)
                                return;       // stale or done
                            if (vs == task) {
                                thread.stealHint = j;
                                break;        // save hint for next time
                            }
                        }
                    }
                    else
                        return;               // no stealer
                }
            }
            for (;;) { // Try to help v, using specialized form of deqTask
                if (joinMe.status < 0)
                    return;
                int b = v.base;
                ForkJoinTask<?>[] q = v.queue;
                if (b == v.sp || q == null)
                    break;
                int i = (q.length - 1) & b;
                long u = (i << qShift) + qBase;
                ForkJoinTask<?> t = q[i];
                int pid = poolIndex;
                ForkJoinTask<?> ps = currentSteal;
                if (task.status < 0)
                    return;                   // stale or done
                if (t != null && v.base == b++ &&
                    UNSAFE.compareAndSwapObject(q, u, t, null)) {
                    if (joinMe.status < 0) {
                        UNSAFE.putObjectVolatile(q, u, t);
                        return;               // back out on cancel
                    }
                    v.base = b;
                    v.stealHint = pid;
                    UNSAFE.putOrderedObject(this, currentStealOffset, t);
                    t.quietlyExec();
                    UNSAFE.putOrderedObject(this, currentStealOffset, ps);
                }
            }
            // Try to descend to find v's stealer
            ForkJoinTask<?> next = v.currentJoin;
            if (task.status < 0 || next == null || next == task ||
                joinMe.status < 0)
                return;
            task = next;
            thread = v;
        }
    }

    /**
     * Implements ForkJoinTask.getSurplusQueuedTaskCount().
     * Returns an estimate of the number of tasks, offset by a
     * function of number of idle workers.
     *
     * This method provides a cheap heuristic guide for task
     * partitioning when programmers, frameworks, tools, or languages
     * have little or no idea about task granularity.  In essence, by
     * offering this method, we ask users only about tradeoffs in
     * overhead vs expected throughput and its variance, rather than
     * how finely to partition tasks.
     *
     * In a steady state strict (tree-structured) computation, each
     * thread makes available for stealing enough tasks for other
     * threads to remain active. Inductively, if all threads play by
     * the same rules, each thread should make available only a
     * constant number of tasks.
     *
     * The minimum useful constant is just 1. But using a value of 1
     * would require immediate replenishment upon each steal to
     * maintain enough tasks, which is infeasible.  Further,
     * partitionings/granularities of offered tasks should minimize
     * steal rates, which in general means that threads nearer the top
     * of the computation tree should generate more than those nearer
     * the bottom. In perfect steady state, each thread is at
     * approximately the same level of the computation tree. However,
     * producing extra tasks amortizes the uncertainty of progress and
     * diffusion assumptions.
     *
     * So, users will want to use values larger, but not much larger,
     * than 1 to both smooth over transient shortages and hedge
     * against uneven progress, as traded off against the cost of
     * extra task overhead. We leave the user to pick a threshold
     * value to compare with the results of this call to guide
     * decisions, but recommend values such as 3.
     *
     * When all threads are active, it is on average OK to estimate
     * surplus strictly locally. In steady-state, if one thread is
     * maintaining say 2 surplus tasks, then so are others. So we can
     * just use estimated queue length (although note that (sp - base)
     * can be an overestimate because of stealers lagging increments
     * of base).  However, this strategy alone leads to serious
     * mis-estimates in some non-steady-state conditions (ramp-up,
     * ramp-down, other stalls). We can detect many of these by
     * further considering the number of "idle" threads that are
     * known to have zero queued tasks, so compensate by a factor of
     * (#idle/#active) threads.
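     *
     * In sketch form, a caller might compare the public counterpart
     * of this method against such a threshold (3 here, an assumed
     * choice) when deciding whether to fork a subtask:
     *
     *   if (ForkJoinTask.getSurplusQueuedTaskCount() <= 3)
     *       subtask.fork();    // keep work available to stealers
     *   else
     *       subtask.invoke();  // queues full enough; run directly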
     */
    final int getEstimatedSurplusTaskCount() {
        return sp - base - pool.idlePerActive();
    }

    /**
     * Runs tasks until {@code pool.isQuiescent()}.
     */
    final void helpQuiescePool() {
        ForkJoinTask<?> ps = currentSteal; // to restore below
        for (;;) {
            ForkJoinTask<?> t = pollLocalTask();
            if (t != null || (t = scan()) != null)
                t.quietlyExec();
            else {
                ForkJoinPool p = pool;
                int a; // to inline CASes
                if (active) {
                    if (!UNSAFE.compareAndSwapInt
                        (p, poolRunStateOffset, a = p.runState, a - 1))
                        continue;   // retry later
                    active = false; // inactivate
                    UNSAFE.putOrderedObject(this, currentStealOffset, ps);
                }
                if (p.isQuiescent()) {
                    active = true; // re-activate
                    do {} while (!UNSAFE.compareAndSwapInt
                                 (p, poolRunStateOffset, a = p.runState, a+1));
                    return;
                }
            }
        }
    }

    // Unsafe mechanics

    private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
    private static final long spOffset =
        objectFieldOffset("sp", ForkJoinWorkerThread.class);
    private static final long runStateOffset =
        objectFieldOffset("runState", ForkJoinWorkerThread.class);
    private static final long currentJoinOffset =
        objectFieldOffset("currentJoin", ForkJoinWorkerThread.class);
    private static final long currentStealOffset =
        objectFieldOffset("currentSteal", ForkJoinWorkerThread.class);
    private static final long qBase =
        UNSAFE.arrayBaseOffset(ForkJoinTask[].class);
    private static final long poolRunStateOffset = // to inline CAS
        objectFieldOffset("runState", ForkJoinPool.class);

    private static final int qShift;

    static {
        int s = UNSAFE.arrayIndexScale(ForkJoinTask[].class);
        if ((s & (s-1)) != 0)
            throw new Error("data type scale not a power of two");
        qShift = 31 - Integer.numberOfLeadingZeros(s);
        MAXIMUM_QUEUE_CAPACITY = 1 << (31 - qShift);
    }

    private static long objectFieldOffset(String field, Class<?> klazz) {
        try {
            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
        } catch (NoSuchFieldException e) {
            // Convert Exception to corresponding Error
            NoSuchFieldError error = new NoSuchFieldError(field);
            error.initCause(e);
            throw error;
        }
    }
}