1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.util.ArrayList;
  39 import java.util.Arrays;
  40 import java.util.Collection;
  41 import java.util.Collections;
  42 import java.util.List;
  43 import java.util.concurrent.AbstractExecutorService;
  44 import java.util.concurrent.Callable;
  45 import java.util.concurrent.ExecutorService;
  46 import java.util.concurrent.Future;
  47 import java.util.concurrent.RejectedExecutionException;
  48 import java.util.concurrent.RunnableFuture;
  49 import java.util.concurrent.TimeUnit;
  50 
  51 /**
  52  * An {@link ExecutorService} for running {@link ForkJoinTask}s.
  53  * A {@code ForkJoinPool} provides the entry point for submissions
  54  * from non-{@code ForkJoinTask} clients, as well as management and
  55  * monitoring operations.
  56  *
  57  * <p>A {@code ForkJoinPool} differs from other kinds of {@link
  58  * ExecutorService} mainly by virtue of employing
  59  * <em>work-stealing</em>: all threads in the pool attempt to find and
  60  * execute tasks submitted to the pool and/or created by other active
  61  * tasks (eventually blocking waiting for work if none exist). This
  62  * enables efficient processing when most tasks spawn other subtasks
  63  * (as do most {@code ForkJoinTask}s), as well as when many small
  64  * tasks are submitted to the pool from external clients.  Especially
  65  * when setting <em>asyncMode</em> to true in constructors, {@code
  66  * ForkJoinPool}s may also be appropriate for use with event-style
  67  * tasks that are never joined.
  68  *
  69  * <p>A static {@link #commonPool()} is available and appropriate for
  70  * most applications. The common pool is used by any ForkJoinTask that
  71  * is not explicitly submitted to a specified pool. Using the common
  72  * pool normally reduces resource usage (its threads are slowly
  73  * reclaimed during periods of non-use, and reinstated upon subsequent
  74  * use).
  75  *
  76  * <p>For applications that require separate or custom pools, a {@code
  77  * ForkJoinPool} may be constructed with a given target parallelism
  78  * level; by default, equal to the number of available processors. The
  79  * pool attempts to maintain enough active (or available) threads by
  80  * dynamically adding, suspending, or resuming internal worker
  81  * threads, even if some tasks are stalled waiting to join
  82  * others. However, no such adjustments are guaranteed in the face of
  83  * blocked I/O or other unmanaged synchronization. The nested {@link
  84  * ManagedBlocker} interface enables extension of the kinds of
  85  * synchronization accommodated.
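      *
      * <p>For example, a task that must wait for an external event or take
      * an element from a queue might wrap the blocking call in a {@code
      * ManagedBlocker} so that the pool can compensate while it waits. The
      * sketch below is illustrative only; {@code ItemTaker} is a
      * hypothetical helper, not part of this API:
      *
      * <pre> {@code
      * class ItemTaker<E> implements ForkJoinPool.ManagedBlocker {
      *   final BlockingQueue<E> queue;
      *   volatile E item = null;
      *   ItemTaker(BlockingQueue<E> queue) { this.queue = queue; }
      *   public boolean block() throws InterruptedException {
      *     if (item == null)
      *       item = queue.take();          // really block
      *     return true;
      *   }
      *   public boolean isReleasable() {   // skip blocking if an item is ready
      *     return item != null || (item = queue.poll()) != null;
      *   }
      *   E take() throws InterruptedException {
      *     ForkJoinPool.managedBlock(this);
      *     return item;
      *   }
      * }}</pre>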
  86  *
  87  * <p>In addition to execution and lifecycle control methods, this
  88  * class provides status check methods (for example
  89  * {@link #getStealCount}) that are intended to aid in developing,
  90  * tuning, and monitoring fork/join applications. Also, method
  91  * {@link #toString} returns indications of pool state in a
  92  * convenient form for informal monitoring.
  93  *
  94  * <p>As is the case with other ExecutorServices, there are three
  95  * main task execution methods summarized in the following table.
  96  * These are designed to be used primarily by clients not already
  97  * engaged in fork/join computations in the current pool.  The main
  98  * forms of these methods accept instances of {@code ForkJoinTask},
  99  * but overloaded forms also allow mixed execution of plain {@code
 100  * Runnable}- or {@code Callable}-based activities as well.  However,
 101  * tasks that are already executing in a pool should normally instead
 102  * use the within-computation forms listed in the table unless using
 103  * async event-style tasks that are not usually joined, in which case
 104  * there is little difference in the choice of methods.
 105  *
 106  * <table BORDER CELLPADDING=3 CELLSPACING=1>
 107  *  <tr>
 108  *    <td></td>
 109  *    <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
 110  *    <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
 111  *  </tr>
 112  *  <tr>
 113  *    <td> <b>Arrange async execution</b></td>
 114  *    <td> {@link #execute(ForkJoinTask)}</td>
 115  *    <td> {@link ForkJoinTask#fork}</td>
 116  *  </tr>
 117  *  <tr>
 118  *    <td> <b>Await and obtain result</b></td>
 119  *    <td> {@link #invoke(ForkJoinTask)}</td>
 120  *    <td> {@link ForkJoinTask#invoke}</td>
 121  *  </tr>
 122  *  <tr>
 123  *    <td> <b>Arrange exec and obtain Future</b></td>
 124  *    <td> {@link #submit(ForkJoinTask)}</td>
 125  *    <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
 126  *  </tr>
 127  * </table>
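      *
      * <p>For example, a divide-and-conquer computation can be expressed as
      * a {@link RecursiveTask} and submitted by a non-fork/join client,
      * while its subtasks use the within-computation forms. The sketch
      * below is illustrative only; {@code SumTask} is a hypothetical
      * example class, not part of this API:
      *
      * <pre> {@code
      * class SumTask extends RecursiveTask<Long> {
      *   final long[] a; final int lo, hi;
      *   SumTask(long[] a, int lo, int hi) { this.a = a; this.lo = lo; this.hi = hi; }
      *   protected Long compute() {
      *     if (hi - lo <= 1000) {                 // small enough: sum directly
      *       long sum = 0;
      *       for (int i = lo; i < hi; ++i) sum += a[i];
      *       return sum;
      *     }
      *     int mid = (lo + hi) >>> 1;
      *     SumTask left = new SumTask(a, lo, mid);
      *     left.fork();                           // arrange async execution
      *     long right = new SumTask(a, mid, hi).compute();
      *     return right + left.join();           // await and combine
      *   }
      * }
      *
      * // from a non-fork/join client:
      * long[] data = new long[1000000];
      * long total = ForkJoinPool.commonPool().invoke(new SumTask(data, 0, data.length));
      * }</pre>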
 128  *
 129  * <p>The common pool is by default constructed with default
 130  * parameters, but these may be controlled by setting three {@link
 131  * System#getProperty system properties} with prefix {@code
 132  * java.util.concurrent.ForkJoinPool.common}: {@code parallelism} --
 133  * an integer greater than zero, {@code threadFactory} -- the class
 134  * name of a {@link ForkJoinWorkerThreadFactory}, and {@code
 135  * exceptionHandler} -- the class name of a {@link
 136  * java.lang.Thread.UncaughtExceptionHandler
 137  * Thread.UncaughtExceptionHandler}. Upon any error in establishing
 138  * these settings, default parameters are used.
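      *
      * <p>For example, the common pool parallelism might be set on the
      * command line (shown with an arbitrary value; any of the three
      * properties may be omitted):
      *
      * <pre> {@code
      * java -Djava.util.concurrent.ForkJoinPool.common.parallelism=4 MyApp}</pre>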
 139  *
 140  * <p><b>Implementation notes</b>: This implementation restricts the
 141  * maximum number of running threads to 32767. Attempts to create
 142  * pools with greater than the maximum number result in
 143  * {@code IllegalArgumentException}.
 144  *
 145  * <p>This implementation rejects submitted tasks (that is, by throwing
 146  * {@link RejectedExecutionException}) only when the pool is shut down
 147  * or internal resources have been exhausted.
 148  *
 149  * @since 1.7
 150  * @author Doug Lea
 151  */
 152 public class ForkJoinPool extends AbstractExecutorService {
 153 
 154     /*
 155      * Implementation Overview
 156      *
 157      * This class and its nested classes provide the main
 158      * functionality and control for a set of worker threads:
 159      * Submissions from non-FJ threads enter into submission queues.
 160      * Workers take these tasks and typically split them into subtasks
 161      * that may be stolen by other workers.  Preference rules give
 162      * first priority to processing tasks from their own queues (LIFO
 163      * or FIFO, depending on mode), then to randomized FIFO steals of
 164      * tasks in other queues.
 165      *
 166      * WorkQueues
 167      * ==========
 168      *
 169      * Most operations occur within work-stealing queues (in nested
 170      * class WorkQueue).  These are special forms of Deques that
 171      * support only three of the four possible end-operations -- push,
 172      * pop, and poll (aka steal), under the further constraints that
 173      * push and pop are called only from the owning thread (or, as
 174      * extended here, under a lock), while poll may be called from
 175      * other threads.  (If you are unfamiliar with them, you probably
 176      * want to read Herlihy and Shavit's book "The Art of
 177      * Multiprocessor Programming", chapter 16 describing these in
 178      * more detail before proceeding.)  The main work-stealing queue
 179      * design is roughly similar to those in the papers "Dynamic
 180      * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
 181      * (http://research.sun.com/scalable/pubs/index.html) and
 182      * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
 183      * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
 184      * The main differences ultimately stem from GC requirements that
 185      * we null out taken slots as soon as we can, to maintain as small
 186      * a footprint as possible even in programs generating huge
 187      * numbers of tasks. To accomplish this, we shift the CAS
 188      * arbitrating pop vs poll (steal) from being on the indices
 189      * ("base" and "top") to the slots themselves.  So, both a
 190      * successful pop and poll mainly entail a CAS of a slot from
 191      * non-null to null.  Because we rely on CASes of references, we
 192      * do not need tag bits on base or top.  They are simple ints as
 193      * used in any circular array-based queue (see for example
 194      * ArrayDeque).  Updates to the indices must still be ordered in a
 195      * way that guarantees that top == base means the queue is empty,
 196      * but otherwise may err on the side of possibly making the queue
 197      * appear nonempty when a push, pop, or poll have not fully
 198      * committed. Note that this means that the poll operation,
 199      * considered individually, is not wait-free. One thief cannot
 200      * successfully continue until another in-progress one (or, if
 201      * previously empty, a push) completes.  However, in the
 202      * aggregate, we ensure at least probabilistic non-blockingness.
 203      * If an attempted steal fails, a thief always chooses a different
 204      * random victim target to try next. So, in order for one thief to
 205      * progress, it suffices for any in-progress poll or new push on
 206      * any empty queue to complete. (This is why we normally use
 207      * method pollAt and its variants that try once at the apparent
 208      * base index, else consider alternative actions, rather than
 209      * method poll.)
 210      *
 211      * This approach also enables support of a user mode in which local
 212      * task processing is in FIFO, not LIFO order, simply by using
 213      * poll rather than pop.  This can be useful in message-passing
 214      * frameworks in which tasks are never joined.  However neither
 215      * mode considers affinities, loads, cache localities, etc., so
 216      * these modes rarely provide the best possible performance on a given
 217      * machine, but they portably provide good throughput by averaging over
 218      * these factors.  (Further, even if we did try to use such
 219      * information, we do not usually have a basis for exploiting it.
 220      * For example, some sets of tasks profit from cache affinities,
 221      * but others are harmed by cache pollution effects.)
 222      *
 223      * WorkQueues are also used in a similar way for tasks submitted
 224      * to the pool. We cannot mix these tasks in the same queues used
 225      * for work-stealing (this would contaminate lifo/fifo
 226      * processing). Instead, we randomly associate submission queues
 227      * with submitting threads, using a form of hashing.  The
 228      * ThreadLocal Submitter class contains a value initially used as
 229      * a hash code for choosing existing queues, but may be randomly
 230      * repositioned upon contention with other submitters.  In
 231      * essence, submitters act like workers except that they are
 232      * restricted to executing local tasks that they submitted (or in
 233      * the case of CountedCompleters, others with the same root task).
 234      * However, because most shared/external queue operations are more
 235      * expensive than internal, and because, at steady state, external
 236      * submitters will compete for CPU with workers, ForkJoinTask.join
 237      * and related methods disable them from repeatedly helping to
 238      * process tasks if all workers are active.  Insertion of tasks in
 239      * shared mode requires a lock (mainly to protect in the case of
 240      * resizing) but we use only a simple spinlock (using bits in
 241      * field qlock), because submitters encountering a busy queue move
 242      * on to try or create other queues -- they block only when
 243      * creating and registering new queues.
 244      *
 245      * Management
 246      * ==========
 247      *
 248      * The main throughput advantages of work-stealing stem from
 249      * decentralized control -- workers mostly take tasks from
 250      * themselves or each other. We cannot negate this in the
 251      * implementation of other management responsibilities. The main
 252      * tactic for avoiding bottlenecks is packing nearly all
 253      * essentially atomic control state into two volatile variables
 254      * that are by far most often read (not written) as status and
 255      * consistency checks.
 256      *
 257      * Field "ctl" contains 64 bits holding all the information needed
 258      * to atomically decide to add, inactivate, enqueue (on an event
 259      * queue), dequeue, and/or re-activate workers.  To enable this
 260      * packing, we restrict maximum parallelism to (1<<15)-1 (which is
 261      * far in excess of normal operating range) to allow ids, counts,
 262      * and their negations (used for thresholding) to fit into 16-bit
 263      * fields.
 264      *
 265      * Field "plock" is a form of sequence lock with a saturating
 266      * shutdown bit (similarly for per-queue "qlocks"), mainly
 267      * protecting updates to the workQueues array, as well as
 268      * enabling shutdown.  When used as a lock, it is normally only very
 269      * briefly held, so is nearly always available after at most a
 270      * brief spin, but we use a monitor-based backup strategy to
 271      * block when needed.
 272      *
 273      * Recording WorkQueues.  WorkQueues are recorded in the
 274      * "workQueues" array that is created upon first use and expanded
 275      * if necessary.  Updates to the array while recording new workers
 276      * and unrecording terminated ones are protected from each other
 277      * by a lock but the array is otherwise concurrently readable, and
 278      * accessed directly.  To simplify index-based operations, the
 279      * array size is always a power of two, and all readers must
 280      * tolerate null slots. Worker queues are at odd indices. Shared
 281      * (submission) queues are at even indices, up to a maximum of 64
 282      * slots, to limit growth even if array needs to expand to add
 283      * more workers. Grouping them together in this way simplifies and
 284      * speeds up task scanning.
 285      *
 286      * All worker thread creation is on-demand, triggered by task
 287      * submissions, replacement of terminated workers, and/or
 288      * compensation for blocked workers. However, all other support
 289      * code is set up to work with other policies.  To ensure that we
 290      * do not hold on to worker references that would prevent GC, ALL
 291      * accesses to workQueues are via indices into the workQueues
 292      * array (which is one source of some of the messy code
 293      * constructions here). In essence, the workQueues array serves as
 294      * a weak reference mechanism. Thus for example the wait queue
 295      * field of ctl stores indices, not references.  Access to the
 296      * workQueues in associated methods (for example signalWork) must
 297      * both index-check and null-check the IDs. All such accesses
 298      * ignore bad IDs by returning out early from what they are doing,
 299      * since this can only be associated with termination, in which
 300      * case it is OK to give up.  All uses of the workQueues array
 301      * also check that it is non-null (even if previously
 302      * non-null). This allows nulling during termination, which is
 303      * currently not necessary, but remains an option for
 304      * resource-revocation-based shutdown schemes. It also helps
 305      * reduce JIT issuance of uncommon-trap code, which tends to
 306      * unnecessarily complicate control flow in some methods.
 307      *
 308      * Event Queuing. Unlike HPC work-stealing frameworks, we cannot
 309      * let workers spin indefinitely scanning for tasks when none can
 310      * be found immediately, and we cannot start/resume workers unless
 311      * there appear to be tasks available.  On the other hand, we must
 312      * quickly prod them into action when new tasks are submitted or
 313      * generated. In many usages, ramp-up time to activate workers is
 314      * the main limiting factor in overall performance (this is
 315      * compounded at program start-up by JIT compilation and
 316      * allocation). So we try to streamline this as much as possible.
 317      * We park/unpark workers after placing in an event wait queue
 318      * when they cannot find work. This "queue" is actually a simple
 319      * Treiber stack, headed by the "id" field of ctl, plus a 15-bit
 320      * counter value (that reflects the number of times a worker has
 321      * been inactivated) to avoid ABA effects (we need only as many
 322      * version numbers as worker threads). Successors are held in
 323      * field WorkQueue.nextWait.  Queuing deals with several intrinsic
 324      * races, mainly that a task-producing thread can miss seeing (and
 325      * signalling) another thread that gave up looking for work but
 326      * has not yet entered the wait queue. We solve this by requiring
 327      * a full sweep of all workers (via repeated calls to method
 328      * scan()) both before and after a newly waiting worker is added
 329      * to the wait queue. During a rescan, the worker might release
 330      * some other queued worker rather than itself, which has the same
 331      * net effect. Because enqueued workers may actually be rescanning
 332      * rather than waiting, we set and clear the "parker" field of
 333      * WorkQueues to reduce unnecessary calls to unpark.  (This
 334      * requires a secondary recheck to avoid missed signals.)  Note
 335      * the unusual conventions about Thread.interrupts surrounding
 336      * parking and other blocking: Because interrupts are used solely
 337      * to alert threads to check termination, which is checked anyway
 338      * upon blocking, we clear status (using Thread.interrupted)
 339      * before any call to park, so that park does not immediately
 340      * return due to status being set via some other unrelated call to
 341      * interrupt in user code.
 342      *
 343      * Signalling.  We create or wake up workers only when there
 344      * appears to be at least one task they might be able to find and
 345      * execute. However, many other threads may notice the same task
 346      * and each signal to wake up a thread that might take it. So in
 347      * general, pools will be over-signalled.  When a submission is
 348      * added or another worker adds a task to a queue that has fewer
 349      * than two tasks, they signal waiting workers (or trigger
 350      * creation of new ones if fewer than the given parallelism level
 351      * -- signalWork), and may leave a hint to the unparked worker to
 352      * help signal others upon wakeup.  These primary signals are
 353      * buttressed by others (see method helpSignal) whenever other
 354      * threads scan for work or do not have a task to process.  On
 355      * most platforms, signalling (unpark) overhead time is noticeably
 356      * long, and the time between signalling a thread and it actually
 357      * making progress can be very noticeably long, so it is worth
 358      * offloading these delays from critical paths as much as
 359      * possible.
 360      *
 361      * Trimming workers. To release resources after periods of lack of
 362      * use, a worker starting to wait when the pool is quiescent will
 363      * time out and terminate if the pool has remained quiescent for a
 364      * given period -- a short period if there are more threads than
 365      * parallelism, longer as the number of threads decreases. This
 366      * will slowly propagate, eventually terminating all workers after
 367      * periods of non-use.
 368      *
 369      * Shutdown and Termination. A call to shutdownNow atomically sets
 370      * a plock bit and then (non-atomically) sets each worker's
 371      * qlock status, cancels all unprocessed tasks, and wakes up
 372      * all waiting workers.  Detecting whether termination should
 373      * commence after a non-abrupt shutdown() call requires more work
 374      * and bookkeeping. We need consensus about quiescence (i.e., that
 375      * there is no more work). The active count provides a primary
 376      * indication but non-abrupt shutdown still requires a rechecking
 377      * scan for any workers that are inactive but not queued.
 378      *
 379      * Joining Tasks
 380      * =============
 381      *
 382      * Any of several actions may be taken when one worker is waiting
 383      * to join a task stolen (or always held) by another.  Because we
 384      * are multiplexing many tasks on to a pool of workers, we can't
 385      * just let them block (as in Thread.join).  We also cannot just
 386      * reassign the joiner's run-time stack with another and replace
 387      * it later, which would be a form of "continuation", that even if
 388      * possible is not necessarily a good idea since we sometimes need
 389      * both an unblocked task and its continuation to progress.
 390      * Instead we combine two tactics:
 391      *
 392      *   Helping: Arranging for the joiner to execute some task that it
 393      *      would be running if the steal had not occurred.
 394      *
 395      *   Compensating: Unless there are already enough live threads,
 396      *      method tryCompensate() may create or re-activate a spare
 397      *      thread to compensate for blocked joiners until they unblock.
 398      *
 399      * A third form (implemented in tryRemoveAndExec) amounts to
 400      * helping a hypothetical compensator: If we can readily tell that
 401      * a possible action of a compensator is to steal and execute the
 402      * task being joined, the joining thread can do so directly,
 403      * without the need for a compensation thread (although at the
 404      * expense of larger run-time stacks, but the tradeoff is
 405      * typically worthwhile).
 406      *
 407      * The ManagedBlocker extension API can't use helping, so it relies
 408      * only on compensation in method awaitBlocker.
 409      *
 410      * The algorithm in tryHelpStealer entails a form of "linear"
 411      * helping: Each worker records (in field currentSteal) the most
 412      * recent task it stole from some other worker. Plus, it records
 413      * (in field currentJoin) the task it is currently actively
 414      * joining. Method tryHelpStealer uses these markers to try to
 415      * find a worker to help (i.e., steal back a task from and execute
 416      * it) that could hasten completion of the actively joined task.
 417      * In essence, the joiner executes a task that would be on its own
 418      * local deque had the to-be-joined task not been stolen. This may
 419      * be seen as a conservative variant of the approach in Wagner &
 420      * Calder "Leapfrogging: a portable technique for implementing
 421      * efficient futures" SIGPLAN Notices, 1993
 422      * (http://portal.acm.org/citation.cfm?id=155354). It differs in
 423      * that: (1) We only maintain dependency links across workers upon
 424      * steals, rather than use per-task bookkeeping.  This sometimes
 425      * requires a linear scan of workQueues array to locate stealers,
 426      * but often doesn't because stealers leave hints (that may become
 427      * stale/wrong) of where to locate them.  It is only a hint
 428      * because a worker might have had multiple steals and the hint
 429      * records only one of them (usually the most current).  Hinting
 430      * isolates cost to when it is needed, rather than adding to
 431      * per-task overhead.  (2) It is "shallow", ignoring nesting and
 432      * potentially cyclic mutual steals.  (3) It is intentionally
 433      * racy: field currentJoin is updated only while actively joining,
 434      * which means that we miss links in the chain during long-lived
 435      * tasks, GC stalls etc (which is OK since blocking in such cases
 436      * is usually a good idea).  (4) We bound the number of attempts
 437      * to find work (see MAX_HELP) and fall back to suspending the
 438      * worker and if necessary replacing it with another.
 439      *
 440      * Helping actions for CountedCompleters are much simpler: Method
 441      * helpComplete can take and execute any task with the same root
 442      * as the task being waited on. However, this still entails some
 443      * traversal of completer chains, so is less efficient than using
 444      * CountedCompleters without explicit joins.
 445      *
 446      * It is impossible to keep exactly the target parallelism number
 447      * of threads running at any given time.  Determining the
 448      * existence of conservatively safe helping targets, the
 449      * availability of already-created spares, and the apparent need
 450      * to create new spares are all racy, so we rely on multiple
 451      * retries of each.  Compensation in the apparent absence of
 452      * helping opportunities is challenging to control on JVMs, where
 453      * GC and other activities can stall progress of tasks that in
 454      * turn stall out many other dependent tasks, without us being
 455      * able to determine whether they will ever require compensation.
 456      * Even though work-stealing otherwise encounters little
 457      * degradation in the presence of more threads than cores,
 458      * aggressively adding new threads in such cases entails risk of
 459      * unwanted positive feedback control loops in which more threads
 460      * cause more dependent stalls (as well as delayed progress of
 461      * unblocked threads to the point that we know they are available)
 462      * leading to more situations requiring more threads, and so
 463      * on. This aspect of control can be seen as an (analytically
 464      * intractable) game with an opponent that may choose the worst
 465      * (for us) active thread to stall at any time.  We take several
 466      * precautions to bound losses (and thus bound gains), mainly in
 467      * methods tryCompensate and awaitJoin.
 468      *
 469      * Common Pool
 470      * ===========
 471      *
 472      * The static commonPool always exists after static
 473      * initialization.  Since it (or any other created pool) need
 474      * never be used, we minimize initial construction overhead and
 475      * footprint to the setup of about a dozen fields, with no nested
 476      * allocation. Most bootstrapping occurs within method
 477      * fullExternalPush during the first submission to the pool.
 478      *
 479      * When external threads submit to the common pool, they can
 480      * perform some subtask processing (see externalHelpJoin and
 481      * related methods).  We do not need to record whether these
 482      * submissions are to the common pool -- if not, externalHelpJoin
 483      * returns quickly (at the most helping to signal some common pool
 484      * workers). These submitters would otherwise be blocked waiting
 485      * for completion, so the extra effort (with liberally sprinkled
 486      * task status checks) in inapplicable cases amounts to an odd
 487      * form of limited spin-wait before blocking in ForkJoinTask.join.
 488      *
 489      * Style notes
 490      * ===========
 491      *
 492      * There is a lot of representation-level coupling among classes
 493      * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask.  The
 494      * fields of WorkQueue maintain data structures managed by
 495      * ForkJoinPool, so are directly accessed.  There is little point
 496      * trying to reduce this, since any associated future changes in
 497      * representations will need to be accompanied by algorithmic
 498      * changes anyway. Several methods intrinsically sprawl because
 499      * they must accumulate sets of consistent reads of volatiles held
 500      * in local variables.  Methods signalWork() and scan() are the
 501      * main bottlenecks, so are especially heavily
 502      * micro-optimized/mangled.  There are lots of inline assignments
 503      * (of form "while ((local = field) != 0)") which are usually the
 504      * simplest way to ensure the required read orderings (which are
 505      * sometimes critical). This leads to a "C"-like style of listing
 506      * declarations of these locals at the heads of methods or blocks.
 507      * There are several occurrences of the unusual "do {} while
 508      * (!cas...)"  which is the simplest way to force an update of a
 509      * CAS'ed variable. There are also other coding oddities (including
 510      * several unnecessary-looking hoisted null checks) that help
 511      * some methods perform reasonably even when interpreted (not
 512      * compiled).
 513      *
 514      * The order of declarations in this file is:
 515      * (1) Static utility functions
 516      * (2) Nested (static) classes
 517      * (3) Static fields
 518      * (4) Fields, along with constants used when unpacking some of them
 519      * (5) Internal control methods
 520      * (6) Callbacks and other support for ForkJoinTask methods
 521      * (7) Exported methods
 522      * (8) Static block initializing statics in minimally dependent order
 523      */
 524 
 525     // Static utilities
 526 
 527     /**
 528      * If there is a security manager, makes sure caller has
 529      * permission to modify threads.
 530      */
 531     private static void checkPermission() {
 532         SecurityManager security = System.getSecurityManager();
 533         if (security != null)
 534             security.checkPermission(modifyThreadPermission);
 535     }
 536 
 537     // Nested classes
 538 
 539     /**
 540      * Factory for creating new {@link ForkJoinWorkerThread}s.
 541      * A {@code ForkJoinWorkerThreadFactory} must be defined and used
 542      * for {@code ForkJoinWorkerThread} subclasses that extend base
 543      * functionality or initialize threads with different contexts.
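          *
          * <p>For example, a factory might supply workers that perform
          * extra initialization before processing tasks. The sketch below
          * is illustrative only; {@code MyWorker} and {@code
          * MyWorkerFactory} are hypothetical:
          *
          * <pre> {@code
          * class MyWorker extends ForkJoinWorkerThread {
          *   MyWorker(ForkJoinPool pool) { super(pool); }
          *   protected void onStart() {
          *     super.onStart();
          *     // set up per-thread context here
          *   }
          * }
          * class MyWorkerFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory {
          *   public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
          *     return new MyWorker(pool);
          *   }
          * }}</pre>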
 544      */
 545     public static interface ForkJoinWorkerThreadFactory {
 546         /**
 547          * Returns a new worker thread operating in the given pool.
 548          *
 549          * @param pool the pool this thread works in
 550          * @throws NullPointerException if the pool is null
 551          */
 552         public ForkJoinWorkerThread newThread(ForkJoinPool pool);
 553     }
 554 
 555     /**
 556      * Default ForkJoinWorkerThreadFactory implementation; creates a
 557      * new ForkJoinWorkerThread.
 558      */
 559     static final class DefaultForkJoinWorkerThreadFactory
 560         implements ForkJoinWorkerThreadFactory {
 561         public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
 562             return new ForkJoinWorkerThread(pool);
 563         }
 564     }
 565 
 566     /**
 567      * Per-thread records for threads that submit to pools. Currently
 568      * holds only pseudo-random seed / index that is used to choose
 569      * submission queues in method externalPush. In the future, this may
 570      * also incorporate a means to implement different task rejection
 571      * and resubmission policies.
 572      *
 573      * Seeds for submitters and workers/workQueues work in basically
 574      * the same way but are initialized and updated using slightly
 575      * different mechanics. Both are initialized using the same
 576      * approach as in class ThreadLocal, where successive values are
 577      * unlikely to collide with previous values. Seeds are then
 578      * randomly modified upon collisions using xorshifts, which
 579      * requires a non-zero seed.
 580      */
 581     static final class Submitter {
 582         int seed;
 583         Submitter(int s) { seed = s; }
 584     }
 585 
 586     /**
 587      * Class for artificial tasks that are used to replace the target
 588      * of local joins if they are removed from an interior queue slot
 589      * in WorkQueue.tryRemoveAndExec. We don't need the proxy to
 590      * actually do anything beyond having a unique identity.
 591      */
 592     static final class EmptyTask extends ForkJoinTask<Void> {
 593         private static final long serialVersionUID = -7721805057305804111L;
 594         EmptyTask() { status = ForkJoinTask.NORMAL; } // force done
 595         public final Void getRawResult() { return null; }
 596         public final void setRawResult(Void x) {}
 597         public final boolean exec() { return true; }
 598     }
 599 
 600     /**
 601      * Queues supporting work-stealing as well as external task
 602      * submission. See above for main rationale and algorithms.
 603      * Implementation relies heavily on "Unsafe" intrinsics
 604      * and selective use of "volatile":
 605      *
 606      * Field "base" is the index (mod array.length) of the least valid
 607      * queue slot, which is always the next position to steal (poll)
 608      * from if nonempty. Reads and writes require volatile orderings
 609      * but not CAS, because updates are only performed after slot
 610      * CASes.
 611      *
 612      * Field "top" is the index (mod array.length) of the next queue
 613      * slot to push to or pop from. It is written only by owner thread
 614      * for push, or under lock for external/shared push, and accessed
 615      * by other threads only after reading (volatile) base.  Both top
 616      * and base are allowed to wrap around on overflow, but (top -
 617      * base) (or more commonly -(base - top) to force volatile read of
 618      * base before top) still estimates size. The lock ("qlock") is
 619      * forced to -1 on termination, causing all further lock attempts
 620      * to fail. (Note: we don't need CAS for termination state because
 621      * upon pool shutdown, all shared-queues will stop being used
 622      * anyway.)  Nearly all lock bodies are set up so that exceptions
 623      * within lock bodies are "impossible" (modulo JVM errors that
 624      * would cause failure anyway.)
 625      *
 626      * The array slots are read and written using the emulation of
 627      * volatiles/atomics provided by Unsafe. Insertions must in
 628      * general use putOrderedObject as a form of releasing store to
 629      * ensure that all writes to the task object are ordered before
 630      * its publication in the queue.  All removals entail a CAS to
 631      * null.  The array is always a power of two. To ensure safety of
 632      * Unsafe array operations, all accesses perform explicit null
 633      * checks and implicit bounds checks via power-of-two masking.
 634      *
 635      * In addition to basic queuing support, this class contains
 636      * fields described elsewhere to control execution. It turns out
 637      * to work better memory-layout-wise to include them in this class
 638      * rather than a separate class.
 639      *
 640      * Performance on most platforms is very sensitive to placement of
 641      * instances of both WorkQueues and their arrays -- we absolutely
 642      * do not want multiple WorkQueue instances or multiple queue
 643      * arrays sharing cache lines. (It would be best for queue objects
 644      * and their arrays to share, but there is nothing available to
 645      * help arrange that).  Unfortunately, because they are recorded
 646      * in a common array, WorkQueue instances are often moved to be
 647      * adjacent by garbage collectors. To reduce impact, we use field
 648      * padding that works OK on common platforms; this effectively
 649      * trades off slightly slower average field access for the sake of
 650      * avoiding really bad worst-case access. (Until better JVM
 651      * support is in place, this padding is dependent on transient
 652      * properties of JVM field layout rules.) We also take care in
 653      * allocating, sizing and resizing the array. Non-shared queue
 654      * arrays are initialized by workers before use. Others are
 655      * allocated on first use.
 656      */
 657     static final class WorkQueue {
 658         /**
 659          * Capacity of work-stealing queue array upon initialization.
 660          * Must be a power of two; at least 4, but should be larger to
 661          * reduce or eliminate cacheline sharing among queues.
 662          * Currently, it is much larger, as a partial workaround for
 663          * the fact that JVMs often place arrays in locations that
 664          * share GC bookkeeping (especially cardmarks) such that
 665          * per-write accesses encounter serious memory contention.
 666          */
 667         static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
 668 
 669         /**
 670          * Maximum size for queue arrays. Must be a power of two less
 671          * than or equal to 1 << (31 - width of array entry) to ensure
 672          * lack of wraparound of index calculations, but defined to a
 673          * value a bit less than this to help users trap runaway
 674          * programs before saturating systems.
 675          */
 676         static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
 677 
 678         // Heuristic padding to ameliorate unfortunate memory placements
 679         volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06;
 680 
 681         int seed;                  // for random scanning; initialize nonzero
 682         volatile int eventCount;   // encoded inactivation count; < 0 if inactive
 683         int nextWait;              // encoded record of next event waiter
 684         int hint;                  // steal or signal hint (index)
 685         int poolIndex;             // index of this queue in pool (or 0)
 686         final int mode;            // 0: lifo, > 0: fifo, < 0: shared
 687         int nsteals;               // number of steals
 688         volatile int qlock;        // 1: locked, -1: terminate; else 0
 689         volatile int base;         // index of next slot for poll
 690         int top;                   // index of next slot for push
 691         ForkJoinTask<?>[] array;   // the elements (initially unallocated)
 692         final ForkJoinPool pool;   // the containing pool (may be null)
 693         final ForkJoinWorkerThread owner; // owning thread or null if shared
 694         volatile Thread parker;    // == owner during call to park; else null
 695         volatile ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin
 696         ForkJoinTask<?> currentSteal; // current non-local task being executed
 697 
 698         volatile Object pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17;
 699         volatile Object pad18, pad19, pad1a, pad1b, pad1c, pad1d;
 700 
 701         WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner, int mode,
 702                   int seed) {
 703             this.pool = pool;
 704             this.owner = owner;
 705             this.mode = mode;
 706             this.seed = seed;
 707             // Place indices in the center of array (that is not yet allocated)
 708             base = top = INITIAL_QUEUE_CAPACITY >>> 1;
 709         }
 710 
 711         /**
 712          * Returns the approximate number of tasks in the queue.
 713          */
 714         final int queueSize() {
 715             int n = base - top;       // non-owner callers must read base first
 716             return (n >= 0) ? 0 : -n; // ignore transient negative
 717         }
 718 
 719        /**
 720          * Provides a more accurate estimate of whether this queue has
 721          * any tasks than does queueSize, by checking whether a
 722          * near-empty queue has at least one unclaimed task.
 723          */
 724         final boolean isEmpty() {
 725             ForkJoinTask<?>[] a; int m, s;
 726             int n = base - (s = top);
 727             return (n >= 0 ||
 728                     (n == -1 &&
 729                      ((a = array) == null ||
 730                       (m = a.length - 1) < 0 ||
 731                       U.getObject
 732                       (a, (long)((m & (s - 1)) << ASHIFT) + ABASE) == null)));
 733         }
 734 
 735         /**
 736          * Pushes a task. Call only by owner in unshared queues.  (The
 737          * shared-queue version is embedded in method externalPush.)
 738          *
 739          * @param task the task. Caller must ensure non-null.
 740          * @throws RejectedExecutionException if array cannot be resized
 741          */
 742         final void push(ForkJoinTask<?> task) {
 743             ForkJoinTask<?>[] a; ForkJoinPool p;
 744             int s = top, m, n;
 745             if ((a = array) != null) {    // ignore if queue removed
 746                 int j = (((m = a.length - 1) & s) << ASHIFT) + ABASE;
 747                 U.putOrderedObject(a, j, task);
 748                 if ((n = (top = s + 1) - base) <= 2) {
 749                     if ((p = pool) != null)
 750                         p.signalWork(this);
 751                 }
 752                 else if (n >= m)
 753                     growArray();
 754             }
 755         }
 756 
 757        /**
 758          * Initializes or doubles the capacity of array. Call either
 759          * by owner or with lock held -- it is OK for base, but not
 760          * top, to move while resizings are in progress.
 761          */
 762         final ForkJoinTask<?>[] growArray() {
 763             ForkJoinTask<?>[] oldA = array;
 764             int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY;
 765             if (size > MAXIMUM_QUEUE_CAPACITY)
 766                 throw new RejectedExecutionException("Queue capacity exceeded");
 767             int oldMask, t, b;
 768             ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
 769             if (oldA != null && (oldMask = oldA.length - 1) >= 0 &&
 770                 (t = top) - (b = base) > 0) {
 771                 int mask = size - 1;
 772                 do {
 773                     ForkJoinTask<?> x;
 774                     int oldj = ((b & oldMask) << ASHIFT) + ABASE;
 775                     int j    = ((b &    mask) << ASHIFT) + ABASE;
 776                     x = (ForkJoinTask<?>)U.getObjectVolatile(oldA, oldj);
 777                     if (x != null &&
 778                         U.compareAndSwapObject(oldA, oldj, x, null))
 779                         U.putObjectVolatile(a, j, x);
 780                 } while (++b != t);
 781             }
 782             return a;
 783         }
 784 
 785         /**
 786          * Takes next task, if one exists, in LIFO order.  Call only
 787          * by owner in unshared queues.
 788          */
 789         final ForkJoinTask<?> pop() {
 790             ForkJoinTask<?>[] a; ForkJoinTask<?> t; int m;
 791             if ((a = array) != null && (m = a.length - 1) >= 0) {
 792                 for (int s; (s = top - 1) - base >= 0;) {
 793                     long j = ((m & s) << ASHIFT) + ABASE;
 794                     if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
 795                         break;
 796                     if (U.compareAndSwapObject(a, j, t, null)) {
 797                         top = s;
 798                         return t;
 799                     }
 800                 }
 801             }
 802             return null;
 803         }
 804 
 805         /**
 806          * Takes a task in FIFO order if b is base of queue and a task
 807          * can be claimed without contention. Specialized versions
 808          * appear in ForkJoinPool methods scan and tryHelpStealer.
 809          */
 810         final ForkJoinTask<?> pollAt(int b) {
 811             ForkJoinTask<?> t; ForkJoinTask<?>[] a;
 812             if ((a = array) != null) {
 813                 int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
 814                 if ((t = (ForkJoinTask<?>)U.getObjectVolatile(a, j)) != null &&
 815                     base == b &&
 816                     U.compareAndSwapObject(a, j, t, null)) {
 817                     base = b + 1;
 818                     return t;
 819                 }
 820             }
 821             return null;
 822         }
 823 
 824         /**
 825          * Takes next task, if one exists, in FIFO order.
 826          */
 827         final ForkJoinTask<?> poll() {
 828             ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t;
 829             while ((b = base) - top < 0 && (a = array) != null) {
 830                 int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
 831                 t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
 832                 if (t != null) {
 833                     if (base == b &&
 834                         U.compareAndSwapObject(a, j, t, null)) {
 835                         base = b + 1;
 836                         return t;
 837                     }
 838                 }
 839                 else if (base == b) {
 840                     if (b + 1 == top)
 841                         break;
 842                     Thread.yield(); // wait for lagging update (very rare)
 843                 }
 844             }
 845             return null;
 846         }
 847 
 848         /**
 849          * Takes next task, if one exists, in order specified by mode.
 850          */
 851         final ForkJoinTask<?> nextLocalTask() {
 852             return mode == 0 ? pop() : poll();
 853         }
 854 
 855         /**
 856          * Returns next task, if one exists, in order specified by mode.
 857          */
 858         final ForkJoinTask<?> peek() {
 859             ForkJoinTask<?>[] a = array; int m;
 860             if (a == null || (m = a.length - 1) < 0)
 861                 return null;
 862             int i = mode == 0 ? top - 1 : base;
 863             int j = ((i & m) << ASHIFT) + ABASE;
 864             return (ForkJoinTask<?>)U.getObjectVolatile(a, j);
 865         }
 866 
 867         /**
 868          * Pops the given task only if it is at the current top.
 869          * (A shared version is available only via FJP.tryExternalUnpush)
 870          */
 871         final boolean tryUnpush(ForkJoinTask<?> t) {
 872             ForkJoinTask<?>[] a; int s;
 873             if ((a = array) != null && (s = top) != base &&
 874                 U.compareAndSwapObject
 875                 (a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
 876                 top = s;
 877                 return true;
 878             }
 879             return false;
 880         }
 881 
 882         /**
 883          * Removes and cancels all known tasks, ignoring any exceptions.
 884          */
 885         final void cancelAll() {
 886             ForkJoinTask.cancelIgnoringExceptions(currentJoin);
 887             ForkJoinTask.cancelIgnoringExceptions(currentSteal);
 888             for (ForkJoinTask<?> t; (t = poll()) != null; )
 889                 ForkJoinTask.cancelIgnoringExceptions(t);
 890         }
 891 
 892         /**
 893          * Computes next value for random probes.  Scans don't require
 894          * a very high quality generator, but also not a crummy one.
 895          * Marsaglia xor-shift is cheap and works well enough.  Note:
 896          * This is manually inlined in its usages in ForkJoinPool to
 897          * avoid writes inside busy scan loops.
 898          */
 899         final int nextSeed() {
 900             int r = seed;
 901             r ^= r << 13;
 902             r ^= r >>> 17;
 903             return seed = r ^= r << 5;
 904         }
 905 
 906         // Specialized execution methods
 907 
 908         /**
 909          * Pops and runs tasks until empty.
 910          */
 911         private void popAndExecAll() {
 912             // A bit faster than repeated pop calls
 913             ForkJoinTask<?>[] a; int m, s; long j; ForkJoinTask<?> t;
 914             while ((a = array) != null && (m = a.length - 1) >= 0 &&
 915                    (s = top - 1) - base >= 0 &&
 916                    (t = ((ForkJoinTask<?>)
 917                          U.getObject(a, j = ((m & s) << ASHIFT) + ABASE)))
 918                    != null) {
 919                 if (U.compareAndSwapObject(a, j, t, null)) {
 920                     top = s;
 921                     t.doExec();
 922                 }
 923             }
 924         }
 925 
 926         /**
 927          * Polls and runs tasks until empty.
 928          */
 929         private void pollAndExecAll() {
 930             for (ForkJoinTask<?> t; (t = poll()) != null;)
 931                 t.doExec();
 932         }
 933 
 934         /**
 935          * If present, removes from queue and executes the given task,
 936          * or any other cancelled task. Returns true on any CAS
 937          * or consistency check failure so caller can retry.
 938          *
 939          * @return false if no progress can be made, else true
 940          */
 941         final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
 942             boolean stat = true, removed = false, empty = true;
 943             ForkJoinTask<?>[] a; int m, s, b, n;
 944             if ((a = array) != null && (m = a.length - 1) >= 0 &&
 945                 (n = (s = top) - (b = base)) > 0) {
 946                 for (ForkJoinTask<?> t;;) {           // traverse from s to b
 947                     int j = ((--s & m) << ASHIFT) + ABASE;
 948                     t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
 949                     if (t == null)                    // inconsistent length
 950                         break;
 951                     else if (t == task) {
 952                         if (s + 1 == top) {           // pop
 953                             if (!U.compareAndSwapObject(a, j, task, null))
 954                                 break;
 955                             top = s;
 956                             removed = true;
 957                         }
 958                         else if (base == b)           // replace with proxy
 959                             removed = U.compareAndSwapObject(a, j, task,
 960                                                              new EmptyTask());
 961                         break;
 962                     }
 963                     else if (t.status >= 0)
 964                         empty = false;
 965                     else if (s + 1 == top) {          // pop and throw away
 966                         if (U.compareAndSwapObject(a, j, t, null))
 967                             top = s;
 968                         break;
 969                     }
 970                     if (--n == 0) {
 971                         if (!empty && base == b)
 972                             stat = false;
 973                         break;
 974                     }
 975                 }
 976             }
 977             if (removed)
 978                 task.doExec();
 979             return stat;
 980         }
 981 
 982         /**
 983          * Polls for and executes the given task or any other task in
 984          * its CountedCompleter computation.
 985          */
 986         final boolean pollAndExecCC(ForkJoinTask<?> root) {
 987             ForkJoinTask<?>[] a; int b; Object o;
 988             outer: while ((b = base) - top < 0 && (a = array) != null) {
 989                 long j = (((a.length - 1) & b) << ASHIFT) + ABASE;
 990                 if ((o = U.getObject(a, j)) == null ||
 991                     !(o instanceof CountedCompleter))
 992                     break;
 993                 for (CountedCompleter<?> t = (CountedCompleter<?>)o, r = t;;) {
 994                     if (r == root) {
 995                         if (base == b &&
 996                             U.compareAndSwapObject(a, j, t, null)) {
 997                             base = b + 1;
 998                             t.doExec();
 999                             return true;
1000                         }
1001                         else
1002                             break; // restart
1003                     }
1004                     if ((r = r.completer) == null)
1005                         break outer; // not part of root computation
1006                 }
1007             }
1008             return false;
1009         }
1010 
1011         /**
1012          * Executes a top-level task and any local tasks remaining
1013          * after execution.
1014          */
1015         final void runTask(ForkJoinTask<?> t) {
1016             if (t != null) {
1017                 (currentSteal = t).doExec();
1018                 currentSteal = null;
1019                 ++nsteals;
1020                 if (base - top < 0) {       // process remaining local tasks
1021                     if (mode == 0)
1022                         popAndExecAll();
1023                     else
1024                         pollAndExecAll();
1025                 }
1026             }
1027         }
1028 
1029         /**
1030          * Executes a non-top-level (stolen) task.
1031          */
1032         final void runSubtask(ForkJoinTask<?> t) {
1033             if (t != null) {
1034                 ForkJoinTask<?> ps = currentSteal;
1035                 (currentSteal = t).doExec();
1036                 currentSteal = ps;
1037             }
1038         }
1039 
1040         /**
1041          * Returns true if owned and not known to be blocked.
1042          */
1043         final boolean isApparentlyUnblocked() {
1044             Thread wt; Thread.State s;
1045             return (eventCount >= 0 &&
1046                     (wt = owner) != null &&
1047                     (s = wt.getState()) != Thread.State.BLOCKED &&
1048                     s != Thread.State.WAITING &&
1049                     s != Thread.State.TIMED_WAITING);
1050         }
1051 
1052         // Unsafe mechanics
1053         private static final sun.misc.Unsafe U;
1054         private static final long QLOCK;
1055         private static final int ABASE;
1056         private static final int ASHIFT;
1057         static {
1058             int s;
1059             try {
1060                 U = sun.misc.Unsafe.getUnsafe();
1061                 Class<?> k = WorkQueue.class;
1062                 Class<?> ak = ForkJoinTask[].class;
1063                 QLOCK = U.objectFieldOffset
1064                     (k.getDeclaredField("qlock"));
1065                 ABASE = U.arrayBaseOffset(ak);
1066                 s = U.arrayIndexScale(ak);
1067             } catch (Exception e) {
1068                 throw new Error(e);
1069             }
1070             if ((s & (s-1)) != 0)
1071                 throw new Error("data type scale not a power of two");
1072             ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
1073         }
1074     }
1075 
1076     // static fields (initialized in static initializer below)
1077 
1078     /**
1079      * Creates a new ForkJoinWorkerThread. This factory is used unless
1080      * overridden in ForkJoinPool constructors.
1081      */
1082     public static final ForkJoinWorkerThreadFactory
1083         defaultForkJoinWorkerThreadFactory;
1084 
1085     /**
1086      * Per-thread submission bookkeeping. Shared across all pools
1087      * to reduce ThreadLocal pollution and because random motion
1088      * to avoid contention in one pool is likely to hold for others.
1089      * Lazily initialized on first submission (but null-checked
1090      * in other contexts to avoid unnecessary initialization).
1091      */
1092     static final ThreadLocal<Submitter> submitters;
1093 
1094     /**
1095      * Permission required for callers of methods that may start or
1096      * kill threads.
1097      */
1098     private static final RuntimePermission modifyThreadPermission;
1099 
1100     /**
1101      * Common (static) pool. Non-null for public use unless a static
1102      * construction exception occurs, but internal usages null-check on use
1103      * to paranoically avoid potential initialization circularities
1104      * as well as to simplify generated code.
1105      */
1106     static final ForkJoinPool commonPool;
1107 
1108     /**
1109      * Common pool parallelism. Must equal commonPool.parallelism.
1110      */
1111     static final int commonPoolParallelism;
1112 
1113     /**
1114      * Sequence number for creating workerNamePrefix.
1115      */
1116     private static int poolNumberSequence;
1117 
1118     /**
1119      * Returns the next sequence number. We don't expect this to
1120      * ever contend, so use simple builtin sync.
1121      */
1122     private static final synchronized int nextPoolId() {
1123         return ++poolNumberSequence;
1124     }
1125 
1126     // static constants
1127 
1128     /**
1129      * Initial timeout value (in nanoseconds) for the thread
1130      * triggering quiescence to park waiting for new work. On timeout,
1131      * the thread will instead try to shrink the number of
1132      * workers. The value should be large enough to avoid overly
1133      * aggressive shrinkage during most transient stalls (long GCs
1134      * etc).
1135      */
1136     private static final long IDLE_TIMEOUT      = 2000L * 1000L * 1000L; // 2sec
1137 
1138     /**
1139      * Timeout value when there are more threads than parallelism level
1140      */
1141     private static final long FAST_IDLE_TIMEOUT =  200L * 1000L * 1000L;
1142 
1143     /**
1144      * Tolerance for idle timeouts, to cope with timer undershoots
1145      */
1146     private static final long TIMEOUT_SLOP = 2000000L;
1147 
1148     /**
1149      * The maximum stolen->joining link depth allowed in method
1150      * tryHelpStealer.  Must be a power of two.  Depths for legitimate
1151      * chains are unbounded, but we use a fixed constant to avoid
1152      * (otherwise unchecked) cycles and to bound staleness of
1153      * traversal parameters at the expense of sometimes blocking when
1154      * we could be helping.
1155      */
1156     private static final int MAX_HELP = 64;
1157 
1158     /**
1159      * Increment for seed generators. See class ThreadLocal for
1160      * explanation.
1161      */
1162     private static final int SEED_INCREMENT = 0x61c88647;
1163 
1164     /**
1165      * Bits and masks for control variables
1166      *
1167      * Field ctl is a long packed with:
1168      * AC: Number of active running workers minus target parallelism (16 bits)
1169      * TC: Number of total workers minus target parallelism (16 bits)
1170      * ST: true if pool is terminating (1 bit)
1171      * EC: the wait count of top waiting thread (15 bits)
1172      * ID: poolIndex of top of Treiber stack of waiters (16 bits)
1173      *
1174      * When convenient, we can extract the upper 32 bits of counts and
1175      * the lower 32 bits of queue state, u = (int)(ctl >>> 32) and e =
1176      * (int)ctl.  The ec field is never accessed alone, but always
1177      * together with id and st. The offsets of counts by the target
1178      * parallelism and the positionings of fields make it possible to
1179      * perform the most common checks via sign tests of fields: When
1180      * ac is negative, there are not enough active workers, when tc is
1181      * negative, there are not enough total workers, and when e is
1182      * negative, the pool is terminating.  To deal with these possibly
1183      * negative fields, we use casts in and out of "short" and/or
1184      * signed shifts to maintain signedness.
1185      *
1186      * When a thread is queued (inactivated), its eventCount field is
1187      * set negative, which is the only way to tell if a worker is
1188      * prevented from executing tasks, even though it must continue to
1189      * scan for them to avoid queuing races. Note however that
1190      * eventCount updates lag releases so usage requires care.
1191      *
1192      * Field plock is an int packed with:
1193      * SHUTDOWN: true if shutdown is enabled (1 bit)
1194      * SEQ:  a sequence lock, with PL_LOCK bit set if locked (30 bits)
1195      * SIGNAL: set when threads may be waiting on the lock (1 bit)
1196      *
1197      * The sequence number enables simple consistency checks:
1198      * Staleness of read-only operations on the workQueues array can
1199      * be checked by comparing plock before vs after the reads.
1200      */
1201 
1202     // bit positions/shifts for fields
1203     private static final int  AC_SHIFT   = 48;
1204     private static final int  TC_SHIFT   = 32;
1205     private static final int  ST_SHIFT   = 31;
1206     private static final int  EC_SHIFT   = 16;
1207 
1208     // bounds
1209     private static final int  SMASK      = 0xffff;  // short bits
1210     private static final int  MAX_CAP    = 0x7fff;  // max #workers - 1
1211     private static final int  EVENMASK   = 0xfffe;  // even short bits
1212     private static final int  SQMASK     = 0x007e;  // max 64 (even) slots
1213     private static final int  SHORT_SIGN = 1 << 15;
1214     private static final int  INT_SIGN   = 1 << 31;
1215 
1216     // masks
1217     private static final long STOP_BIT   = 0x0001L << ST_SHIFT;
1218     private static final long AC_MASK    = ((long)SMASK) << AC_SHIFT;
1219     private static final long TC_MASK    = ((long)SMASK) << TC_SHIFT;
1220 
1221     // units for incrementing and decrementing
1222     private static final long TC_UNIT    = 1L << TC_SHIFT;
1223     private static final long AC_UNIT    = 1L << AC_SHIFT;
1224 
1225     // masks and units for dealing with u = (int)(ctl >>> 32)
1226     private static final int  UAC_SHIFT  = AC_SHIFT - 32;
1227     private static final int  UTC_SHIFT  = TC_SHIFT - 32;
1228     private static final int  UAC_MASK   = SMASK << UAC_SHIFT;
1229     private static final int  UTC_MASK   = SMASK << UTC_SHIFT;
1230     private static final int  UAC_UNIT   = 1 << UAC_SHIFT;
1231     private static final int  UTC_UNIT   = 1 << UTC_SHIFT;
1232 
1233     // masks and units for dealing with e = (int)ctl
1234     private static final int E_MASK      = 0x7fffffff; // no STOP_BIT
1235     private static final int E_SEQ       = 1 << EC_SHIFT;
1236 
1237     // plock bits
1238     private static final int SHUTDOWN    = 1 << 31;
1239     private static final int PL_LOCK     = 2;
1240     private static final int PL_SIGNAL   = 1;
1241     private static final int PL_SPINS    = 1 << 8;
1242 
1243     // access mode for WorkQueue
1244     static final int LIFO_QUEUE          =  0;
1245     static final int FIFO_QUEUE          =  1;
1246     static final int SHARED_QUEUE        = -1;
1247 
1248     // bounds for #steps in scan loop -- must be power 2 minus 1
1249     private static final int MIN_SCAN    = 0x1ff;   // cover estimation slop
1250     private static final int MAX_SCAN    = 0x1ffff; // 4 * max workers
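
         /*
          * Illustrative decoding sketch (informal, not used by this class):
          * how the packed ctl fields documented earlier map onto the
          * constants defined above.  The local variable names are
          * hypothetical; the extraction expressions mirror those used in
          * the methods below.
          *
          *   long c = ctl;
          *   int ac = (int)(c >> AC_SHIFT);      // active workers minus parallelism
          *   int tc = (short)(c >>> TC_SHIFT);   // total workers minus parallelism
          *   boolean stop = (c & STOP_BIT) != 0L; // pool is terminating
          *   int e  = (int)c;                    // wait count and index of top waiter
          *   int id = e & SMASK;                 // poolIndex of top waiting worker
          */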
1251 
1252     // Instance fields
1253 
1254     /*
1255      * Field layout of this class tends to matter more than one would
1256      * like. Runtime layout order is only loosely related to
1257      * declaration order and may differ across JVMs, but the following
1258      * empirically works OK on current JVMs.
1259      */
1260 
1261     // Heuristic padding to ameliorate unfortunate memory placements
1262     volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06;
1263 
1264     volatile long stealCount;                  // collects worker counts
1265     volatile long ctl;                         // main pool control
1266     volatile int plock;                        // shutdown status and seqLock
1267     volatile int indexSeed;                    // worker/submitter index seed
1268     final int config;                          // mode and parallelism level
1269     WorkQueue[] workQueues;                    // main registry
1270     final ForkJoinWorkerThreadFactory factory;
1271     final Thread.UncaughtExceptionHandler ueh; // per-worker UEH
1272     final String workerNamePrefix;             // to create worker name string
1273 
1274     volatile Object pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17;
1275     volatile Object pad18, pad19, pad1a, pad1b;
1276 
1277     /**
1278      * Acquires the plock lock to protect worker array and related
1279      * updates. This method is called only if an initial CAS on plock
1280      * fails. This acts as a spinlock for normal cases, but falls back
1281      * to builtin monitor to block when (rarely) needed. This would be
1282      * a terrible idea for a highly contended lock, but works fine as
1283      * a more conservative alternative to a pure spinlock.
1284      */
1285     private int acquirePlock() {
1286         int spins = PL_SPINS, r = 0, ps, nps;
1287         for (;;) {
1288             if (((ps = plock) & PL_LOCK) == 0 &&
1289                 U.compareAndSwapInt(this, PLOCK, ps, nps = ps + PL_LOCK))
1290                 return nps;
1291             else if (r == 0) { // randomize spins if possible
1292                 Thread t = Thread.currentThread(); WorkQueue w; Submitter z;
1293                 if ((t instanceof ForkJoinWorkerThread) &&
1294                     (w = ((ForkJoinWorkerThread)t).workQueue) != null)
1295                     r = w.seed;
1296                 else if ((z = submitters.get()) != null)
1297                     r = z.seed;
1298                 else
1299                     r = 1;
1300             }
1301             else if (spins >= 0) {
1302                 r ^= r << 1; r ^= r >>> 3; r ^= r << 10; // xorshift
1303                 if (r >= 0)
1304                     --spins;
1305             }
1306             else if (U.compareAndSwapInt(this, PLOCK, ps, ps | PL_SIGNAL)) {
1307                 synchronized (this) {
1308                     if ((plock & PL_SIGNAL) != 0) {
1309                         try {
1310                             wait();
1311                         } catch (InterruptedException ie) {
1312                             try {
1313                                 Thread.currentThread().interrupt();
1314                             } catch (SecurityException ignore) {
1315                             }
1316                         }
1317                     }
1318                     else
1319                         notifyAll();
1320                 }
1321             }
1322         }
1323     }
1324 
1325     /**
1326      * Unlocks and signals any thread waiting for plock. Called only
1327      * when CAS of seq value for unlock fails.
1328      */
1329     private void releasePlock(int ps) {
1330         plock = ps;
1331         synchronized (this) { notifyAll(); }
1332     }
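
         /*
          * For reference, the caller-side idiom that pairs these two
          * methods (see, e.g., initWorkers, registerWorker,
          * deregisterWorker, and fullExternalPush below) looks like:
          *
          *   if (((ps = plock) & PL_LOCK) != 0 ||
          *       !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
          *       ps = acquirePlock();
          *   // ... short critical section ...
          *   int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
          *   if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
          *       releasePlock(nps);
          *
          * The fast paths are the plain CASes; acquirePlock and
          * releasePlock are only the slow paths taken on contention.
          */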
1333 
1334     /**
1335      * Performs secondary initialization, called when plock is zero.
1336      * Creates workQueue array and sets plock to a valid value.  The
1337      * lock body must be exception-free (so no try/finally) so we
1338      * optimistically allocate new array outside the lock and throw
1339      * away if (very rarely) not needed. (A similar tactic is used in
1340      * fullExternalPush.)  Because the plock seq value can eventually
1341      * wrap around zero, this method harmlessly fails to reinitialize
1342      * if workQueues exists, while still advancing plock.
1343      *
1344      * Additionally tries to create the first worker.
1345      */
1346     private void initWorkers() {
1347         WorkQueue[] ws, nws; int ps;
1348         int p = config & SMASK;        // find power of two table size
1349         int n = (p > 1) ? p - 1 : 1;   // ensure at least 2 slots
1350         n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
1351         n = (n + 1) << 1;
1352         if ((ws = workQueues) == null || ws.length == 0)
1353             nws = new WorkQueue[n];
1354         else
1355             nws = null;
1356         if (((ps = plock) & PL_LOCK) != 0 ||
1357             !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1358             ps = acquirePlock();
1359         if (((ws = workQueues) == null || ws.length == 0) && nws != null)
1360             workQueues = nws;
1361         int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
1362         if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1363             releasePlock(nps);
1364         tryAddWorker();
1365     }
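
         /*
          * Sizing example (informal): the bit-smearing above rounds p - 1
          * up to a power-of-two-minus-one and the final shift doubles it,
          * so the initial workQueues length is twice the smallest power of
          * two >= p (with a minimum of 4).  For instance p = 4 yields 8,
          * p = 7 yields 16, and p = 9 yields 32, leaving room for shared
          * submission queues in the even-numbered slots.
          */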
1366 
1367     /**
1368      * Tries to create and start one worker if fewer than target
1369      * parallelism level exist. Adjusts counts etc on failure.
1370      */
1371     private void tryAddWorker() {
1372         long c; int u;
1373         while ((u = (int)((c = ctl) >>> 32)) < 0 &&
1374                (u & SHORT_SIGN) != 0 && (int)c == 0) {
1375             long nc = (long)(((u + UTC_UNIT) & UTC_MASK) |
1376                              ((u + UAC_UNIT) & UAC_MASK)) << 32;
1377             if (U.compareAndSwapLong(this, CTL, c, nc)) {
1378                 ForkJoinWorkerThreadFactory fac;
1379                 Throwable ex = null;
1380                 ForkJoinWorkerThread wt = null;
1381                 try {
1382                     if ((fac = factory) != null &&
1383                         (wt = fac.newThread(this)) != null) {
1384                         wt.start();
1385                         break;
1386                     }
1387                 } catch (Throwable e) {
1388                     ex = e;
1389                 }
1390                 deregisterWorker(wt, ex);
1391                 break;
1392             }
1393         }
1394     }
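
         /*
          * Guard note (informal): in the loop above, u < 0 means there are
          * too few active workers, (u & SHORT_SIGN) != 0 means the total
          * worker count is still below the target parallelism, and
          * (int)c == 0 means no idle workers are queued for release
          * instead; only when all three hold is a new thread created.
          */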
1395 
1396     //  Registering and deregistering workers
1397 
1398     /**
1399      * Callback from ForkJoinWorkerThread to establish and record its
1400      * WorkQueue. To avoid scanning bias due to packing entries in
1401      * front of the workQueues array, we treat the array as a simple
1402      * power-of-two hash table using per-thread seed as hash,
1403      * expanding as needed.
1404      *
1405      * @param wt the worker thread
1406      * @return the worker's queue
1407      */
1408     final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
1409         Thread.UncaughtExceptionHandler handler; WorkQueue[] ws; int s, ps;
1410         wt.setDaemon(true);
1411         if ((handler = ueh) != null)
1412             wt.setUncaughtExceptionHandler(handler);
1413         do {} while (!U.compareAndSwapInt(this, INDEXSEED, s = indexSeed,
1414                                           s += SEED_INCREMENT) ||
1415                      s == 0); // skip 0
1416         WorkQueue w = new WorkQueue(this, wt, config >>> 16, s);
1417         if (((ps = plock) & PL_LOCK) != 0 ||
1418             !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1419             ps = acquirePlock();
1420         int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
1421         try {
1422             if ((ws = workQueues) != null) {    // skip if shutting down
1423                 int n = ws.length, m = n - 1;
1424                 int r = (s << 1) | 1;           // use odd-numbered indices
1425                 if (ws[r &= m] != null) {       // collision
1426                     int probes = 0;             // step by approx half size
1427                     int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
1428                     while (ws[r = (r + step) & m] != null) {
1429                         if (++probes >= n) {
1430                             workQueues = ws = Arrays.copyOf(ws, n <<= 1);
1431                             m = n - 1;
1432                             probes = 0;
1433                         }
1434                     }
1435                 }
1436                 w.eventCount = w.poolIndex = r; // volatile write orders
1437                 ws[r] = w;
1438             }
1439         } finally {
1440             if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1441                 releasePlock(nps);
1442         }
1443         wt.setName(workerNamePrefix.concat(Integer.toString(w.poolIndex)));
1444         return w;
1445     }
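
         /*
          * Probing example (informal): with table length n = 16, a
          * colliding worker re-probes with step ((16 >>> 1) & EVENMASK) + 2
          * = 10.  The step is even, so starting from an odd index only odd
          * (worker) slots are visited, and since step/2 = 5 is relatively
          * prime to n/2 = 8, all of them are tried before the table is
          * doubled.
          */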
1446 
1447     /**
1448      * Final callback from terminating worker, as well as upon failure
1449      * to construct or start a worker.  Removes record of worker from
1450      * array, and adjusts counts. If pool is shutting down, tries to
1451      * complete termination.
1452      *
1453      * @param wt the worker thread or null if construction failed
1454      * @param ex the exception causing failure, or null if none
1455      */
1456     final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
1457         WorkQueue w = null;
1458         if (wt != null && (w = wt.workQueue) != null) {
1459             int ps;
1460             w.qlock = -1;                // ensure set
1461             long ns = w.nsteals, sc;     // collect steal count
1462             do {} while (!U.compareAndSwapLong(this, STEALCOUNT,
1463                                                sc = stealCount, sc + ns));
1464             if (((ps = plock) & PL_LOCK) != 0 ||
1465                 !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1466                 ps = acquirePlock();
1467             int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
1468             try {
1469                 int idx = w.poolIndex;
1470                 WorkQueue[] ws = workQueues;
1471                 if (ws != null && idx >= 0 && idx < ws.length && ws[idx] == w)
1472                     ws[idx] = null;
1473             } finally {
1474                 if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1475                     releasePlock(nps);
1476             }
1477         }
1478 
1479         long c;                          // adjust ctl counts
1480         do {} while (!U.compareAndSwapLong
1481                      (this, CTL, c = ctl, (((c - AC_UNIT) & AC_MASK) |
1482                                            ((c - TC_UNIT) & TC_MASK) |
1483                                            (c & ~(AC_MASK|TC_MASK)))));
1484 
1485         if (!tryTerminate(false, false) && w != null && w.array != null) {
1486             w.cancelAll();               // cancel remaining tasks
1487             WorkQueue[] ws; WorkQueue v; Thread p; int u, i, e;
1488             while ((u = (int)((c = ctl) >>> 32)) < 0 && (e = (int)c) >= 0) {
1489                 if (e > 0) {             // activate or create replacement
1490                     if ((ws = workQueues) == null ||
1491                         (i = e & SMASK) >= ws.length ||
1492                         (v = ws[i]) == null)
1493                         break;
1494                     long nc = (((long)(v.nextWait & E_MASK)) |
1495                                ((long)(u + UAC_UNIT) << 32));
1496                     if (v.eventCount != (e | INT_SIGN))
1497                         break;
1498                     if (U.compareAndSwapLong(this, CTL, c, nc)) {
1499                         v.eventCount = (e + E_SEQ) & E_MASK;
1500                         if ((p = v.parker) != null)
1501                             U.unpark(p);
1502                         break;
1503                     }
1504                 }
1505                 else {
1506                     if ((short)u < 0)
1507                         tryAddWorker();
1508                     break;
1509                 }
1510             }
1511         }
1512         if (ex == null)                     // help clean refs on way out
1513             ForkJoinTask.helpExpungeStaleExceptions();
1514         else                                // rethrow
1515             ForkJoinTask.rethrow(ex);
1516     }
1517 
1518     // Submissions
1519 
1520     /**
1521      * Unless shutting down, adds the given task to a submission queue
1522      * at submitter's current queue index (modulo submission
1523      * range). Only the most common path is directly handled in this
1524      * method. All others are relayed to fullExternalPush.
1525      *
1526      * @param task the task. Caller must ensure non-null.
1527      */
1528     final void externalPush(ForkJoinTask<?> task) {
1529         WorkQueue[] ws; WorkQueue q; Submitter z; int m; ForkJoinTask<?>[] a;
1530         if ((z = submitters.get()) != null && plock > 0 &&
1531             (ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
1532             (q = ws[m & z.seed & SQMASK]) != null &&
1533             U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock
1534             int b = q.base, s = q.top, n, an;
1535             if ((a = q.array) != null && (an = a.length) > (n = s + 1 - b)) {
1536                 int j = (((an - 1) & s) << ASHIFT) + ABASE;
1537                 U.putOrderedObject(a, j, task);
1538                 q.top = s + 1;                     // push on to deque
1539                 q.qlock = 0;
1540                 if (n <= 2)
1541                     signalWork(q);
1542                 return;
1543             }
1544             q.qlock = 0;
1545         }
1546         fullExternalPush(task);
1547     }
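
         /*
          * Slot selection note (informal): SQMASK (0x007e) clears bit 0 and
          * all bits above 6, so m & z.seed & SQMASK is always an even index
          * no larger than min(m, 126).  Submissions therefore use only
          * even-numbered slots, disjoint from the odd-numbered slots chosen
          * for workers in registerWorker, with at most 64 distinct
          * submission queues regardless of table size.
          */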
1548 
1549     /**
1550      * Full version of externalPush. This method is called, among
1551      * other times, upon the first submission of the first task to the
1552      * pool, so must perform secondary initialization (via
1553      * initWorkers). It also detects first submission by an external
1554      * thread by looking up its ThreadLocal, and creates a new shared
1555      * queue if the one at that index is empty or contended. The plock lock
1556      * body must be exception-free (so no try/finally) so we
1557      * optimistically allocate new queues outside the lock and throw
1558      * them away if (very rarely) not needed.
1559      */
1560     private void fullExternalPush(ForkJoinTask<?> task) {
1561         int r = 0; // random index seed
1562         for (Submitter z = submitters.get();;) {
1563             WorkQueue[] ws; WorkQueue q; int ps, m, k;
1564             if (z == null) {
1565                 if (U.compareAndSwapInt(this, INDEXSEED, r = indexSeed,
1566                                         r += SEED_INCREMENT) && r != 0)
1567                     submitters.set(z = new Submitter(r));
1568             }
1569             else if (r == 0) {               // move to a different index
1570                 r = z.seed;
1571                 r ^= r << 13;                // same xorshift as WorkQueues
1572                 r ^= r >>> 17;
1573                 z.seed = r ^ (r << 5);
1574             }
1575             else if ((ps = plock) < 0)
1576                 throw new RejectedExecutionException();
1577             else if (ps == 0 || (ws = workQueues) == null ||
1578                      (m = ws.length - 1) < 0)
1579                 initWorkers();
1580             else if ((q = ws[k = r & m & SQMASK]) != null) {
1581                 if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
1582                     ForkJoinTask<?>[] a = q.array;
1583                     int s = q.top;
1584                     boolean submitted = false;
1585                     try {                      // locked version of push
1586                         if ((a != null && a.length > s + 1 - q.base) ||
1587                             (a = q.growArray()) != null) {   // must presize
1588                             int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
1589                             U.putOrderedObject(a, j, task);
1590                             q.top = s + 1;
1591                             submitted = true;
1592                         }
1593                     } finally {
1594                         q.qlock = 0;  // unlock
1595                     }
1596                     if (submitted) {
1597                         signalWork(q);
1598                         return;
1599                     }
1600                 }
1601                 r = 0; // move on failure
1602             }
1603             else if (((ps = plock) & PL_LOCK) == 0) { // create new queue
1604                 q = new WorkQueue(this, null, SHARED_QUEUE, r);
1605                 if (((ps = plock) & PL_LOCK) != 0 ||
1606                     !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1607                     ps = acquirePlock();
1608                 if ((ws = workQueues) != null && k < ws.length && ws[k] == null)
1609                     ws[k] = q;
1610                 int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
1611                 if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1612                     releasePlock(nps);
1613             }
1614             else
1615                 r = 0; // try elsewhere while lock held
1616         }
1617     }
1618 
1619     // Maintaining ctl counts
1620 
1621     /**
1622      * Increments active count; mainly called upon return from blocking.
1623      */
1624     final void incrementActiveCount() {
1625         long c;
1626         do {} while (!U.compareAndSwapLong(this, CTL, c = ctl, c + AC_UNIT));
1627     }
1628 
1629     /**
1630      * Tries to create or activate a worker if too few are active.
1631      *
1632      * @param q the (non-null) queue holding tasks to be signalled
1633      */
1634     final void signalWork(WorkQueue q) {
1635         int hint = q.poolIndex;
1636         long c; int e, u, i, n; WorkQueue[] ws; WorkQueue w; Thread p;
1637         while ((u = (int)((c = ctl) >>> 32)) < 0) {
1638             if ((e = (int)c) > 0) {
1639                 if ((ws = workQueues) != null && ws.length > (i = e & SMASK) &&
1640                     (w = ws[i]) != null && w.eventCount == (e | INT_SIGN)) {
1641                     long nc = (((long)(w.nextWait & E_MASK)) |
1642                                ((long)(u + UAC_UNIT) << 32));
1643                     if (U.compareAndSwapLong(this, CTL, c, nc)) {
1644                         w.hint = hint;
1645                         w.eventCount = (e + E_SEQ) & E_MASK;
1646                         if ((p = w.parker) != null)
1647                             U.unpark(p);
1648                         break;
1649                     }
1650                     if (q.top - q.base <= 0)
1651                         break;
1652                 }
1653                 else
1654                     break;
1655             }
1656             else {
1657                 if ((short)u < 0)
1658                     tryAddWorker();
1659                 break;
1660             }
1661         }
1662     }
1663 
1664     // Scanning for tasks
1665 
1666     /**
1667      * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1668      */
1669     final void runWorker(WorkQueue w) {
1670         w.growArray(); // allocate queue
1671         do { w.runTask(scan(w)); } while (w.qlock >= 0);
1672     }
1673 
1674     /**
1675      * Scans for and, if found, returns one task, else possibly
1676      * inactivates the worker. This method operates on single reads of
1677      * volatile state and is designed to be re-invoked continuously,
1678      * in part because it returns upon detecting inconsistencies,
1679      * contention, or state changes that indicate possible success on
1680      * re-invocation.
1681      *
1682      * The scan searches for tasks across queues (starting at a random
1683      * index, and relying on registerWorker to irregularly scatter
1684      * them within array to avoid bias), checking each at least twice.
1685      * The scan terminates upon either finding a non-empty queue, or
1686      * completing the sweep. If the worker is not inactivated, it
1687      * takes and returns a task from this queue. Otherwise, if not
1688      * activated, it signals workers (that may include itself) and
1689      * returns so caller can retry, as it also does if the worker
1690      * array may have changed during an empty scan.  On failure
1691      * to find a task, we take one of the following actions, after
1692      * which the caller will retry calling this method unless
1693      * terminated.
1694      *
1695      * * If pool is terminating, terminate the worker.
1696      *
1697      * * If not already enqueued, try to inactivate and enqueue the
1698      * worker on wait queue. Or, if inactivating has caused the pool
1699      * to be quiescent, relay to idleAwaitWork to possibly shrink
1700      * pool.
1701      *
1702      * * If already enqueued and none of the above apply, possibly
1703      * park awaiting signal, else lingering to help scan and signal.
1704      *
1705      * * If a non-empty queue is discovered or left as a hint,
1706      * help wake up other workers before returning.
1707      *
1708      * @param w the worker (via its WorkQueue)
1709      * @return a task or null if none found
1710      */
1711     private final ForkJoinTask<?> scan(WorkQueue w) {
1712         WorkQueue[] ws; int m;
1713         int ps = plock;                          // read plock before ws
1714         if (w != null && (ws = workQueues) != null && (m = ws.length - 1) >= 0) {
1715             int ec = w.eventCount;               // ec is negative if inactive
1716             int r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5;
1717             w.hint = -1;                         // update seed and clear hint
1718             int j = ((m + m + 1) | MIN_SCAN) & MAX_SCAN;
1719             do {
1720                 WorkQueue q; ForkJoinTask<?>[] a; int b;
1721                 if ((q = ws[(r + j) & m]) != null && (b = q.base) - q.top < 0 &&
1722                     (a = q.array) != null) {     // probably nonempty
1723                     int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
1724                     ForkJoinTask<?> t = (ForkJoinTask<?>)
1725                         U.getObjectVolatile(a, i);
1726                     if (q.base == b && ec >= 0 && t != null &&
1727                         U.compareAndSwapObject(a, i, t, null)) {
1728                         if ((q.base = b + 1) - q.top < 0)
1729                             signalWork(q);
1730                         return t;                // taken
1731                     }
1732                     else if ((ec < 0 || j < m) && (int)(ctl >> AC_SHIFT) <= 0) {
1733                         w.hint = (r + j) & m;    // help signal below
1734                         break;                   // cannot take
1735                     }
1736                 }
1737             } while (--j >= 0);
1738 
1739             int h, e, ns; long c, sc; WorkQueue q;
1740             if ((ns = w.nsteals) != 0) {
1741                 if (U.compareAndSwapLong(this, STEALCOUNT,
1742                                          sc = stealCount, sc + ns))
1743                     w.nsteals = 0;               // collect steals and rescan
1744             }
1745             else if (plock != ps)                // consistency check
1746                 ;                                // skip
1747             else if ((e = (int)(c = ctl)) < 0)
1748                 w.qlock = -1;                    // pool is terminating
1749             else {
1750                 if ((h = w.hint) < 0) {
1751                     if (ec >= 0) {               // try to enqueue/inactivate
1752                         long nc = (((long)ec |
1753                                     ((c - AC_UNIT) & (AC_MASK|TC_MASK))));
1754                         w.nextWait = e;          // link and mark inactive
1755                         w.eventCount = ec | INT_SIGN;
1756                         if (ctl != c || !U.compareAndSwapLong(this, CTL, c, nc))
1757                             w.eventCount = ec;   // unmark on CAS failure
1758                         else if ((int)(c >> AC_SHIFT) == 1 - (config & SMASK))
1759                             idleAwaitWork(w, nc, c);
1760                     }
1761                     else if (w.eventCount < 0 && !tryTerminate(false, false) &&
1762                              ctl == c) {         // block
1763                         Thread wt = Thread.currentThread();
1764                         Thread.interrupted();    // clear status
1765                         U.putObject(wt, PARKBLOCKER, this);
1766                         w.parker = wt;           // emulate LockSupport.park
1767                         if (w.eventCount < 0)    // recheck
1768                             U.park(false, 0L);
1769                         w.parker = null;
1770                         U.putObject(wt, PARKBLOCKER, null);
1771                     }
1772                 }
1773                 if ((h >= 0 || (h = w.hint) >= 0) &&
1774                     (ws = workQueues) != null && h < ws.length &&
1775                     (q = ws[h]) != null) {      // signal others before retry
1776                     WorkQueue v; Thread p; int u, i, s;
1777                     for (int n = (config & SMASK) >>> 1;;) {
1778                         int idleCount = (w.eventCount < 0) ? 0 : -1;
1779                         if (((s = idleCount - q.base + q.top) <= n &&
1780                              (n = s) <= 0) ||
1781                             (u = (int)((c = ctl) >>> 32)) >= 0 ||
1782                             (e = (int)c) <= 0 || m < (i = e & SMASK) ||
1783                             (v = ws[i]) == null)
1784                             break;
1785                         long nc = (((long)(v.nextWait & E_MASK)) |
1786                                    ((long)(u + UAC_UNIT) << 32));
1787                         if (v.eventCount != (e | INT_SIGN) ||
1788                             !U.compareAndSwapLong(this, CTL, c, nc))
1789                             break;
1790                         v.hint = h;
1791                         v.eventCount = (e + E_SEQ) & E_MASK;
1792                         if ((p = v.parker) != null)
1793                             U.unpark(p);
1794                         if (--n <= 0)
1795                             break;
1796                     }
1797                 }
1798             }
1799         }
1800         return null;
1801     }
1802 
1803     /**
1804      * If inactivating worker w has caused the pool to become
1805      * quiescent, checks for pool termination, and, so long as this is
1806      * not the only worker, waits for event for up to a given
1807      * duration.  On timeout, if ctl has not changed, terminates the
1808      * worker, which will in turn wake up another worker to possibly
1809      * repeat this process.
1810      *
1811      * @param w the calling worker
1812      * @param currentCtl the ctl value triggering possible quiescence
1813      * @param prevCtl the ctl value to restore if thread is terminated
1814      */
1815     private void idleAwaitWork(WorkQueue w, long currentCtl, long prevCtl) {
1816         if (w != null && w.eventCount < 0 &&
1817             !tryTerminate(false, false) && (int)prevCtl != 0) {
1818             int dc = -(short)(currentCtl >>> TC_SHIFT);
1819             long parkTime = dc < 0 ? FAST_IDLE_TIMEOUT: (dc + 1) * IDLE_TIMEOUT;
1820             long deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
1821             Thread wt = Thread.currentThread();
1822             while (ctl == currentCtl) {
1823                 Thread.interrupted();  // timed variant of version in scan()
1824                 U.putObject(wt, PARKBLOCKER, this);
1825                 w.parker = wt;
1826                 if (ctl == currentCtl)
1827                     U.park(false, parkTime);
1828                 w.parker = null;
1829                 U.putObject(wt, PARKBLOCKER, null);
1830                 if (ctl != currentCtl)
1831                     break;
1832                 if (deadline - System.nanoTime() <= 0L &&
1833                     U.compareAndSwapLong(this, CTL, currentCtl, prevCtl)) {
1834                     w.eventCount = (w.eventCount + E_SEQ) | E_MASK;
1835                     w.qlock = -1;   // shrink
1836                     break;
1837                 }
1838             }
1839         }
1840     }
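
         /*
          * Timeout arithmetic (informal example): dc above is the target
          * parallelism minus the current total worker count.  With surplus
          * workers (dc < 0) the park time is FAST_IDLE_TIMEOUT (200ms); at
          * exactly the target (dc == 0) it is IDLE_TIMEOUT (2s); one worker
          * below target (dc == 1) it is 4s, and so on, so shrinkage slows
          * as the pool drops further below its parallelism level.
          */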
1841 
1842     /**
1843      * Scans through queues looking for work while joining a task; if
1844      * any present, signals. May return early if more signalling is
1845      * detectably unneeded.
1846      *
1847      * @param task return early if done
1848      * @param origin an index to start scan
1849      */
1850     private void helpSignal(ForkJoinTask<?> task, int origin) {
1851         WorkQueue[] ws; WorkQueue w; Thread p; long c; int m, u, e, i, s;
1852         if (task != null && task.status >= 0 &&
1853             (u = (int)(ctl >>> 32)) < 0 && (u >> UAC_SHIFT) < 0 &&
1854             (ws = workQueues) != null && (m = ws.length - 1) >= 0) {
1855             outer: for (int k = origin, j = m; j >= 0; --j) {
1856                 WorkQueue q = ws[k++ & m];
1857                 for (int n = m;;) { // limit to at most m signals
1858                     if (task.status < 0)
1859                         break outer;
1860                     if (q == null ||
1861                         ((s = -q.base + q.top) <= n && (n = s) <= 0))
1862                         break;
1863                     if ((u = (int)((c = ctl) >>> 32)) >= 0 ||
1864                         (e = (int)c) <= 0 || m < (i = e & SMASK) ||
1865                         (w = ws[i]) == null)
1866                         break outer;
1867                     long nc = (((long)(w.nextWait & E_MASK)) |
1868                                ((long)(u + UAC_UNIT) << 32));
1869                     if (w.eventCount != (e | INT_SIGN))
1870                         break outer;
1871                     if (U.compareAndSwapLong(this, CTL, c, nc)) {
1872                         w.eventCount = (e + E_SEQ) & E_MASK;
1873                         if ((p = w.parker) != null)
1874                             U.unpark(p);
1875                         if (--n <= 0)
1876                             break;
1877                     }
1878                 }
1879             }
1880         }
1881     }
1882 
1883     /**
1884      * Tries to locate and execute tasks for a stealer of the given
1885      * task, or in turn one of its stealers. Traces currentSteal ->
1886      * currentJoin links looking for a thread working on a descendant
1887      * of the given task and with a non-empty queue to steal back and
1888      * execute tasks from. The first call to this method upon a
1889      * waiting join will often entail scanning/search (which is OK
1890      * because the joiner has nothing better to do), but this method
1891      * leaves hints in workers to speed up subsequent calls. The
1892      * implementation is very branchy to cope with potential
1893      * inconsistencies or loops encountering chains that are stale,
1894      * unknown, or so long that they are likely cyclic.
1895      *
1896      * @param joiner the joining worker
1897      * @param task the task to join
1898      * @return 0 if no progress can be made, negative if task
1899      * known complete, else positive
1900      */
1901     private int tryHelpStealer(WorkQueue joiner, ForkJoinTask<?> task) {
1902         int stat = 0, steps = 0;                    // bound to avoid cycles
1903         if (joiner != null && task != null) {       // hoist null checks
1904             restart: for (;;) {
1905                 ForkJoinTask<?> subtask = task;     // current target
1906                 for (WorkQueue j = joiner, v;;) {   // v is stealer of subtask
1907                     WorkQueue[] ws; int m, s, h;
1908                     if ((s = task.status) < 0) {
1909                         stat = s;
1910                         break restart;
1911                     }
1912                     if ((ws = workQueues) == null || (m = ws.length - 1) <= 0)
1913                         break restart;              // shutting down
1914                     if ((v = ws[h = (j.hint | 1) & m]) == null ||
1915                         v.currentSteal != subtask) {
1916                         for (int origin = h;;) {    // find stealer
1917                             if (((h = (h + 2) & m) & 15) == 1 &&
1918                                 (subtask.status < 0 || j.currentJoin != subtask))
1919                                 continue restart;   // occasional staleness check
1920                             if ((v = ws[h]) != null &&
1921                                 v.currentSteal == subtask) {
1922                                 j.hint = h;        // save hint
1923                                 break;
1924                             }
1925                             if (h == origin)
1926                                 break restart;      // cannot find stealer
1927                         }
1928                     }
1929                     for (;;) { // help stealer or descend to its stealer
1930                         ForkJoinTask[] a;  int b;
1931                         if (subtask.status < 0)     // surround probes with
1932                             continue restart;       //   consistency checks
1933                         if ((b = v.base) - v.top < 0 && (a = v.array) != null) {
1934                             int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
1935                             ForkJoinTask<?> t =
1936                                 (ForkJoinTask<?>)U.getObjectVolatile(a, i);
1937                             if (subtask.status < 0 || j.currentJoin != subtask ||
1938                                 v.currentSteal != subtask)
1939                                 continue restart;   // stale
1940                             stat = 1;               // apparent progress
1941                             if (t != null && v.base == b &&
1942                                 U.compareAndSwapObject(a, i, t, null)) {
1943                                 v.base = b + 1;     // help stealer
1944                                 joiner.runSubtask(t);
1945                             }
1946                             else if (v.base == b && ++steps == MAX_HELP)
1947                                 break restart;      // v apparently stalled
1948                         }
1949                         else {                      // empty -- try to descend
1950                             ForkJoinTask<?> next = v.currentJoin;
1951                             if (subtask.status < 0 || j.currentJoin != subtask ||
1952                                 v.currentSteal != subtask)
1953                                 continue restart;   // stale
1954                             else if (next == null || ++steps == MAX_HELP)
1955                                 break restart;      // dead-end or maybe cyclic
1956                             else {
1957                                 subtask = next;
1958                                 j = v;
1959                                 break;
1960                             }
1961                         }
1962                     }
1963                 }
1964             }
1965         }
1966         return stat;
1967     }
1968 
1969     /**
1970      * Analog of tryHelpStealer for CountedCompleters. Tries to steal
1971      * and run tasks within the target's computation.
1972      *
1973      * @param task the task to join
1974      * @param mode if shared, exit upon completing any task
1975      * when all workers are active
1976      */
1978     private int helpComplete(ForkJoinTask<?> task, int mode) {
1979         WorkQueue[] ws; WorkQueue q; int m, n, s, u;
1980         if (task != null && (ws = workQueues) != null &&
1981             (m = ws.length - 1) >= 0) {
1982             for (int j = 1, origin = j;;) {
1983                 if ((s = task.status) < 0)
1984                     return s;
1985                 if ((q = ws[j & m]) != null && q.pollAndExecCC(task)) {
1986                     origin = j;
1987                     if (mode == SHARED_QUEUE &&
1988                         ((u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0))
1989                         break;
1990                 }
1991                 else if ((j = (j + 2) & m) == origin)
1992                     break;
1993             }
1994         }
1995         return 0;
1996     }
1997 
1998     /**
1999      * Tries to decrement active count (sometimes implicitly) and
2000      * possibly release or create a compensating worker in preparation
2001      * for blocking. Fails on contention or termination. Otherwise,
2002      * adds a new thread if no idle workers are available and pool
2003      * may become starved.
2004      */
2005     final boolean tryCompensate() {
2006         int pc = config & SMASK, e, i, tc; long c;
2007         WorkQueue[] ws; WorkQueue w; Thread p;
2008         if ((ws = workQueues) != null && (e = (int)(c = ctl)) >= 0) {
2009             if (e != 0 && (i = e & SMASK) < ws.length &&
2010                 (w = ws[i]) != null && w.eventCount == (e | INT_SIGN)) {
2011                 long nc = ((long)(w.nextWait & E_MASK) |
2012                            (c & (AC_MASK|TC_MASK)));
2013                 if (U.compareAndSwapLong(this, CTL, c, nc)) {
2014                     w.eventCount = (e + E_SEQ) & E_MASK;
2015                     if ((p = w.parker) != null)
2016                         U.unpark(p);
2017                     return true;   // replace with idle worker
2018                 }
2019             }
2020             else if ((tc = (short)(c >>> TC_SHIFT)) >= 0 &&
2021                      (int)(c >> AC_SHIFT) + pc > 1) {
2022                 long nc = ((c - AC_UNIT) & AC_MASK) | (c & ~AC_MASK);
2023                 if (U.compareAndSwapLong(this, CTL, c, nc))
2024                     return true;   // no compensation
2025             }
2026             else if (tc + pc < MAX_CAP) {
2027                 long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
2028                 if (U.compareAndSwapLong(this, CTL, c, nc)) {
2029                     ForkJoinWorkerThreadFactory fac;
2030                     Throwable ex = null;
2031                     ForkJoinWorkerThread wt = null;
2032                     try {
2033                         if ((fac = factory) != null &&
2034                             (wt = fac.newThread(this)) != null) {
2035                             wt.start();
2036                             return true;
2037                         }
2038                     } catch (Throwable rex) {
2039                         ex = rex;
2040                     }
2041                     deregisterWorker(wt, ex); // clean up and return false
2042                 }
2043             }
2044         }
2045         return false;
2046     }
2047 
2048     /**
2049      * Helps and/or blocks until the given task is done.
2050      *
2051      * @param joiner the joining worker
2052      * @param task the task
2053      * @return task status on exit
2054      */
2055     final int awaitJoin(WorkQueue joiner, ForkJoinTask<?> task) {
2056         int s = 0;
2057         if (joiner != null && task != null && (s = task.status) >= 0) {
2058             ForkJoinTask<?> prevJoin = joiner.currentJoin;
2059             joiner.currentJoin = task;
2060             do {} while ((s = task.status) >= 0 && !joiner.isEmpty() &&
2061                          joiner.tryRemoveAndExec(task)); // process local tasks
2062             if (s >= 0 && (s = task.status) >= 0) {
2063                 helpSignal(task, joiner.poolIndex);
2064                 if ((s = task.status) >= 0 &&
2065                     (task instanceof CountedCompleter))
2066                     s = helpComplete(task, LIFO_QUEUE);
2067             }
2068             while (s >= 0 && (s = task.status) >= 0) {
2069                 if ((!joiner.isEmpty() ||           // try helping
2070                      (s = tryHelpStealer(joiner, task)) == 0) &&
2071                     (s = task.status) >= 0) {
2072                     helpSignal(task, joiner.poolIndex);
2073                     if ((s = task.status) >= 0 && tryCompensate()) {
2074                         if (task.trySetSignal() && (s = task.status) >= 0) {
2075                             synchronized (task) {
2076                                 if (task.status >= 0) {
2077                                     try {                // see ForkJoinTask
2078                                         task.wait();     //  for explanation
2079                                     } catch (InterruptedException ie) {
2080                                     }
2081                                 }
2082                                 else
2083                                     task.notifyAll();
2084                             }
2085                         }
2086                         long c;                          // re-activate
2087                         do {} while (!U.compareAndSwapLong
2088                                      (this, CTL, c = ctl, c + AC_UNIT));
2089                     }
2090                 }
2091             }
2092             joiner.currentJoin = prevJoin;
2093         }
2094         return s;
2095     }
2096 
2097     /**
2098      * Stripped-down variant of awaitJoin used by timed joins. Tries
2099      * to help join only while there is continuous progress. (Caller
2100      * will then enter a timed wait.)
2101      *
2102      * @param joiner the joining worker
2103      * @param task the task
2104      */
2105     final void helpJoinOnce(WorkQueue joiner, ForkJoinTask<?> task) {
2106         int s;
2107         if (joiner != null && task != null && (s = task.status) >= 0) {
2108             ForkJoinTask<?> prevJoin = joiner.currentJoin;
2109             joiner.currentJoin = task;
2110             do {} while ((s = task.status) >= 0 && !joiner.isEmpty() &&
2111                          joiner.tryRemoveAndExec(task));
2112             if (s >= 0 && (s = task.status) >= 0) {
2113                 helpSignal(task, joiner.poolIndex);
2114                 if ((s = task.status) >= 0 &&
2115                     (task instanceof CountedCompleter))
2116                     s = helpComplete(task, LIFO_QUEUE);
2117             }
2118             if (s >= 0 && joiner.isEmpty()) {
2119                 do {} while (task.status >= 0 &&
2120                              tryHelpStealer(joiner, task) > 0);
2121             }
2122             joiner.currentJoin = prevJoin;
2123         }
2124     }
2125 
2126     /**
2127      * Returns a (probably) non-empty steal queue, if one is found
2128      * during a random, then cyclic scan, else null.  This method must
2129      * be retried by caller if, by the time it tries to use the queue,
2130      * it is empty.
2131      * @param r a (random) seed for scanning
2132      */
2133     private WorkQueue findNonEmptyStealQueue(int r) {
2134         for (WorkQueue[] ws;;) {
2135             int ps = plock, m, n;
2136             if ((ws = workQueues) == null || (m = ws.length - 1) < 1)
2137                 return null;
2138             for (int j = (m + 1) << 2; ;) {
2139                 WorkQueue q = ws[(((r + j) << 1) | 1) & m];
2140                 if (q != null && (n = q.base - q.top) < 0) {
2141                     if (n < -1)
2142                         signalWork(q);
2143                     return q;
2144                 }
2145                 else if (--j < 0) {
2146                     if (plock == ps)
2147                         return null;
2148                     break;
2149                 }
2150             }
2151         }
2152     }
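
         /*
          * Index note (informal): (((r + j) << 1) | 1) is always odd, so
          * this scan visits only the odd-numbered, worker-owned slots and
          * skips the even-numbered submission queues (cf. the "ignoring
          * submissions" comment in helpQuiescePool below).
          */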
2153 
2154     /**
2155      * Runs tasks until {@code isQuiescent()}. We piggyback on
2156      * active count ctl maintenance, but rather than blocking
2157      * when tasks cannot be found, we rescan until all others cannot
2158      * find tasks either.
2159      */
2160     final void helpQuiescePool(WorkQueue w) {
2161         for (boolean active = true;;) {
2162             ForkJoinTask<?> localTask; // exhaust local queue
2163             while ((localTask = w.nextLocalTask()) != null)
2164                 localTask.doExec();
2165             // Similar to loop in scan(), but ignoring submissions
2166             WorkQueue q = findNonEmptyStealQueue(w.nextSeed());
2167             if (q != null) {
2168                 ForkJoinTask<?> t; int b;
2169                 if (!active) {      // re-establish active count
2170                     long c;
2171                     active = true;
2172                     do {} while (!U.compareAndSwapLong
2173                                  (this, CTL, c = ctl, c + AC_UNIT));
2174                 }
2175                 if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null)
2176                     w.runSubtask(t);
2177             }
2178             else {
2179                 long c;
2180                 if (active) {       // decrement active count without queuing
2181                     active = false;
2182                     do {} while (!U.compareAndSwapLong
2183                                  (this, CTL, c = ctl, c -= AC_UNIT));
2184                 }
2185                 else
2186                     c = ctl;        // re-increment on exit
2187                 if ((int)(c >> AC_SHIFT) + (config & SMASK) == 0) {
2188                     do {} while (!U.compareAndSwapLong
2189                                  (this, CTL, c = ctl, c + AC_UNIT));
2190                     break;
2191                 }
2192             }
2193         }
2194     }
2195 
2196     /**
2197      * Gets and removes a local or stolen task for the given worker.
2198      *
2199      * @return a task, if available
2200      */
2201     final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
2202         for (ForkJoinTask<?> t;;) {
2203             WorkQueue q; int b;
2204             if ((t = w.nextLocalTask()) != null)
2205                 return t;
2206             if ((q = findNonEmptyStealQueue(w.nextSeed())) == null)
2207                 return null;
2208             if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null)
2209                 return t;
2210         }
2211     }
2212 
2213     /**
2214      * Returns a cheap heuristic guide for task partitioning when
2215      * programmers, frameworks, tools, or languages have little or no
2216      * idea about task granularity.  In essence by offering this
2217      * method, we ask users only about tradeoffs in overhead vs
2218      * expected throughput and its variance, rather than how finely to
2219      * partition tasks.
2220      *
2221      * In a steady state strict (tree-structured) computation, each
2222      * thread makes available for stealing enough tasks for other
2223      * threads to remain active. Inductively, if all threads play by
2224      * the same rules, each thread should make available only a
2225      * constant number of tasks.
2226      *
2227      * The minimum useful constant is just 1. But using a value of 1
2228      * would require immediate replenishment upon each steal to
2229      * maintain enough tasks, which is infeasible.  Further,
2230      * partitionings/granularities of offered tasks should minimize
2231      * steal rates, which in general means that threads nearer the top
2232      * of computation tree should generate more than those nearer the
2233      * bottom. In perfect steady state, each thread is at
2234      * approximately the same level of computation tree. However,
2235      * producing extra tasks amortizes the uncertainty of progress and
2236      * diffusion assumptions.
2237      *
2238      * So, users will want to use values larger, but not much larger
2239      * than 1 to both smooth over transient shortages and hedge
2240      * against uneven progress; as traded off against the cost of
2241      * extra task overhead. We leave the user to pick a threshold
2242      * value to compare with the results of this call to guide
2243      * decisions, but recommend values such as 3.
2244      *
2245      * When all threads are active, it is on average OK to estimate
2246      * surplus strictly locally. In steady-state, if one thread is
2247      * maintaining, say, 2 surplus tasks, then so are others. So we can
2248      * just use estimated queue length.  However, this strategy alone
2249      * leads to serious mis-estimates in some non-steady-state
2250      * conditions (ramp-up, ramp-down, other stalls). We can detect
2251      * many of these by further considering the number of "idle"
2252      * threads, which are known to have zero queued tasks, and
2253      * compensate by a factor of (#idle/#active) threads.
2254      *
2255      * Note: The approximation of #busy workers as #active workers is
2256      * not very good under the current signalling scheme, and should be
2257      * improved.
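          *
          * As an illustrative sketch (names other than
          * ForkJoinTask.getSurplusQueuedTaskCount, which publicly exposes
          * this estimate, are hypothetical), a recursive decomposition
          * might use the recommended threshold of 3 as follows:
          *  <pre> {@code
          * // inside a RecursiveAction's compute()
          * if (size > SEQUENTIAL_CUTOFF &&
          *     ForkJoinTask.getSurplusQueuedTaskCount() <= 3)
          *   forkSubtasks();        // keep a small surplus available to steal
          * else
          *   computeSequentially(); // enough surplus; avoid extra task overhead
          * }</pre>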
2258      */
2259     static int getSurplusQueuedTaskCount() {
2260         Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q;
2261         if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)) {
2262             int p = (pool = (wt = (ForkJoinWorkerThread)t).pool).config & SMASK;
2263             int n = (q = wt.workQueue).top - q.base;
2264             int a = (int)(pool.ctl >> AC_SHIFT) + p;
2265             return n - (a > (p >>>= 1) ? 0 :
2266                         a > (p >>>= 1) ? 1 :
2267                         a > (p >>>= 1) ? 2 :
2268                         a > (p >>>= 1) ? 4 :
2269                         8);
2270         }
2271         return 0;
2272     }
2273 
2274     //  Termination
2275 
2276     /**
2277      * Possibly initiates and/or completes termination.  The caller
2278      * triggering termination runs three passes through workQueues:
2279      * (0) Setting termination status, followed by wakeups of queued
2280      * workers; (1) cancelling all tasks; (2) interrupting lagging
2281      * threads (likely in external tasks, but possibly also blocked in
2282      * joins).  Each pass repeats previous steps because of potential
2283      * lagging thread creation.
2284      *
2285      * @param now if true, unconditionally terminate, else only
2286      * if no work and no active workers
2287      * @param enable if true, enable shutdown when next possible
2288      * @return true if now terminating or terminated
2289      */
2290     private boolean tryTerminate(boolean now, boolean enable) {
2291         if (this == commonPool)                     // cannot shut down
2292             return false;
2293         for (long c;;) {
2294             if (((c = ctl) & STOP_BIT) != 0) {      // already terminating
2295                 if ((short)(c >>> TC_SHIFT) == -(config & SMASK)) {
2296                     synchronized (this) {
2297                         notifyAll();                // signal when 0 workers
2298                     }
2299                 }
2300                 return true;
2301             }
2302             if (plock >= 0) {                       // not yet enabled
2303                 int ps;
2304                 if (!enable)
2305                     return false;
2306                 if (((ps = plock) & PL_LOCK) != 0 ||
2307                     !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
2308                     ps = acquirePlock();
2309                 if (!U.compareAndSwapInt(this, PLOCK, ps, SHUTDOWN))
2310                     releasePlock(SHUTDOWN);
2311             }
2312             if (!now) {                             // check if idle & no tasks
2313                 if ((int)(c >> AC_SHIFT) != -(config & SMASK) ||
2314                     hasQueuedSubmissions())
2315                     return false;
2316                 // Check for unqueued inactive workers. One pass suffices.
2317                 WorkQueue[] ws = workQueues; WorkQueue w;
2318                 if (ws != null) {
2319                     for (int i = 1; i < ws.length; i += 2) {
2320                         if ((w = ws[i]) != null && w.eventCount >= 0)
2321                             return false;
2322                     }
2323                 }
2324             }
2325             if (U.compareAndSwapLong(this, CTL, c, c | STOP_BIT)) {
2326                 for (int pass = 0; pass < 3; ++pass) {
2327                     WorkQueue[] ws = workQueues;
2328                     if (ws != null) {
2329                         WorkQueue w; Thread wt;
2330                         int n = ws.length;
2331                         for (int i = 0; i < n; ++i) {
2332                             if ((w = ws[i]) != null) {
2333                                 w.qlock = -1;
2334                                 if (pass > 0) {
2335                                     w.cancelAll();
2336                                     if (pass > 1 && (wt = w.owner) != null) {
2337                                         if (!wt.isInterrupted()) {
2338                                             try {
2339                                                 wt.interrupt();
2340                                             } catch (SecurityException ignore) {
2341                                             }
2342                                         }
2343                                         U.unpark(wt);
2344                                     }
2345                                 }
2346                             }
2347                         }
2348                         // Wake up workers parked on event queue
2349                         int i, e; long cc; Thread p;
2350                         while ((e = (int)(cc = ctl) & E_MASK) != 0 &&
2351                                (i = e & SMASK) < n &&
2352                                (w = ws[i]) != null) {
2353                             long nc = ((long)(w.nextWait & E_MASK) |
2354                                        ((cc + AC_UNIT) & AC_MASK) |
2355                                        (cc & (TC_MASK|STOP_BIT)));
2356                             if (w.eventCount == (e | INT_SIGN) &&
2357                                 U.compareAndSwapLong(this, CTL, cc, nc)) {
2358                                 w.eventCount = (e + E_SEQ) & E_MASK;
2359                                 w.qlock = -1;
2360                                 if ((p = w.parker) != null)
2361                                     U.unpark(p);
2362                             }
2363                         }
2364                     }
2365                 }
2366             }
2367         }
2368     }
2369 
2370     // external operations on common pool
2371 
2372     /**
2373      * Returns the common pool queue for a thread that has submitted at
2374      * least one task.
2375      */
2376     static WorkQueue commonSubmitterQueue() {
2377         ForkJoinPool p; WorkQueue[] ws; int m; Submitter z;
2378         return ((z = submitters.get()) != null &&
2379                 (p = commonPool) != null &&
2380                 (ws = p.workQueues) != null &&
2381                 (m = ws.length - 1) >= 0) ?
2382             ws[m & z.seed & SQMASK] : null;
2383     }
2384 
2385     /**
2386      * Tries to pop the given task from the submitter's queue in the common pool.
2387      */
2388     static boolean tryExternalUnpush(ForkJoinTask<?> t) {
2389         ForkJoinPool p; WorkQueue[] ws; WorkQueue q; Submitter z;
2390         ForkJoinTask<?>[] a;  int m, s;
2391         if (t != null &&
2392             (z = submitters.get()) != null &&
2393             (p = commonPool) != null &&
2394             (ws = p.workQueues) != null &&
2395             (m = ws.length - 1) >= 0 &&
2396             (q = ws[m & z.seed & SQMASK]) != null &&
2397             (s = q.top) != q.base &&
2398             (a = q.array) != null) {
2399             long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
2400             if (U.getObject(a, j) == t &&
2401                 U.compareAndSwapInt(q, QLOCK, 0, 1)) {
2402                 if (q.array == a && q.top == s && // recheck
2403                     U.compareAndSwapObject(a, j, t, null)) {
2404                     q.top = s - 1;
2405                     q.qlock = 0;
2406                     return true;
2407                 }
2408                 q.qlock = 0;
2409             }
2410         }
2411         return false;
2412     }
2413 
2414     /**
2415      * Tries to pop and run local tasks within the same computation
2416      * as the given root. On failure, tries to help complete from
2417      * other queues via helpComplete.
2418      */
2419     private void externalHelpComplete(WorkQueue q, ForkJoinTask<?> root) {
2420         ForkJoinTask<?>[] a; int m;
2421         if (q != null && (a = q.array) != null && (m = (a.length - 1)) >= 0 &&
2422             root != null && root.status >= 0) {
2423             for (;;) {
2424                 int s, u; Object o; CountedCompleter<?> task = null;
2425                 if ((s = q.top) - q.base > 0) {
2426                     long j = ((m & (s - 1)) << ASHIFT) + ABASE;
2427                     if ((o = U.getObject(a, j)) != null &&
2428                         (o instanceof CountedCompleter)) {
2429                         CountedCompleter<?> t = (CountedCompleter<?>)o, r = t;
2430                         do {
2431                             if (r == root) {
2432                                 if (U.compareAndSwapInt(q, QLOCK, 0, 1)) {
2433                                     if (q.array == a && q.top == s &&
2434                                         U.compareAndSwapObject(a, j, t, null)) {
2435                                         q.top = s - 1;
2436                                         task = t;
2437                                     }
2438                                     q.qlock = 0;
2439                                 }
2440                                 break;
2441                             }
2442                         } while ((r = r.completer) != null);
2443                     }
2444                 }
2445                 if (task != null)
2446                     task.doExec();
2447                 if (root.status < 0 ||
2448                     (u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0)
2449                     break;
2450                 if (task == null) {
2451                     helpSignal(root, q.poolIndex);
2452                     if (root.status >= 0)
2453                         helpComplete(root, SHARED_QUEUE);
2454                     break;
2455                 }
2456             }
2457         }
2458     }
2459 
2460     /**
2461      * Tries to help execute or signal availability of the given task
2462      * from the submitter's queue in the common pool.
2463      */
2464     static void externalHelpJoin(ForkJoinTask<?> t) {
2465         // Some hard-to-avoid overlap with tryExternalUnpush
2466         ForkJoinPool p; WorkQueue[] ws; WorkQueue q, w; Submitter z;
2467         ForkJoinTask<?>[] a;  int m, s, n;
2468         if (t != null &&
2469             (z = submitters.get()) != null &&
2470             (p = commonPool) != null &&
2471             (ws = p.workQueues) != null &&
2472             (m = ws.length - 1) >= 0 &&
2473             (q = ws[m & z.seed & SQMASK]) != null &&
2474             (a = q.array) != null) {
2475             int am = a.length - 1;
2476             if ((s = q.top) != q.base) {
2477                 long j = ((am & (s - 1)) << ASHIFT) + ABASE;
2478                 if (U.getObject(a, j) == t &&
2479                     U.compareAndSwapInt(q, QLOCK, 0, 1)) {
2480                     if (q.array == a && q.top == s &&
2481                         U.compareAndSwapObject(a, j, t, null)) {
2482                         q.top = s - 1;
2483                         q.qlock = 0;
2484                         t.doExec();
2485                     }
2486                     else
2487                         q.qlock = 0;
2488                 }
2489             }
2490             if (t.status >= 0) {
2491                 if (t instanceof CountedCompleter)
2492                     p.externalHelpComplete(q, t);
2493                 else
2494                     p.helpSignal(t, q.poolIndex);
2495             }
2496         }
2497     }
2498 
2499     /**
2500      * Restricted version of helpQuiescePool for external callers.
2501      */
2502     static void externalHelpQuiescePool() {
2503         ForkJoinPool p; ForkJoinTask<?> t; WorkQueue q; int b;
2504         if ((p = commonPool) != null &&
2505             (q = p.findNonEmptyStealQueue(1)) != null &&
2506             (b = q.base) - q.top < 0 &&
2507             (t = q.pollAt(b)) != null)
2508             t.doExec();
2509     }
2510 
2511     // Exported methods
2512 
2513     // Constructors
2514 
2515     /**
2516      * Creates a {@code ForkJoinPool} with parallelism equal to {@link
2517      * java.lang.Runtime#availableProcessors}, using the {@linkplain
2518      * #defaultForkJoinWorkerThreadFactory default thread factory},
2519      * no UncaughtExceptionHandler, and non-async LIFO processing mode.
2520      *
2521      * @throws SecurityException if a security manager exists and
2522      *         the caller is not permitted to modify threads
2523      *         because it does not hold {@link
2524      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2525      */
2526     public ForkJoinPool() {
2527         this(Runtime.getRuntime().availableProcessors(),
2528              defaultForkJoinWorkerThreadFactory, null, false);
2529     }
2530 
2531     /**
2532      * Creates a {@code ForkJoinPool} with the indicated parallelism
2533      * level, the {@linkplain
2534      * #defaultForkJoinWorkerThreadFactory default thread factory},
2535      * no UncaughtExceptionHandler, and non-async LIFO processing mode.
2536      *
2537      * @param parallelism the parallelism level
2538      * @throws IllegalArgumentException if parallelism less than or
2539      *         equal to zero, or greater than implementation limit
2540      * @throws SecurityException if a security manager exists and
2541      *         the caller is not permitted to modify threads
2542      *         because it does not hold {@link
2543      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2544      */
2545     public ForkJoinPool(int parallelism) {
2546         this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
2547     }
2548 
2549     /**
2550      * Creates a {@code ForkJoinPool} with the given parameters.
2551      *
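          * <p>For example, one possible configuration for event-style
          * processing (a sketch, not a recommendation) is:
          *  <pre> {@code
          * ForkJoinPool asyncPool = new ForkJoinPool(
          *     Runtime.getRuntime().availableProcessors(),
          *     ForkJoinPool.defaultForkJoinWorkerThreadFactory,
          *     null,   // no UncaughtExceptionHandler
          *     true);  // asyncMode: local FIFO scheduling
          * }</pre>
          *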
2552      * @param parallelism the parallelism level. For default value,
2553      * use {@link java.lang.Runtime#availableProcessors}.
2554      * @param factory the factory for creating new threads. For default value,
2555      * use {@link #defaultForkJoinWorkerThreadFactory}.
2556      * @param handler the handler for internal worker threads that
2557      * terminate due to unrecoverable errors encountered while executing
2558      * tasks. For default value, use {@code null}.
2559      * @param asyncMode if true,
2560      * establishes local first-in-first-out scheduling mode for forked
2561      * tasks that are never joined. This mode may be more appropriate
2562      * than the default locally stack-based mode in applications in which
2563      * worker threads only process event-style asynchronous tasks.
2564      * For default value, use {@code false}.
2565      * @throws IllegalArgumentException if parallelism less than or
2566      *         equal to zero, or greater than implementation limit
2567      * @throws NullPointerException if the factory is null
2568      * @throws SecurityException if a security manager exists and
2569      *         the caller is not permitted to modify threads
2570      *         because it does not hold {@link
2571      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2572      */
2573     public ForkJoinPool(int parallelism,
2574                         ForkJoinWorkerThreadFactory factory,
2575                         Thread.UncaughtExceptionHandler handler,
2576                         boolean asyncMode) {
2577         checkPermission();
2578         if (factory == null)
2579             throw new NullPointerException();
2580         if (parallelism <= 0 || parallelism > MAX_CAP)
2581             throw new IllegalArgumentException();
2582         this.factory = factory;
2583         this.ueh = handler;
2584         this.config = parallelism | (asyncMode ? (FIFO_QUEUE << 16) : 0);
2585         long np = (long)(-parallelism); // offset ctl counts
2586         this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
2587         int pn = nextPoolId();
2588         StringBuilder sb = new StringBuilder("ForkJoinPool-");
2589         sb.append(Integer.toString(pn));
2590         sb.append("-worker-");
2591         this.workerNamePrefix = sb.toString();
2592     }
2593 
2594     /**
2595      * Constructor for common pool, suitable only for static initialization.
2596      * Basically the same as above, but uses the smallest possible initial footprint.
2597      */
2598     ForkJoinPool(int parallelism, long ctl,
2599                  ForkJoinWorkerThreadFactory factory,
2600                  Thread.UncaughtExceptionHandler handler) {
2601         this.config = parallelism;
2602         this.ctl = ctl;
2603         this.factory = factory;
2604         this.ueh = handler;
2605         this.workerNamePrefix = "ForkJoinPool.commonPool-worker-";
2606     }
2607 
2608     /**
2609      * Returns the common pool instance. This pool is statically
2610      * constructed; its run state is unaffected by attempts to
2611      * {@link #shutdown} or {@link #shutdownNow}.
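          *
          * <p>For example (a sketch; {@code Fib} stands for any
          * user-defined {@code RecursiveTask<Integer>} subclass):
          *  <pre> {@code
          * int result = ForkJoinPool.commonPool().invoke(new Fib(10));
          * }</pre>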
2612      *
2613      * @return the common pool instance
2614      * @since 1.8
2615      */
2616     public static ForkJoinPool commonPool() {
2617         // assert commonPool != null : "static init error";
2618         return commonPool;
2619     }
2620 
2621     // Execution methods
2622 
2623     /**
2624      * Performs the given task, returning its result upon completion.
2625      * If the computation encounters an unchecked Exception or Error,
2626      * it is rethrown as the outcome of this invocation.  Rethrown
2627      * exceptions behave in the same way as regular exceptions, but,
2628      * when possible, contain stack traces (as displayed for example
2629      * using {@code ex.printStackTrace()}) of both the current thread
2630      * and the thread actually encountering the exception;
2631      * minimally only the latter.
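          *
          * <p>For example (a sketch; {@code SortTask} stands for any
          * user-defined {@code RecursiveAction} subclass and {@code pool}
          * for this pool):
          *  <pre> {@code
          * pool.invoke(new SortTask(array, 0, array.length)); // blocks caller until done
          * }</pre>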
2632      *
2633      * @param task the task
2634      * @return the task's result
2635      * @throws NullPointerException if the task is null
2636      * @throws RejectedExecutionException if the task cannot be
2637      *         scheduled for execution
2638      */
2639     public <T> T invoke(ForkJoinTask<T> task) {
2640         if (task == null)
2641             throw new NullPointerException();
2642         externalPush(task);
2643         return task.join();
2644     }
2645 
2646     /**
2647      * Arranges for (asynchronous) execution of the given task.
2648      *
2649      * @param task the task
2650      * @throws NullPointerException if the task is null
2651      * @throws RejectedExecutionException if the task cannot be
2652      *         scheduled for execution
2653      */
2654     public void execute(ForkJoinTask<?> task) {
2655         if (task == null)
2656             throw new NullPointerException();
2657         externalPush(task);
2658     }
2659 
2660     // AbstractExecutorService methods
2661 
2662     /**
2663      * @throws NullPointerException if the task is null
2664      * @throws RejectedExecutionException if the task cannot be
2665      *         scheduled for execution
2666      */
2667     public void execute(Runnable task) {
2668         if (task == null)
2669             throw new NullPointerException();
2670         ForkJoinTask<?> job;
2671         if (task instanceof ForkJoinTask<?>) // avoid re-wrap
2672             job = (ForkJoinTask<?>) task;
2673         else
2674             job = new ForkJoinTask.AdaptedRunnableAction(task);
2675         externalPush(job);
2676     }
2677 
2678     /**
2679      * Submits a ForkJoinTask for execution.
2680      *
2681      * @param task the task to submit
2682      * @return the task
2683      * @throws NullPointerException if the task is null
2684      * @throws RejectedExecutionException if the task cannot be
2685      *         scheduled for execution
2686      */
2687     public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
2688         if (task == null)
2689             throw new NullPointerException();
2690         externalPush(task);
2691         return task;
2692     }
2693 
2694     /**
2695      * @throws NullPointerException if the task is null
2696      * @throws RejectedExecutionException if the task cannot be
2697      *         scheduled for execution
2698      */
2699     public <T> ForkJoinTask<T> submit(Callable<T> task) {
2700         ForkJoinTask<T> job = new ForkJoinTask.AdaptedCallable<T>(task);
2701         externalPush(job);
2702         return job;
2703     }
2704 
2705     /**
2706      * @throws NullPointerException if the task is null
2707      * @throws RejectedExecutionException if the task cannot be
2708      *         scheduled for execution
2709      */
2710     public <T> ForkJoinTask<T> submit(Runnable task, T result) {
2711         ForkJoinTask<T> job = new ForkJoinTask.AdaptedRunnable<T>(task, result);
2712         externalPush(job);
2713         return job;
2714     }
2715 
2716     /**
2717      * @throws NullPointerException if the task is null
2718      * @throws RejectedExecutionException if the task cannot be
2719      *         scheduled for execution
2720      */
2721     public ForkJoinTask<?> submit(Runnable task) {
2722         if (task == null)
2723             throw new NullPointerException();
2724         ForkJoinTask<?> job;
2725         if (task instanceof ForkJoinTask<?>) // avoid re-wrap
2726             job = (ForkJoinTask<?>) task;
2727         else
2728             job = new ForkJoinTask.AdaptedRunnableAction(task);
2729         externalPush(job);
2730         return job;
2731     }
2732 
2733     /**
2734      * @throws NullPointerException       {@inheritDoc}
2735      * @throws RejectedExecutionException {@inheritDoc}
2736      */
2737     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
2738         // In previous versions of this class, this method constructed
2739         // a task to run ForkJoinTask.invokeAll, but now external
2740         // invocation of multiple tasks is at least as efficient.
2741         List<ForkJoinTask<T>> fs = new ArrayList<ForkJoinTask<T>>(tasks.size());
2742         // Workaround needed because method wasn't declared with
2743         // wildcards in return type but should have been.
2744         @SuppressWarnings({"unchecked", "rawtypes"})
2745             List<Future<T>> futures = (List<Future<T>>) (List) fs;
2746 
2747         boolean done = false;
2748         try {
2749             for (Callable<T> t : tasks) {
2750                 ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t);
2751                 externalPush(f);
2752                 fs.add(f);
2753             }
2754             for (ForkJoinTask<T> f : fs)
2755                 f.quietlyJoin();
2756             done = true;
2757             return futures;
2758         } finally {
2759             if (!done)
2760                 for (ForkJoinTask<T> f : fs)
2761                     f.cancel(false);
2762         }
2763     }
2764 
2765     /**
2766      * Returns the factory used for constructing new workers.
2767      *
2768      * @return the factory used for constructing new workers
2769      */
2770     public ForkJoinWorkerThreadFactory getFactory() {
2771         return factory;
2772     }
2773 
2774     /**
2775      * Returns the handler for internal worker threads that terminate
2776      * due to unrecoverable errors encountered while executing tasks.
2777      *
2778      * @return the handler, or {@code null} if none
2779      */
2780     public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
2781         return ueh;
2782     }
2783 
2784     /**
2785      * Returns the targeted parallelism level of this pool.
2786      *
2787      * @return the targeted parallelism level of this pool
2788      */
2789     public int getParallelism() {
2790         return config & SMASK;
2791     }
2792 
2793     /**
2794      * Returns the targeted parallelism level of the common pool.
2795      *
2796      * @return the targeted parallelism level of the common pool
2797      * @since 1.8
2798      */
2799     public static int getCommonPoolParallelism() {
2800         return commonPoolParallelism;
2801     }
2802 
2803     /**
2804      * Returns the number of worker threads that have started but not
2805      * yet terminated.  The result returned by this method may differ
2806      * from {@link #getParallelism} when threads are created to
2807      * maintain parallelism when others are cooperatively blocked.
2808      *
2809      * @return the number of worker threads
2810      */
2811     public int getPoolSize() {
2812         return (config & SMASK) + (short)(ctl >>> TC_SHIFT);
2813     }
2814 
2815     /**
2816      * Returns {@code true} if this pool uses local first-in-first-out
2817      * scheduling mode for forked tasks that are never joined.
2818      *
2819      * @return {@code true} if this pool uses async mode
2820      */
2821     public boolean getAsyncMode() {
2822         return (config >>> 16) == FIFO_QUEUE;
2823     }
2824 
2825     /**
2826      * Returns an estimate of the number of worker threads that are
2827      * not blocked waiting to join tasks or for other managed
2828      * synchronization. This method may overestimate the
2829      * number of running threads.
2830      *
2831      * @return the number of worker threads
2832      */
2833     public int getRunningThreadCount() {
2834         int rc = 0;
2835         WorkQueue[] ws; WorkQueue w;
2836         if ((ws = workQueues) != null) {
2837             for (int i = 1; i < ws.length; i += 2) {
2838                 if ((w = ws[i]) != null && w.isApparentlyUnblocked())
2839                     ++rc;
2840             }
2841         }
2842         return rc;
2843     }
2844 
2845     /**
2846      * Returns an estimate of the number of threads that are currently
2847      * stealing or executing tasks. This method may overestimate the
2848      * number of active threads.
2849      *
2850      * @return the number of active threads
2851      */
2852     public int getActiveThreadCount() {
2853         int r = (config & SMASK) + (int)(ctl >> AC_SHIFT);
2854         return (r <= 0) ? 0 : r; // suppress momentarily negative values
2855     }
2856 
2857     /**
2858      * Returns {@code true} if all worker threads are currently idle.
2859      * An idle worker is one that cannot obtain a task to execute
2860      * because none are available to steal from other threads, and
2861      * there are no pending submissions to the pool. This method is
2862      * conservative; it might not return {@code true} immediately upon
2863      * idleness of all threads, but will eventually do so if
2864      * threads remain inactive.
2865      *
2866      * @return {@code true} if all threads are currently idle
2867      */
2868     public boolean isQuiescent() {
2869         return (int)(ctl >> AC_SHIFT) + (config & SMASK) == 0;
2870     }
2871 
2872     /**
2873      * Returns an estimate of the total number of tasks stolen from
2874      * one thread's work queue by another. The reported value
2875      * underestimates the actual total number of steals when the pool
2876      * is not quiescent. This value may be useful for monitoring and
2877      * tuning fork/join programs: in general, steal counts should be
2878      * high enough to keep threads busy, but low enough to avoid
2879      * overhead and contention across threads.
2880      *
2881      * @return the number of steals
2882      */
2883     public long getStealCount() {
2884         long count = stealCount;
2885         WorkQueue[] ws; WorkQueue w;
2886         if ((ws = workQueues) != null) {
2887             for (int i = 1; i < ws.length; i += 2) {
2888                 if ((w = ws[i]) != null)
2889                     count += w.nsteals;
2890             }
2891         }
2892         return count;
2893     }
2894 
2895     /**
2896      * Returns an estimate of the total number of tasks currently held
2897      * in queues by worker threads (but not including tasks submitted
2898      * to the pool that have not begun executing). This value is only
2899      * an approximation, obtained by iterating across all threads in
2900      * the pool. This method may be useful for tuning task
2901      * granularities.
2902      *
2903      * @return the number of queued tasks
2904      */
2905     public long getQueuedTaskCount() {
2906         long count = 0;
2907         WorkQueue[] ws; WorkQueue w;
2908         if ((ws = workQueues) != null) {
2909             for (int i = 1; i < ws.length; i += 2) {
2910                 if ((w = ws[i]) != null)
2911                     count += w.queueSize();
2912             }
2913         }
2914         return count;
2915     }
2916 
2917     /**
2918      * Returns an estimate of the number of tasks submitted to this
2919      * pool that have not yet begun executing.  This method may take
2920      * time proportional to the number of submissions.
2921      *
2922      * @return the number of queued submissions
2923      */
2924     public int getQueuedSubmissionCount() {
2925         int count = 0;
2926         WorkQueue[] ws; WorkQueue w;
2927         if ((ws = workQueues) != null) {
2928             for (int i = 0; i < ws.length; i += 2) {
2929                 if ((w = ws[i]) != null)
2930                     count += w.queueSize();
2931             }
2932         }
2933         return count;
2934     }
2935 
2936     /**
2937      * Returns {@code true} if there are any tasks submitted to this
2938      * pool that have not yet begun executing.
2939      *
2940      * @return {@code true} if there are any queued submissions
2941      */
2942     public boolean hasQueuedSubmissions() {
2943         WorkQueue[] ws; WorkQueue w;
2944         if ((ws = workQueues) != null) {
2945             for (int i = 0; i < ws.length; i += 2) {
2946                 if ((w = ws[i]) != null && !w.isEmpty())
2947                     return true;
2948             }
2949         }
2950         return false;
2951     }
2952 
2953     /**
2954      * Removes and returns the next unexecuted submission if one is
2955      * available.  This method may be useful in extensions to this
2956      * class that re-assign work in systems with multiple pools.
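          *
          * <p>One conceivable use within a subclass (a sketch;
          * {@code otherPool} is a hypothetical second pool):
          *  <pre> {@code
          * ForkJoinTask<?> t = pollSubmission();
          * if (t != null)
          *   otherPool.execute(t);   // re-assign the unexecuted submission
          * }</pre>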
2957      *
2958      * @return the next submission, or {@code null} if none
2959      */
2960     protected ForkJoinTask<?> pollSubmission() {
2961         WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t;
2962         if ((ws = workQueues) != null) {
2963             for (int i = 0; i < ws.length; i += 2) {
2964                 if ((w = ws[i]) != null && (t = w.poll()) != null)
2965                     return t;
2966             }
2967         }
2968         return null;
2969     }
2970 
2971     /**
2972      * Removes all available unexecuted submitted and forked tasks
2973      * from scheduling queues and adds them to the given collection,
2974      * without altering their execution status. These may include
2975      * artificially generated or wrapped tasks. This method is
2976      * designed to be invoked only when the pool is known to be
2977      * quiescent. Invocations at other times may not remove all
2978      * tasks. A failure encountered while attempting to add elements
2979      * to collection {@code c} may result in elements being in
2980      * neither, either, or both collections when the associated
2981      * exception is thrown.  The behavior of this operation is
2982      * undefined if the specified collection is modified while the
2983      * operation is in progress.
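          *
          * <p>For example, a subclass might drain into a list when the
          * pool is known to be quiescent (sketch):
          *  <pre> {@code
          * List<ForkJoinTask<?>> drained = new ArrayList<ForkJoinTask<?>>();
          * int n = drainTasksTo(drained);
          * }</pre>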
2984      *
2985      * @param c the collection to transfer elements into
2986      * @return the number of elements transferred
2987      */
2988     protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
2989         int count = 0;
2990         WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t;
2991         if ((ws = workQueues) != null) {
2992             for (int i = 0; i < ws.length; ++i) {
2993                 if ((w = ws[i]) != null) {
2994                     while ((t = w.poll()) != null) {
2995                         c.add(t);
2996                         ++count;
2997                     }
2998                 }
2999             }
3000         }
3001         return count;
3002     }
3003 
3004     /**
3005      * Returns a string identifying this pool, as well as its state,
3006      * including indications of run state, parallelism level, and
3007      * worker and task counts.
3008      *
3009      * @return a string identifying this pool, as well as its state
3010      */
3011     public String toString() {
3012         // Use a single pass through workQueues to collect counts
3013         long qt = 0L, qs = 0L; int rc = 0;
3014         long st = stealCount;
3015         long c = ctl;
3016         WorkQueue[] ws; WorkQueue w;
3017         if ((ws = workQueues) != null) {
3018             for (int i = 0; i < ws.length; ++i) {
3019                 if ((w = ws[i]) != null) {
3020                     int size = w.queueSize();
3021                     if ((i & 1) == 0)
3022                         qs += size;
3023                     else {
3024                         qt += size;
3025                         st += w.nsteals;
3026                         if (w.isApparentlyUnblocked())
3027                             ++rc;
3028                     }
3029                 }
3030             }
3031         }
3032         int pc = (config & SMASK);
3033         int tc = pc + (short)(c >>> TC_SHIFT);
3034         int ac = pc + (int)(c >> AC_SHIFT);
3035         if (ac < 0) // ignore transient negative
3036             ac = 0;
3037         String level;
3038         if ((c & STOP_BIT) != 0)
3039             level = (tc == 0) ? "Terminated" : "Terminating";
3040         else
3041             level = plock < 0 ? "Shutting down" : "Running";
3042         return super.toString() +
3043             "[" + level +
3044             ", parallelism = " + pc +
3045             ", size = " + tc +
3046             ", active = " + ac +
3047             ", running = " + rc +
3048             ", steals = " + st +
3049             ", tasks = " + qt +
3050             ", submissions = " + qs +
3051             "]";
3052     }
3053 
3054     /**
3055      * Possibly initiates an orderly shutdown in which previously
3056      * submitted tasks are executed, but no new tasks will be
3057      * accepted. Invocation has no effect on execution state if this
3058      * is the {@link #commonPool()}, and no additional effect if
3059      * already shut down.  Tasks that are in the process of being
3060      * submitted concurrently during the course of this method may or
3061      * may not be rejected.
3062      *
3063      * @throws SecurityException if a security manager exists and
3064      *         the caller is not permitted to modify threads
3065      *         because it does not hold {@link
3066      *         java.lang.RuntimePermission}{@code ("modifyThread")}
3067      */
3068     public void shutdown() {
3069         checkPermission();
3070         tryTerminate(false, true);
3071     }
3072 
3073     /**
3074      * Possibly attempts to cancel and/or stop all tasks, and reject
3075      * all subsequently submitted tasks.  Invocation has no effect on
3076      * execution state if this is the {@link #commonPool()}, and no
3077      * additional effect if already shut down. Otherwise, tasks that
3078      * are in the process of being submitted or executed concurrently
3079      * during the course of this method may or may not be
3080      * rejected. This method cancels both existing and unexecuted
3081      * tasks, in order to permit termination in the presence of task
3082      * dependencies. So the method always returns an empty list
3083      * (unlike the case for some other Executors).
3084      *
3085      * @return an empty list
3086      * @throws SecurityException if a security manager exists and
3087      *         the caller is not permitted to modify threads
3088      *         because it does not hold {@link
3089      *         java.lang.RuntimePermission}{@code ("modifyThread")}
3090      */
3091     public List<Runnable> shutdownNow() {
3092         checkPermission();
3093         tryTerminate(true, true);
3094         return Collections.emptyList();
3095     }
3096 
3097     /**
3098      * Returns {@code true} if all tasks have completed following shut down.
3099      *
3100      * @return {@code true} if all tasks have completed following shut down
3101      */
3102     public boolean isTerminated() {
3103         long c = ctl;
3104         return ((c & STOP_BIT) != 0L &&
3105                 (short)(c >>> TC_SHIFT) == -(config & SMASK));
3106     }
3107 
3108     /**
3109      * Returns {@code true} if the process of termination has
3110      * commenced but not yet completed.  This method may be useful for
3111      * debugging. A return of {@code true} reported a sufficient
3112      * period after shutdown may indicate that submitted tasks have
3113      * ignored or suppressed interruption, or are waiting for I/O,
3114      * causing this executor not to properly terminate. (See the
3115      * advisory notes for class {@link ForkJoinTask} stating that
3116      * tasks should not normally entail blocking operations.  But if
3117      * they do, they must abort them on interrupt.)
3118      *
3119      * @return {@code true} if terminating but not yet terminated
3120      */
3121     public boolean isTerminating() {
3122         long c = ctl;
3123         return ((c & STOP_BIT) != 0L &&
3124                 (short)(c >>> TC_SHIFT) != -(config & SMASK));
3125     }
3126 
3127     /**
3128      * Returns {@code true} if this pool has been shut down.
3129      *
3130      * @return {@code true} if this pool has been shut down
3131      */
3132     public boolean isShutdown() {
3133         return plock < 0;
3134     }
3135 
3136     /**
3137      * Blocks until all tasks have completed execution after a
3138      * shutdown request, or the timeout occurs, or the current thread
3139      * is interrupted, whichever happens first. Note that the {@link
3140      * #commonPool()} never terminates until program shutdown, so
3141      * this method will always time out.
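          *
          * <p>A typical shutdown sequence for a non-common pool might
          * look like this sketch:
          *  <pre> {@code
          * pool.shutdown();                            // disable new submissions
          * if (!pool.awaitTermination(60, TimeUnit.SECONDS))
          *   pool.shutdownNow();                       // cancel lagging tasks
          * }</pre>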
3142      *
3143      * @param timeout the maximum time to wait
3144      * @param unit the time unit of the timeout argument
3145      * @return {@code true} if this executor terminated and
3146      *         {@code false} if the timeout elapsed before termination
3147      * @throws InterruptedException if interrupted while waiting
3148      */
3149     public boolean awaitTermination(long timeout, TimeUnit unit)
3150         throws InterruptedException {
3151         long nanos = unit.toNanos(timeout);
3152         if (isTerminated())
3153             return true;
3154         long startTime = System.nanoTime();
3155         boolean terminated = false;
3156         synchronized (this) {
3157             for (long waitTime = nanos, millis = 0L;;) {
3158                 if ((terminated = isTerminated()) ||
3159                     waitTime <= 0L ||
3160                     (millis = unit.toMillis(waitTime)) <= 0L)
3161                     break;
3162                 wait(millis);
3163                 waitTime = nanos - (System.nanoTime() - startTime);
3164             }
3165         }
3166         return terminated;
3167     }
3168 
3169     /**
3170      * Interface for extending managed parallelism for tasks running
3171      * in {@link ForkJoinPool}s.
3172      *
3173      * <p>A {@code ManagedBlocker} provides two methods.  Method
3174      * {@code isReleasable} must return {@code true} if blocking is
3175      * not necessary. Method {@code block} blocks the current thread
3176      * if necessary (perhaps internally invoking {@code isReleasable}
3177      * before actually blocking). These actions are performed by any
3178      * thread invoking {@link ForkJoinPool#managedBlock}.  The
3179      * unusual methods in this API accommodate synchronizers that may,
3180      * but don't usually, block for long periods. Similarly, they
3181      * allow more efficient internal handling of cases in which
3182      * additional workers may be, but usually are not, needed to
3183      * ensure sufficient parallelism.  Toward this end,
3184      * implementations of method {@code isReleasable} must be amenable
3185      * to repeated invocation.
3186      *
3187      * <p>For example, here is a ManagedBlocker based on a
3188      * ReentrantLock:
3189      *  <pre> {@code
3190      * class ManagedLocker implements ManagedBlocker {
3191      *   final ReentrantLock lock;
3192      *   boolean hasLock = false;
3193      *   ManagedLocker(ReentrantLock lock) { this.lock = lock; }
3194      *   public boolean block() {
3195      *     if (!hasLock)
3196      *       lock.lock();
3197      *     return true;
3198      *   }
3199      *   public boolean isReleasable() {
3200      *     return hasLock || (hasLock = lock.tryLock());
3201      *   }
3202      * }}</pre>
3203      *
3204      * <p>Here is a class that possibly blocks waiting for an
3205      * item on a given queue:
3206      *  <pre> {@code
3207      * class QueueTaker<E> implements ManagedBlocker {
3208      *   final BlockingQueue<E> queue;
3209      *   volatile E item = null;
3210      *   QueueTaker(BlockingQueue<E> q) { this.queue = q; }
3211      *   public boolean block() throws InterruptedException {
3212      *     if (item == null)
3213      *       item = queue.take();
3214      *     return true;
3215      *   }
3216      *   public boolean isReleasable() {
3217      *     return item != null || (item = queue.poll()) != null;
3218      *   }
3219      *   public E getItem() { // call after pool.managedBlock completes
3220      *     return item;
3221      *   }
3222      * }}</pre>
3223      */
3224     public static interface ManagedBlocker {
3225         /**
3226          * Possibly blocks the current thread, for example waiting for
3227          * a lock or condition.
3228          *
3229          * @return {@code true} if no additional blocking is necessary
3230          * (i.e., if isReleasable would return true)
3231          * @throws InterruptedException if interrupted while waiting
3232          * (the method is not required to do so, but is allowed to)
3233          */
3234         boolean block() throws InterruptedException;
3235 
3236         /**
3237          * Returns {@code true} if blocking is unnecessary.
3238          */
3239         boolean isReleasable();
3240     }
3241 
3242     /**
3243      * Blocks in accord with the given blocker.  If the current thread
3244      * is a {@link ForkJoinWorkerThread}, this method possibly
3245      * arranges for a spare thread to be activated if necessary to
3246      * ensure sufficient parallelism while the current thread is blocked.
3247      *
3248      * <p>If the caller is not a {@link ForkJoinTask}, this method is
3249      * behaviorally equivalent to
3250      *  <pre> {@code
3251      * while (!blocker.isReleasable())
3252      *   if (blocker.block())
3253      *     return;
3254      * }</pre>
3255      *
3256      * If the caller is a {@code ForkJoinTask}, then the pool may
3257      * first be expanded to ensure parallelism, and later adjusted.
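          *
          * <p>For example, using the {@code QueueTaker} class sketched in
          * the {@link ManagedBlocker} documentation above (assuming
          * {@code queue} is a {@code BlockingQueue<String>}):
          *  <pre> {@code
          * QueueTaker<String> taker = new QueueTaker<String>(queue);
          * ForkJoinPool.managedBlock(taker);  // may activate a spare worker
          * String item = taker.getItem();
          * }</pre>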
3258      *
3259      * @param blocker the blocker
3260      * @throws InterruptedException if blocker.block did so
3261      */
3262     public static void managedBlock(ManagedBlocker blocker)
3263         throws InterruptedException {
3264         Thread t = Thread.currentThread();
3265         if (t instanceof ForkJoinWorkerThread) {
3266             ForkJoinPool p = ((ForkJoinWorkerThread)t).pool;
3267             while (!blocker.isReleasable()) { // variant of helpSignal
3268                 WorkQueue[] ws; WorkQueue q; int m, u;
3269                 if ((ws = p.workQueues) != null && (m = ws.length - 1) >= 0) {
3270                     for (int i = 0; i <= m; ++i) {
3271                         if (blocker.isReleasable())
3272                             return;
3273                         if ((q = ws[i]) != null && q.base - q.top < 0) {
3274                             p.signalWork(q);
3275                             if ((u = (int)(p.ctl >>> 32)) >= 0 ||
3276                                 (u >> UAC_SHIFT) >= 0)
3277                                 break;
3278                         }
3279                     }
3280                 }
3281                 if (p.tryCompensate()) {
3282                     try {
3283                         do {} while (!blocker.isReleasable() &&
3284                                      !blocker.block());
3285                     } finally {
3286                         p.incrementActiveCount();
3287                     }
3288                     break;
3289                 }
3290             }
3291         }
3292         else {
3293             do {} while (!blocker.isReleasable() &&
3294                          !blocker.block());
3295         }
3296     }
3297 
3298     // AbstractExecutorService overrides.  These rely on the undocumented
3299     // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
3300     // implement RunnableFuture.
3301 
3302     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
3303         return new ForkJoinTask.AdaptedRunnable<T>(runnable, value);
3304     }
3305 
3306     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
3307         return new ForkJoinTask.AdaptedCallable<T>(callable);
3308     }
3309 
3310     // Unsafe mechanics
3311     private static final sun.misc.Unsafe U;
3312     private static final long CTL;
3313     private static final long PARKBLOCKER;
3314     private static final int ABASE;
3315     private static final int ASHIFT;
3316     private static final long STEALCOUNT;
3317     private static final long PLOCK;
3318     private static final long INDEXSEED;
3319     private static final long QLOCK;
3320 
3321     static {
3322         int s; // initialize field offsets for CAS etc
3323         try {
3324             U = sun.misc.Unsafe.getUnsafe();
3325             Class<?> k = ForkJoinPool.class;
3326             CTL = U.objectFieldOffset
3327                 (k.getDeclaredField("ctl"));
3328             STEALCOUNT = U.objectFieldOffset
3329                 (k.getDeclaredField("stealCount"));
3330             PLOCK = U.objectFieldOffset
3331                 (k.getDeclaredField("plock"));
3332             INDEXSEED = U.objectFieldOffset
3333                 (k.getDeclaredField("indexSeed"));
3334             Class<?> tk = Thread.class;
3335             PARKBLOCKER = U.objectFieldOffset
3336                 (tk.getDeclaredField("parkBlocker"));
3337             Class<?> wk = WorkQueue.class;
3338             QLOCK = U.objectFieldOffset
3339                 (wk.getDeclaredField("qlock"));
3340             Class<?> ak = ForkJoinTask[].class;
3341             ABASE = U.arrayBaseOffset(ak);
3342             s = U.arrayIndexScale(ak);
3343             ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
3344         } catch (Exception e) {
3345             throw new Error(e);
3346         }
3347         if ((s & (s-1)) != 0)
3348             throw new Error("data type scale not a power of two");
3349 
3350         submitters = new ThreadLocal<Submitter>();
3351         ForkJoinWorkerThreadFactory fac = defaultForkJoinWorkerThreadFactory =
3352             new DefaultForkJoinWorkerThreadFactory();
3353         modifyThreadPermission = new RuntimePermission("modifyThread");
3354 
3355         /*
3356          * Establish common pool parameters.  For extra caution,
3357          * computations to set up common pool state are here; the
3358          * constructor just assigns these values to fields.
3359          */
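             // For example (illustrative values only), the common pool can
             // be tuned at JVM startup via system properties such as:
             //   -Djava.util.concurrent.ForkJoinPool.common.parallelism=4
             //   -Djava.util.concurrent.ForkJoinPool.common.threadFactory=com.example.MyFactory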
3360 
3361         int par = 0;
3362         Thread.UncaughtExceptionHandler handler = null;
3363         try {  // TBD: limit or report ignored exceptions?
3364             String pp = System.getProperty
3365                 ("java.util.concurrent.ForkJoinPool.common.parallelism");
3366             String hp = System.getProperty
3367                 ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
3368             String fp = System.getProperty
3369                 ("java.util.concurrent.ForkJoinPool.common.threadFactory");
3370             if (fp != null)
3371                 fac = ((ForkJoinWorkerThreadFactory)ClassLoader.
3372                        getSystemClassLoader().loadClass(fp).newInstance());
3373             if (hp != null)
3374                 handler = ((Thread.UncaughtExceptionHandler)ClassLoader.
3375                            getSystemClassLoader().loadClass(hp).newInstance());
3376             if (pp != null)
3377                 par = Integer.parseInt(pp);
3378         } catch (Exception ignore) {
3379         }
3380 
3381         if (par <= 0)
3382             par = Runtime.getRuntime().availableProcessors();
3383         if (par > MAX_CAP)
3384             par = MAX_CAP;
3385         commonPoolParallelism = par;
3386         long np = (long)(-par); // precompute initial ctl value
3387         long ct = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
3388 
3389         commonPool = new ForkJoinPool(par, ct, fac, handler);
3390     }
3391 
3392 }