1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.lang.Thread.UncaughtExceptionHandler;
  39 import java.lang.invoke.MethodHandles;
  40 import java.lang.invoke.VarHandle;
  41 import java.security.AccessController;
  42 import java.security.AccessControlContext;
  43 import java.security.Permission;
  44 import java.security.Permissions;
  45 import java.security.PrivilegedAction;
  46 import java.security.ProtectionDomain;
  47 import java.util.ArrayList;
  48 import java.util.Collection;
  49 import java.util.Collections;
  50 import java.util.List;
  51 import java.util.function.Predicate;
  52 import java.util.concurrent.locks.LockSupport;
  53 
  54 /**
  55  * An {@link ExecutorService} for running {@link ForkJoinTask}s.
  56  * A {@code ForkJoinPool} provides the entry point for submissions
  57  * from non-{@code ForkJoinTask} clients, as well as management and
  58  * monitoring operations.
  59  *
  60  * <p>A {@code ForkJoinPool} differs from other kinds of {@link
  61  * ExecutorService} mainly by virtue of employing
  62  * <em>work-stealing</em>: all threads in the pool attempt to find and
  63  * execute tasks submitted to the pool and/or created by other active
  64  * tasks (eventually blocking waiting for work if none exist). This
  65  * enables efficient processing when most tasks spawn other subtasks
  66  * (as do most {@code ForkJoinTask}s), as well as when many small
  67  * tasks are submitted to the pool from external clients.  Especially
  68  * when setting <em>asyncMode</em> to true in constructors, {@code
  69  * ForkJoinPool}s may also be appropriate for use with event-style
  70  * tasks that are never joined. All worker threads are initialized
  71  * with {@link Thread#isDaemon} set {@code true}.
  72  *
  73  * <p>A static {@link #commonPool()} is available and appropriate for
  74  * most applications. The common pool is used by any ForkJoinTask that
  75  * is not explicitly submitted to a specified pool. Using the common
  76  * pool normally reduces resource usage (its threads are slowly
  77  * reclaimed during periods of non-use, and reinstated upon subsequent
  78  * use).
  79  *
  80  * <p>For applications that require separate or custom pools, a {@code
  81  * ForkJoinPool} may be constructed with a given target parallelism
  82  * level; by default, equal to the number of available processors.
  83  * The pool attempts to maintain enough active (or available) threads
  84  * by dynamically adding, suspending, or resuming internal worker
  85  * threads, even if some tasks are stalled waiting to join others.
  86  * However, no such adjustments are guaranteed in the face of blocked
  87  * I/O or other unmanaged synchronization. The nested {@link
  88  * ManagedBlocker} interface enables extension of the kinds of
  89  * synchronization accommodated. The default policies may be
  90  * overridden using a constructor with parameters corresponding to
  91  * those documented in class {@link ThreadPoolExecutor}.
  92  *
  93  * <p>In addition to execution and lifecycle control methods, this
  94  * class provides status check methods (for example
  95  * {@link #getStealCount}) that are intended to aid in developing,
  96  * tuning, and monitoring fork/join applications. Also, method
  97  * {@link #toString} returns indications of pool state in a
  98  * convenient form for informal monitoring.
  99  *
 100  * <p>As is the case with other ExecutorServices, there are three
 101  * main task execution methods summarized in the following table.
 102  * These are designed to be used primarily by clients not already
 103  * engaged in fork/join computations in the current pool.  The main
 104  * forms of these methods accept instances of {@code ForkJoinTask},
 105  * but overloaded forms also allow mixed execution of plain {@code
 106  * Runnable}- or {@code Callable}- based activities as well.  However,
 107  * tasks that are already executing in a pool should normally instead
 108  * use the within-computation forms listed in the table unless using
 109  * async event-style tasks that are not usually joined, in which case
 110  * there is little difference among choice of methods.
 111  *
 112  * <table class="plain">
 113  * <caption>Summary of task execution methods</caption>
 114  *  <tr>
 115  *    <td></td>
 116  *    <td style="text-align:center"> <b>Call from non-fork/join clients</b></td>
 117  *    <td style="text-align:center"> <b>Call from within fork/join computations</b></td>
 118  *  </tr>
 119  *  <tr>
 120  *    <td> <b>Arrange async execution</b></td>
 121  *    <td> {@link #execute(ForkJoinTask)}</td>
 122  *    <td> {@link ForkJoinTask#fork}</td>
 123  *  </tr>
 124  *  <tr>
 125  *    <td> <b>Await and obtain result</b></td>
 126  *    <td> {@link #invoke(ForkJoinTask)}</td>
 127  *    <td> {@link ForkJoinTask#invoke}</td>
 128  *  </tr>
 129  *  <tr>
 130  *    <td> <b>Arrange exec and obtain Future</b></td>
 131  *    <td> {@link #submit(ForkJoinTask)}</td>
 132  *    <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
 133  *  </tr>
 134  * </table>
 135  *
 136  * <p>The parameters used to construct the common pool may be controlled by
 137  * setting the following {@linkplain System#getProperty system properties}:
 138  * <ul>
 139  * <li>{@code java.util.concurrent.ForkJoinPool.common.parallelism}
 140  * - the parallelism level, a non-negative integer
 141  * <li>{@code java.util.concurrent.ForkJoinPool.common.threadFactory}
 142  * - the class name of a {@link ForkJoinWorkerThreadFactory}.
 143  * The {@linkplain ClassLoader#getSystemClassLoader() system class loader}
 144  * is used to load this class.
 145  * <li>{@code java.util.concurrent.ForkJoinPool.common.exceptionHandler}
 146  * - the class name of a {@link UncaughtExceptionHandler}.
 147  * The {@linkplain ClassLoader#getSystemClassLoader() system class loader}
 148  * is used to load this class.
 149  * <li>{@code java.util.concurrent.ForkJoinPool.common.maximumSpares}
 150  * - the maximum number of allowed extra threads to maintain target
 151  * parallelism (default 256).
 152  * </ul>
 153  * If no thread factory is supplied via a system property, then the
 154  * common pool uses a factory that uses the system class loader as the
 155  * {@linkplain Thread#getContextClassLoader() thread context class loader}.
 156  * In addition, if a {@link SecurityManager} is present, then
 157  * the common pool uses a factory supplying threads that have no
 158  * {@link Permissions} enabled.
 159  *
 160  * <p>Upon any error in establishing these settings, default parameters
 161  * are used. It is possible to disable or limit the use of threads in
 162  * the common pool by setting the parallelism property to zero, and/or
 163  * using a factory that may return {@code null}. However, doing so may
 164  * cause unjoined tasks to never be executed.
 165  *
 166  * <p><b>Implementation notes</b>: This implementation restricts the
 167  * maximum number of running threads to 32767. Attempts to create
 168  * pools with greater than the maximum number result in
 169  * {@code IllegalArgumentException}.
 170  *
 171  * <p>This implementation rejects submitted tasks (that is, by throwing
 172  * {@link RejectedExecutionException}) only when the pool is shut down
 173  * or internal resources have been exhausted.
 174  *
 175  * @since 1.7
 176  * @author Doug Lea
 177  */
 178 public class ForkJoinPool extends AbstractExecutorService {
 179 
 180     /*
 181      * Implementation Overview
 182      *
 183      * This class and its nested classes provide the main
 184      * functionality and control for a set of worker threads:
 185      * Submissions from non-FJ threads enter into submission queues.
 186      * Workers take these tasks and typically split them into subtasks
 187      * that may be stolen by other workers.  Preference rules give
 188      * first priority to processing tasks from their own queues (LIFO
 189      * or FIFO, depending on mode), then to randomized FIFO steals of
 190      * tasks in other queues.  This framework began as a vehicle for
 191      * supporting tree-structured parallelism using work-stealing.
 192      * Over time, its scalability advantages led to extensions and
 193      * changes to better support more diverse usage contexts.  Because
 194      * most internal methods and nested classes are interrelated,
 195      * their main rationale and descriptions are presented here;
 196      * individual methods and nested classes contain only brief
 197      * comments about details.
 198      *
 199      * WorkQueues
 200      * ==========
 201      *
 202      * Most operations occur within work-stealing queues (in nested
 203      * class WorkQueue).  These are special forms of Deques that
 204      * support only three of the four possible end-operations -- push,
 205      * pop, and poll (aka steal), under the further constraints that
 206      * push and pop are called only from the owning thread (or, as
 207      * extended here, under a lock), while poll may be called from
 208      * other threads.  (If you are unfamiliar with them, you probably
 209      * want to read Herlihy and Shavit's book "The Art of
 210      * Multiprocessor Programming", chapter 16 describing these in
 211      * more detail before proceeding.)  The main work-stealing queue
 212      * design is roughly similar to those in the papers "Dynamic
 213      * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
 214      * (http://research.sun.com/scalable/pubs/index.html) and
 215      * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
 216      * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
 217      * The main differences ultimately stem from GC requirements that
 218      * we null out taken slots as soon as we can, to maintain as small
 219      * a footprint as possible even in programs generating huge
 220      * numbers of tasks. To accomplish this, we shift the CAS
 221      * arbitrating pop vs poll (steal) from being on the indices
 222      * ("base" and "top") to the slots themselves.
 223      *
 224      * Adding tasks then takes the form of a classic array push(task)
 225      * in a circular buffer:
 226      *    q.array[q.top++ % length] = task;
 227      *
 228      * (The actual code needs to null-check and size-check the array,
 229      * uses masking, not mod, for indexing a power-of-two-sized array,
 230      * properly fences accesses, and possibly signals waiting workers
 231      * to start scanning -- see below.)  Both a successful pop and
 232      * poll mainly entail a CAS of a slot from non-null to null.
 233      *
 234      * The pop operation (always performed by owner) is:
 235      *   if ((the task at top slot is not null) and
 236      *        (CAS slot to null))
 237      *           decrement top and return task;
 238      *
 239      * And the poll operation (usually by a stealer) is
 240      *    if ((the task at base slot is not null) and
 241      *        (CAS slot to null))
 242      *           increment base and return task;
 243      *
 244      * There are several variants of each of these. In particular,
 245      * almost all uses of poll occur within scan operations that also
 246      * interleave contention tracking (with associated code sprawl.)
 247      *
 248      * Memory ordering.  See "Correct and Efficient Work-Stealing for
 249      * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
 250      * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
 251      * analysis of memory ordering requirements in work-stealing
 252      * algorithms similar to (but different than) the one used here.
 253      * Extracting tasks in array slots via (fully fenced) CAS provides
 254      * primary synchronization. The base and top indices imprecisely
 255      * guide where to extract from. We do not always require strict
 256      * orderings of array and index updates, so sometimes let them be
 257      * subject to compiler and processor reorderings. However, the
 258      * volatile "base" index also serves as a basis for memory
 259      * ordering: Slot accesses are preceded by a read of base,
 260      * ensuring happens-before ordering with respect to stealers (so
 261      * the slots themselves can be read via plain array reads.)  The
 262      * only other memory orderings relied on are maintained in the
 263      * course of signalling and activation (see below).  A check that
 264      * base == top indicates (momentary) emptiness, but otherwise may
 265      * err on the side of possibly making the queue appear nonempty
 266      * when a push, pop, or poll has not fully committed, or making
 267      * it appear empty when an update of top has not yet been visibly
 268      * written.  (Method isEmpty() checks the case of a partially
 269      * completed removal of the last element.)  Because of this, the
 270      * poll operation, considered individually, is not wait-free. One
 271      * thief cannot successfully continue until another in-progress
 272      * one (or, if previously empty, a push) visibly completes.
 273      * However, in the aggregate, we ensure at least probabilistic
 274      * non-blockingness.  If an attempted steal fails, a scanning
 275      * thief chooses a different random victim target to try next. So,
 276      * in order for one thief to progress, it suffices for any
 277      * in-progress poll or new push on any empty queue to
 278      * complete.
 279      *
 280      * This approach also enables support of a user mode in which
 281      * local task processing is in FIFO, not LIFO order, simply by
 282      * using poll rather than pop.  This can be useful in
 283      * message-passing frameworks in which tasks are never joined.
 284      *
 285      * WorkQueues are also used in a similar way for tasks submitted
 286      * to the pool. We cannot mix these tasks in the same queues used
 287      * by workers. Instead, we randomly associate submission queues
 288      * with submitting threads, using a form of hashing.  The
 289      * ThreadLocalRandom probe value serves as a hash code for
 290      * choosing existing queues, and may be randomly repositioned upon
 291      * contention with other submitters.  In essence, submitters act
 292      * like workers except that they are restricted to executing local
 293      * tasks that they submitted.  Insertion of tasks in shared mode
 294      * requires a lock but we use only a simple spinlock (using field
 295      * phase), because submitters encountering a busy queue move to a
 296      * different position to use or create other queues -- they block
 297      * only when creating and registering new queues. Because it is
 298      * used only as a spinlock, unlocking requires only a "releasing"
 299      * store (using setRelease).
 300      *
 301      * Management
 302      * ==========
 303      *
 304      * The main throughput advantages of work-stealing stem from
 305      * decentralized control -- workers mostly take tasks from
 306      * themselves or each other, at rates that can exceed a billion
 307      * per second.  The pool itself creates, activates (enables
 308      * scanning for and running tasks), deactivates, blocks, and
 309      * terminates threads, all with minimal central information.
 310      * There are only a few properties that we can globally track or
 311      * maintain, so we pack them into a small number of variables,
 312      * often maintaining atomicity without blocking or locking.
 313      * Nearly all essentially atomic control state is held in a few
 314      * volatile variables that are by far most often read (not
 315      * written) as status and consistency checks. We pack as much
 316      * information into them as we can.
 317      *
 318      * Field "ctl" contains 64 bits holding information needed to
 319      * atomically decide to add, enqueue (on an event queue), and
 320      * dequeue (and release)-activate workers.  To enable this
 321      * packing, we restrict maximum parallelism to (1<<15)-1 (which is
 322      * far in excess of normal operating range) to allow ids, counts,
 323      * and their negations (used for thresholding) to fit into 16bit
 324      * subfields.
 325      *
 326      * Field "mode" holds configuration parameters as well as lifetime
 327      * status, atomically and monotonically setting SHUTDOWN, STOP,
 328      * and finally TERMINATED bits.
 329      *
 330      * Field "workQueues" holds references to WorkQueues.  It is
 331      * updated (only during worker creation and termination) under
 332      * lock (using field workerNamePrefix as lock), but is otherwise
 333      * concurrently readable, and accessed directly. We also ensure
 334      * that uses of the array reference itself never become too stale
 335      * in case of resizing.  To simplify index-based operations, the
 336      * array size is always a power of two, and all readers must
 337      * tolerate null slots. Worker queues are at odd indices. Shared
 338      * (submission) queues are at even indices, up to a maximum of 64
 339      * slots, to limit growth even if array needs to expand to add
 340      * more workers. Grouping them together in this way simplifies and
 341      * speeds up task scanning.
 342      *
 343      * All worker thread creation is on-demand, triggered by task
 344      * submissions, replacement of terminated workers, and/or
 345      * compensation for blocked workers. However, all other support
 346      * code is set up to work with other policies.  To ensure that we
 347      * do not hold on to worker references that would prevent GC, all
 348      * accesses to workQueues are via indices into the workQueues
 349      * array (which is one source of some of the messy code
 350      * constructions here). In essence, the workQueues array serves as
 351      * a weak reference mechanism. Thus for example the stack top
 352      * subfield of ctl stores indices, not references.
 353      *
 354      * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we
 355      * cannot let workers spin indefinitely scanning for tasks when
 356      * none can be found immediately, and we cannot start/resume
 357      * workers unless there appear to be tasks available.  On the
 358      * other hand, we must quickly prod them into action when new
 359      * tasks are submitted or generated. In many usages, ramp-up time
 360      * is the main limiting factor in overall performance, which is
 361      * compounded at program start-up by JIT compilation and
 362      * allocation. So we streamline this as much as possible.
 363      *
 364      * The "ctl" field atomically maintains total worker and
 365      * "released" worker counts, plus the head of the available worker
 366      * queue (actually stack, represented by the lower 32bit subfield
 367      * of ctl).  Released workers are those known to be scanning for
 368      * and/or running tasks. Unreleased ("available") workers are
 369      * recorded in the ctl stack. These workers are made available for
 370      * signalling by enqueuing in ctl (see method runWorker).  The
 371      * "queue" is a form of Treiber stack. This is ideal for
 372      * activating threads in most-recently used order, and improves
 373      * performance and locality, outweighing the disadvantages of
 374      * being prone to contention and inability to release a worker
 375      * unless it is topmost on stack.  To avoid missed signal problems
 376      * inherent in any wait/signal design, available workers rescan
 377      * for (and if found run) tasks after enqueuing.  Normally their
 378      * release status will be updated while doing so, but the released
 379      * worker ctl count may underestimate the number of active
 380      * threads. (However, it is still possible to determine quiescence
 381      * via a validation traversal -- see isQuiescent).  After an
 382      * unsuccessful rescan, available workers are blocked until
 383      * signalled (see signalWork).  The top stack state holds the
 384      * value of the "phase" field of the worker: its index and status,
 385      * plus a version counter that, in addition to the count subfields
 386      * (also serving as version stamps) provide protection against
 387      * Treiber stack ABA effects.
 388      *
 389      * Creating workers. To create a worker, we pre-increment counts
 390      * (serving as a reservation), and attempt to construct a
 391      * ForkJoinWorkerThread via its factory. Upon construction, the
 392      * new thread invokes registerWorker, where it constructs a
 393      * WorkQueue and is assigned an index in the workQueues array
 394      * (expanding the array if necessary). The thread is then started.
 395      * Upon any exception across these steps, or null return from
 396      * factory, deregisterWorker adjusts counts and records
 397      * accordingly.  If a null return, the pool continues running with
 398      * fewer than the target number of workers. If exceptional, the
 399      * exception is propagated, generally to some external caller.
 400      * Worker index assignment avoids the bias in scanning that would
 401      * occur if entries were sequentially packed starting at the front
 402      * of the workQueues array. We treat the array as a simple
 403      * power-of-two hash table, expanding as needed. The seedIndex
 404      * increment ensures no collisions until a resize is needed or a
 405      * worker is deregistered and replaced, and thereafter keeps
 406      * probability of collision low. We cannot use
 407      * ThreadLocalRandom.getProbe() for similar purposes here because
 408      * the thread has not started yet, but do so for creating
 409      * submission queues for existing external threads (see
 410      * externalPush).
 411      *
 412      * WorkQueue field "phase" is used by both workers and the pool to
 413      * manage and track whether a worker is UNSIGNALLED (possibly
 414      * blocked waiting for a signal).  When a worker is enqueued its
 415      * phase field is set. Note that phase field updates lag queue CAS
 416      * releases so usage requires care -- seeing a negative phase does
 417      * not guarantee that the worker is available. When queued, the
 418      * lower 16 bits of its phase must hold its pool index. So we
 419      * place the index there upon initialization (see registerWorker)
 420      * and otherwise keep it there or restore it when necessary.
 421      *
 422      * The ctl field also serves as the basis for memory
 423      * synchronization surrounding activation. This uses a more
 424      * efficient version of a Dekker-like rule that task producers and
 425      * consumers sync with each other by both writing/CASing ctl (even
 426      * if to its current value).  This would be extremely costly. So
 427      * we relax it in several ways: (1) Producers only signal when
 428      * their queue is empty. Other workers propagate this signal (in
 429      * method scan) when they find tasks; to further reduce flailing,
 430      * each worker signals only one other per activation. (2) Workers
 431      * only enqueue after scanning (see below) and not finding any
 432      * tasks.  (3) Rather than CASing ctl to its current value in the
 433      * common case where no action is required, we reduce write
 434      * contention by equivalently prefacing signalWork when called by
 435      * an external task producer using a memory access with
 436      * full-volatile semantics or a "fullFence".
 437      *
 438      * Almost always, too many signals are issued. A task producer
 439      * cannot in general tell if some existing worker is in the midst
 440      * of finishing one task (or already scanning) and ready to take
 441      * another without being signalled. So the producer might instead
 442      * activate a different worker that does not find any work, and
 443      * then inactivates. This scarcely matters in steady-state
 444      * computations involving all workers, but can create contention
 445      * and bookkeeping bottlenecks during ramp-up, ramp-down, and small
 446      * computations involving only a few workers.
 447      *
 448      * Scanning. Method runWorker performs top-level scanning for
 449      * tasks.  Each scan traverses and tries to poll from each queue
 450      * starting at a random index and circularly stepping. Scans are
 451      * not performed in ideal random permutation order, to reduce
 452      * cacheline contention.  The pseudorandom generator need not have
 453      * high-quality statistical properties in the long term, but just
 454      * within computations; we use Marsaglia XorShifts (often via
 455      * ThreadLocalRandom.nextSecondarySeed), which are cheap and
 456      * suffice. Scanning also employs contention reduction: When
 457      * scanning workers fail to extract an apparently existing task,
 458      * they soon restart at a different pseudorandom index.  This
 459      * improves throughput when many threads are trying to take tasks
 460      * from few queues, which can be common in some usages.  Scans do
 461      * not otherwise explicitly take into account core affinities,
 462      * loads, cache localities, etc. However, they do exploit temporal
 463      * locality (which usually approximates these) by preferring to
 464      * re-poll (at most #workers times) from the same queue after a
 465      * successful poll before trying others.
 466      *
 467      * Trimming workers. To release resources after periods of lack of
 468      * use, a worker starting to wait when the pool is quiescent will
 469      * time out and terminate (see method scan) if the pool has
 470      * remained quiescent for the period given by field keepAlive.
 471      *
 472      * Shutdown and Termination. A call to shutdownNow invokes
 473      * tryTerminate to atomically set a runState bit. The calling
 474      * thread, as well as every other worker thereafter terminating,
 475      * helps terminate others by cancelling their unprocessed tasks,
 476      * and waking them up, doing so repeatedly until stable. Calls to
 477      * non-abrupt shutdown() preface this by checking whether
 478      * termination should commence by sweeping through queues (until
 479      * stable) to ensure lack of in-flight submissions and workers
 480      * about to process them before triggering the "STOP" phase of
 481      * termination.
 482      *
 483      * Joining Tasks
 484      * =============
 485      *
 486      * Any of several actions may be taken when one worker is waiting
 487      * to join a task stolen (or always held) by another.  Because we
 488      * are multiplexing many tasks on to a pool of workers, we can't
 489      * always just let them block (as in Thread.join).  We also cannot
 490      * just reassign the joiner's run-time stack with another and
 491      * replace it later, which would be a form of "continuation", that
 492      * even if possible is not necessarily a good idea since we may
 493      * need both an unblocked task and its continuation to progress.
 494      * Instead we combine two tactics:
 495      *
 496      *   Helping: Arranging for the joiner to execute some task that it
 497      *      would be running if the steal had not occurred.
 498      *
 499      *   Compensating: Unless there are already enough live threads,
 500      *      method tryCompensate() may create or re-activate a spare
 501      *      thread to compensate for blocked joiners until they unblock.
 502      *
 503      * A third form (implemented in tryRemoveAndExec) amounts to
 504      * helping a hypothetical compensator: If we can readily tell that
 505      * a possible action of a compensator is to steal and execute the
 506      * task being joined, the joining thread can do so directly,
 507      * without the need for a compensation thread.
 508      *
 509      * The ManagedBlocker extension API can't use helping so relies
 510      * only on compensation in method awaitBlocker.
 511      *
 512      * The algorithm in awaitJoin entails a form of "linear helping".
 513      * Each worker records (in field source) the id of the queue from
 514      * which it last stole a task.  The scan in method awaitJoin uses
 515      * these markers to try to find a worker to help (i.e., steal back
 516      * a task from and execute it) that could hasten completion of the
 517      * actively joined task.  Thus, the joiner executes a task that
 518      * would be on its own local deque if the to-be-joined task had
 519      * not been stolen. This is a conservative variant of the approach
 520      * described in Wagner & Calder "Leapfrogging: a portable
 521      * technique for implementing efficient futures" SIGPLAN Notices,
 522      * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
 523      * mainly in that we only record queue ids, not full dependency
 524      * links.  This requires a linear scan of the workQueues array to
 525      * locate stealers, but isolates cost to when it is needed, rather
 526      * than adding to per-task overhead. Searches can fail to locate
 527      * stealers when GC stalls and the like delay recording sources.
 528      * Further, even when accurately identified, stealers might not
 529      * ever produce a task that the joiner can in turn help with. So,
 530      * compensation is tried upon failure to find tasks to run.
 531      *
 532      * Compensation does not by default aim to keep exactly the target
 533      * parallelism number of unblocked threads running at any given
 534      * time. Some previous versions of this class employed immediate
 535      * compensations for any blocked join. However, in practice, the
 536      * vast majority of blockages are transient byproducts of GC and
 537      * other JVM or OS activities that are made worse by replacement.
 538      * Rather than impose arbitrary policies, we allow users to
 539      * override the default of only adding threads upon apparent
 540      * starvation.  The compensation mechanism may also be bounded.
 541      * Bounds for the commonPool (see COMMON_MAX_SPARES) better enable
 542      * JVMs to cope with programming errors and abuse before running
 543      * out of resources to do so.
 544      *
 545      * Common Pool
 546      * ===========
 547      *
 548      * The static common pool always exists after static
 549      * initialization.  Since it (or any other created pool) need
 550      * never be used, we minimize initial construction overhead and
 551      * footprint to the setup of about a dozen fields.
 552      *
 553      * When external threads submit to the common pool, they can
 554      * perform subtask processing (see externalHelpComplete and
 555      * related methods) upon joins.  This caller-helps policy makes it
 556      * sensible to set common pool parallelism level to one (or more)
 557      * less than the total number of available cores, or even zero for
 558      * pure caller-runs.  We do not need to record whether external
 559      * submissions are to the common pool -- if not, external help
 560      * methods return quickly. These submitters would otherwise be
 561      * blocked waiting for completion, so the extra effort (with
 562      * liberally sprinkled task status checks) in inapplicable cases
 563      * amounts to an odd form of limited spin-wait before blocking in
 564      * ForkJoinTask.join.
 565      *
 566      * As a more appropriate default in managed environments, unless
 567      * overridden by system properties, we use workers of subclass
 568      * InnocuousForkJoinWorkerThread when there is a SecurityManager
 569      * present. These workers have no permissions set, do not belong
 570      * to any user-defined ThreadGroup, and erase all ThreadLocals
 571      * after executing any top-level task (see
 572      * WorkQueue.afterTopLevelExec).  The associated mechanics (mainly
 573      * in ForkJoinWorkerThread) may be JVM-dependent and must access
 574      * particular Thread class fields to achieve this effect.
 575      *
 576      * Style notes
 577      * ===========
 578      *
 579      * Memory ordering relies mainly on VarHandles.  This can be
 580      * awkward and ugly, but also reflects the need to control
 581      * outcomes across the unusual cases that arise in very racy code
 582      * with very few invariants. All fields are read into locals
 583      * before use, and null-checked if they are references.  This is
 584      * usually done in a "C"-like style of listing declarations at the
 585      * heads of methods or blocks, and using inline assignments on
 586      * first encounter.  Nearly all explicit checks lead to
 587      * bypass/return, not exception throws, because they may
 588      * legitimately arise due to cancellation/revocation during
 589      * shutdown.
 590      *
 591      * There is a lot of representation-level coupling among classes
 592      * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask.  The
 593      * fields of WorkQueue maintain data structures managed by
 594      * ForkJoinPool, so are directly accessed.  There is little point
 595      * trying to reduce this, since any associated future changes in
 596      * representations will need to be accompanied by algorithmic
 597      * changes anyway. Several methods intrinsically sprawl because
 598      * they must accumulate sets of consistent reads of fields held in
 599      * local variables.  There are also other coding oddities
 600      * (including several unnecessary-looking hoisted null checks)
 601      * that help some methods perform reasonably even when interpreted
 602      * (not compiled).
 603      *
 604      * The order of declarations in this file is (with a few exceptions):
 605      * (1) Static utility functions
 606      * (2) Nested (static) classes
 607      * (3) Static fields
 608      * (4) Fields, along with constants used when unpacking some of them
 609      * (5) Internal control methods
 610      * (6) Callbacks and other support for ForkJoinTask methods
 611      * (7) Exported methods
 612      * (8) Static block initializing statics in minimally dependent order
 613      */
 614 
 615     // Static utilities
 616 
 617     /**
 618      * If there is a security manager, makes sure caller has
 619      * permission to modify threads.
 620      */
 621     private static void checkPermission() {
 622         SecurityManager security = System.getSecurityManager();
 623         if (security != null)
 624             security.checkPermission(modifyThreadPermission);
 625     }
 626 
 627     // Nested classes
 628 
 629     /**
 630      * Factory for creating new {@link ForkJoinWorkerThread}s.
 631      * A {@code ForkJoinWorkerThreadFactory} must be defined and used
 632      * for {@code ForkJoinWorkerThread} subclasses that extend base
 633      * functionality or initialize threads with different contexts.
 634      */
 635     public static interface ForkJoinWorkerThreadFactory {
 636         /**
 637          * Returns a new worker thread operating in the given pool.
 638          * Returning null or throwing an exception may result in tasks
 639          * never being executed.  If this method throws an exception,
 640          * it is relayed to the caller of the method (for example
 641          * {@code execute}) causing attempted thread creation. If this
 642          * method returns null or throws an exception, it is not
 643          * retried until the next attempted creation (for example
 644          * another call to {@code execute}).
 645          *
 646          * @param pool the pool this thread works in
 647          * @return the new worker thread, or {@code null} if the request
 648          *         to create a thread is rejected
 649          * @throws NullPointerException if the pool is null
 650          */
 651         public ForkJoinWorkerThread newThread(ForkJoinPool pool);
 652     }
 653 
 654     static AccessControlContext contextWithPermissions(Permission ... perms) {
 655         Permissions permissions = new Permissions();
 656         for (Permission perm : perms)
 657             permissions.add(perm);
 658         return new AccessControlContext(
 659             new ProtectionDomain[] { new ProtectionDomain(null, permissions) });
 660     }
 661 
 662     /**
 663      * Default ForkJoinWorkerThreadFactory implementation; creates a
 664      * new ForkJoinWorkerThread using the system class loader as the
 665      * thread context class loader.
 666      */
 667     private static final class DefaultForkJoinWorkerThreadFactory
 668         implements ForkJoinWorkerThreadFactory {
 669         private static final AccessControlContext ACC = contextWithPermissions(
 670             new RuntimePermission("getClassLoader"),
 671             new RuntimePermission("setContextClassLoader"));
 672 
 673         public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
 674             return AccessController.doPrivileged(
 675                 new PrivilegedAction<>() {
 676                     public ForkJoinWorkerThread run() {
 677                         return new ForkJoinWorkerThread(
 678                             pool, ClassLoader.getSystemClassLoader()); }},
 679                 ACC);
 680         }
 681     }
 682 
 683     // Constants shared across ForkJoinPool and WorkQueue
 684 
 685     // Bounds
 686     static final int SWIDTH       = 16;            // width of short
 687     static final int SMASK        = 0xffff;        // short bits == max index
 688     static final int MAX_CAP      = 0x7fff;        // max #workers - 1
 689     static final int SQMASK       = 0x007e;        // max 64 (even) slots
 690 
 691     // Masks and units for WorkQueue.phase and ctl sp subfield
 692     static final int UNSIGNALLED  = 1 << 31;       // must be negative
 693     static final int SS_SEQ       = 1 << 16;       // version count
 694     static final int QLOCK        = 1;             // must be 1
 695 
 696     // Mode bits and sentinels, some also used in WorkQueue id and.source fields
 697     static final int OWNED        = 1;             // queue has owner thread
 698     static final int FIFO         = 1 << 16;       // fifo queue or access mode
 699     static final int SHUTDOWN     = 1 << 18;
 700     static final int TERMINATED   = 1 << 19;
 701     static final int STOP         = 1 << 31;       // must be negative
 702     static final int QUIET        = 1 << 30;       // not scanning or working
 703     static final int DORMANT      = QUIET | UNSIGNALLED;
 704 
 705     /**
 706      * The maximum number of local polls from the same queue before
 707      * checking others. This is a safeguard against infinitely unfair
 708      * looping under unbounded user task recursion, and must be larger
 709      * than plausible cases of intentional bounded task recursion.
 710      */
 711     static final int POLL_LIMIT = 1 << 10;
 712 
 713     /**
 714      * Queues supporting work-stealing as well as external task
 715      * submission. See above for descriptions and algorithms.
 716      * Performance on most platforms is very sensitive to placement of
 717      * instances of both WorkQueues and their arrays -- we absolutely
 718      * do not want multiple WorkQueue instances or multiple queue
 719      * arrays sharing cache lines. The @Contended annotation alerts
 720      * JVMs to try to keep instances apart.
 721      */
 722     @jdk.internal.vm.annotation.Contended
 723     static final class WorkQueue {
 724 
 725         /**
 726          * Capacity of work-stealing queue array upon initialization.
 727          * Must be a power of two; at least 4, but should be larger to
 728          * reduce or eliminate cacheline sharing among queues.
 729          * Currently, it is much larger, as a partial workaround for
 730          * the fact that JVMs often place arrays in locations that
 731          * share GC bookkeeping (especially cardmarks) such that
 732          * per-write accesses encounter serious memory contention.
 733          */
 734         static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
 735 
 736         /**
 737          * Maximum size for queue arrays. Must be a power of two less
 738          * than or equal to 1 << (31 - width of array entry) to ensure
 739          * lack of wraparound of index calculations, but defined to a
 740          * value a bit less than this to help users trap runaway
 741          * programs before saturating systems.
 742          */
 743         static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
 744 
        // Instance fields
        //
        // phase is CASed from 0 to QLOCK to lock a shared queue and
        // reset to 0 to unlock it (see tryLockSharedQueue and
        // trySharedUnpush). base and top both start at
        // INITIAL_QUEUE_CAPACITY >>> 1 (see the constructor); slots
        // are addressed as (array.length - 1) & index.
        volatile int phase;        // versioned, negative: queued, 1: locked
        int stackPred;             // pool stack (ctl) predecessor link
        int nsteals;               // number of steals
        int id;                    // index, mode, tag
        volatile int source;       // source queue id, or sentinel
        volatile int base;         // index of next slot for poll
        int top;                   // index of next slot for push
        ForkJoinTask<?>[] array;   // the elements (initially unallocated)
        final ForkJoinPool pool;   // the containing pool (may be null)
        final ForkJoinWorkerThread owner; // owning thread or null if shared
 756 
 757         WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
 758             this.pool = pool;
 759             this.owner = owner;
 760             // Place indices in the center of array (that is not yet allocated)
 761             base = top = INITIAL_QUEUE_CAPACITY >>> 1;
 762         }
 763 
 764         /**
 765          * Returns an exportable index (used by ForkJoinWorkerThread).
 766          */
 767         final int getPoolIndex() {
 768             return (id & 0xffff) >>> 1; // ignore odd/even tag bit
 769         }
 770 
 771         /**
 772          * Returns the approximate number of tasks in the queue.
 773          */
 774         final int queueSize() {
 775             int n = base - top;       // read base first
 776             return (n >= 0) ? 0 : -n; // ignore transient negative
 777         }
 778 
 779         /**
 780          * Provides a more accurate estimate of whether this queue has
 781          * any tasks than does queueSize, by checking whether a
 782          * near-empty queue has at least one unclaimed task.
 783          */
 784         final boolean isEmpty() {
 785             ForkJoinTask<?>[] a; int n, al, b;
 786             return ((n = (b = base) - top) >= 0 || // possibly one task
 787                     (n == -1 && ((a = array) == null ||
 788                                  (al = a.length) == 0 ||
 789                                  a[(al - 1) & b] == null)));
 790         }
 791 
 792 
 793         /**
 794          * Pushes a task. Call only by owner in unshared queues.
 795          *
 796          * @param task the task. Caller must ensure non-null.
 797          * @throws RejectedExecutionException if array cannot be resized
 798          */
 799         final void push(ForkJoinTask<?> task) {
 800             int s = top; ForkJoinTask<?>[] a; int al, d;
 801             if ((a = array) != null && (al = a.length) > 0) {
 802                 int index = (al - 1) & s;
 803                 ForkJoinPool p = pool;
 804                 top = s + 1;
 805                 QA.setRelease(a, index, task);
 806                 if ((d = base - s) == 0 && p != null) {
 807                     VarHandle.fullFence();
 808                     p.signalWork();
 809                 }
 810                 else if (d + al == 1)
 811                     growArray();
 812             }
 813         }
 814 
        /**
         * Initializes or doubles the capacity of array. Call either
         * by owner or with lock held -- it is OK for base, but not
         * top, to move while resizings are in progress.
         *
         * @return the newly allocated array, which has also been
         *         stored in field {@code array}
         * @throws RejectedExecutionException if the new size would be
         *         less than INITIAL_QUEUE_CAPACITY or greater than
         *         MAXIMUM_QUEUE_CAPACITY
         */
        final ForkJoinTask<?>[] growArray() {
            ForkJoinTask<?>[] oldA = array;
            int oldSize = oldA != null ? oldA.length : 0;
            // double an existing array; otherwise use the initial capacity
            int size = oldSize > 0 ? oldSize << 1 : INITIAL_QUEUE_CAPACITY;
            if (size < INITIAL_QUEUE_CAPACITY || size > MAXIMUM_QUEUE_CAPACITY)
                throw new RejectedExecutionException("Queue capacity exceeded");
            int oldMask, t, b;
            // the new array is stored in the field before any copying
            ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
            // copy only when the old array had more than one slot and
            // top - base is positive
            if (oldA != null && (oldMask = oldSize - 1) > 0 &&
                (t = top) - (b = base) > 0) {
                int mask = size - 1;
                do { // emulate poll from old array, push to new array
                    int index = b & oldMask;
                    ForkJoinTask<?> x = (ForkJoinTask<?>)
                        QA.getAcquire(oldA, index);
                    // a slot that is null, or whose CAS to null fails,
                    // is not copied
                    if (x != null &&
                        QA.compareAndSet(oldA, index, x, null))
                        a[b & mask] = x;
                } while (++b != t);
                VarHandle.releaseFence();
            }
            return a;
        }
 843 
 844         /**
 845          * Takes next task, if one exists, in LIFO order.  Call only
 846          * by owner in unshared queues.
 847          */
 848         final ForkJoinTask<?> pop() {
 849             int b = base, s = top, al, i; ForkJoinTask<?>[] a;
 850             if ((a = array) != null && b != s && (al = a.length) > 0) {
 851                 int index = (al - 1) & --s;
 852                 ForkJoinTask<?> t = (ForkJoinTask<?>)
 853                     QA.get(a, index);
 854                 if (t != null &&
 855                     QA.compareAndSet(a, index, t, null)) {
 856                     top = s;
 857                     VarHandle.releaseFence();
 858                     return t;
 859                 }
 860             }
 861             return null;
 862         }
 863 
        /**
         * Takes next task, if one exists, in FIFO order.
         *
         * @return the task claimed from the slot at base, or
         *         {@code null} if the array is unallocated or has
         *         zero length, base is not below top, or the only
         *         apparent slot was found null
         */
        final ForkJoinTask<?> poll() {
            for (;;) {
                int b = base, s = top, d, al; ForkJoinTask<?>[] a;
                if ((a = array) != null && (d = b - s) < 0 &&
                    (al = a.length) > 0) {
                    int index = (al - 1) & b;
                    ForkJoinTask<?> t = (ForkJoinTask<?>)
                        QA.getAcquire(a, index);
                    // proceed only if base is unchanged since it was
                    // read; otherwise loop and reread
                    if (b++ == base) {
                        if (t != null) {
                            // claim the slot, then advance base; a
                            // failed CAS causes a retry
                            if (QA.compareAndSet(a, index, t, null)) {
                                base = b;
                                return t;
                            }
                        }
                        else if (d == -1)
                            break; // now empty
                    }
                }
                else
                    break;
            }
            return null;
        }
 891 
 892         /**
 893          * Takes next task, if one exists, in order specified by mode.
 894          */
 895         final ForkJoinTask<?> nextLocalTask() {
 896             return ((id & FIFO) != 0) ? poll() : pop();
 897         }
 898 
 899         /**
 900          * Returns next task, if one exists, in order specified by mode.
 901          */
 902         final ForkJoinTask<?> peek() {
 903             int al; ForkJoinTask<?>[] a;
 904             return ((a = array) != null && (al = a.length) > 0) ?
 905                 a[(al - 1) &
 906                   ((id & FIFO) != 0 ? base : top - 1)] : null;
 907         }
 908 
 909         /**
 910          * Pops the given task only if it is at the current top.
 911          */
 912         final boolean tryUnpush(ForkJoinTask<?> task) {
 913             int b = base, s = top, al; ForkJoinTask<?>[] a;
 914             if ((a = array) != null && b != s && (al = a.length) > 0) {
 915                 int index = (al - 1) & --s;
 916                 if (QA.compareAndSet(a, index, task, null)) {
 917                     top = s;
 918                     VarHandle.releaseFence();
 919                     return true;
 920                 }
 921             }
 922             return false;
 923         }
 924 
 925         /**
 926          * Removes and cancels all known tasks, ignoring any exceptions.
 927          */
 928         final void cancelAll() {
 929             for (ForkJoinTask<?> t; (t = poll()) != null; )
 930                 ForkJoinTask.cancelIgnoringExceptions(t);
 931         }
 932 
 933         // Specialized execution methods
 934 
 935         /**
 936          * Pops and executes up to limit consecutive tasks or until empty.
 937          *
 938          * @param limit max runs, or zero for no limit
 939          */
 940         final void localPopAndExec(int limit) {
 941             for (;;) {
 942                 int b = base, s = top, al; ForkJoinTask<?>[] a;
 943                 if ((a = array) != null && b != s && (al = a.length) > 0) {
 944                     int index = (al - 1) & --s;
 945                     ForkJoinTask<?> t = (ForkJoinTask<?>)
 946                         QA.getAndSet(a, index, null);
 947                     if (t != null) {
 948                         top = s;
 949                         VarHandle.releaseFence();
 950                         t.doExec();
 951                         if (limit != 0 && --limit == 0)
 952                             break;
 953                     }
 954                     else
 955                         break;
 956                 }
 957                 else
 958                     break;
 959             }
 960         }
 961 
 962         /**
 963          * Polls and executes up to limit consecutive tasks or until empty.
 964          *
 965          * @param limit, or zero for no limit
 966          */
 967         final void localPollAndExec(int limit) {
 968             for (int polls = 0;;) {
 969                 int b = base, s = top, d, al; ForkJoinTask<?>[] a;
 970                 if ((a = array) != null && (d = b - s) < 0 &&
 971                     (al = a.length) > 0) {
 972                     int index = (al - 1) & b++;
 973                     ForkJoinTask<?> t = (ForkJoinTask<?>)
 974                         QA.getAndSet(a, index, null);
 975                     if (t != null) {
 976                         base = b;
 977                         t.doExec();
 978                         if (limit != 0 && ++polls == limit)
 979                             break;
 980                     }
 981                     else if (d == -1)
 982                         break;     // now empty
 983                     else
 984                         polls = 0; // stolen; reset
 985                 }
 986                 else
 987                     break;
 988             }
 989         }
 990 
        /**
         * If present, removes task from queue and executes it.
         * The scan starts at the slot just below top and moves
         * downward until it meets a null slot or the task. If the
         * task is found but the CAS claiming its slot fails, nothing
         * is executed.
         *
         * @param task the task to look for
         */
        final void tryRemoveAndExec(ForkJoinTask<?> task) {
            ForkJoinTask<?>[] wa; int s, wal;
            if (base - (s = top) < 0 && // traverse from top
                (wa = array) != null && (wal = wa.length) > 0) {
                // m: index mask; ns: top after removal; i: scan index
                for (int m = wal - 1, ns = s - 1, i = ns; ; --i) {
                    int index = i & m;
                    ForkJoinTask<?> t = (ForkJoinTask<?>)
                        QA.get(wa, index);
                    if (t == null)
                        break;          // null slot ends the scan
                    else if (t == task) {
                        if (QA.compareAndSet(wa, index, t, null)) {
                            top = ns;   // safely shift down
                            // move each entry above the vacated slot
                            // one position lower, nulling its old slot
                            // before storing it in the new one
                            for (int j = i; j != ns; ++j) {
                                ForkJoinTask<?> f;
                                int pindex = (j + 1) & m;
                                f = (ForkJoinTask<?>)QA.get(wa, pindex);
                                QA.setVolatile(wa, pindex, null);
                                int jindex = j & m;
                                QA.setRelease(wa, jindex, f);
                            }
                            VarHandle.releaseFence();
                            t.doExec();
                        }
                        break;
                    }
                }
            }
        }
1023 
        /**
         * Tries to steal and run tasks within the target's
         * computation until done, not found, or limit exceeded.
         * Each pass examines only the slot just below top: if it
         * holds a CountedCompleter whose completer chain reaches
         * {@code task}, that entry is removed by CAS and executed.
         *
         * @param task root of CountedCompleter computation
         * @param limit max runs, or zero for no limit
         * @return task status on exit
         */
        final int localHelpCC(CountedCompleter<?> task, int limit) {
            int status = 0;
            if (task != null && (status = task.status) >= 0) {
                for (;;) {
                    boolean help = false; // set when a task was run this pass
                    int b = base, s = top, al; ForkJoinTask<?>[] a;
                    if ((a = array) != null && b != s && (al = a.length) > 0) {
                        int index = (al - 1) & (s - 1);
                        ForkJoinTask<?> o = (ForkJoinTask<?>)
                            QA.get(a, index);
                        if (o instanceof CountedCompleter) {
                            CountedCompleter<?> t = (CountedCompleter<?>)o;
                            // walk the completer chain from t looking for task
                            for (CountedCompleter<?> f = t;;) {
                                if (f != task) {
                                    if ((f = f.completer) == null) // try parent
                                        break;
                                }
                                else {
                                    // t belongs to task's computation:
                                    // claim its slot, then run it
                                    if (QA.compareAndSet(a, index, t, null)) {
                                        top = s - 1;
                                        VarHandle.releaseFence();
                                        t.doExec();
                                        help = true;
                                    }
                                    break;
                                }
                            }
                        }
                    }
                    // stop when task has a negative status, nothing
                    // was run this pass, or the limit is used up
                    if ((status = task.status) < 0 || !help ||
                        (limit != 0 && --limit == 0))
                        break;
                }
            }
            return status;
        }
1068 
1069         // Operations on shared queues
1070 
1071         /**
1072          * Tries to lock shared queue by CASing phase field.
1073          */
1074         final boolean tryLockSharedQueue() {
1075             return PHASE.compareAndSet(this, 0, QLOCK);
1076         }
1077 
1078         /**
1079          * Shared version of tryUnpush.
1080          */
1081         final boolean trySharedUnpush(ForkJoinTask<?> task) {
1082             boolean popped = false;
1083             int s = top - 1, al; ForkJoinTask<?>[] a;
1084             if ((a = array) != null && (al = a.length) > 0) {
1085                 int index = (al - 1) & s;
1086                 ForkJoinTask<?> t = (ForkJoinTask<?>) QA.get(a, index);
1087                 if (t == task &&
1088                     PHASE.compareAndSet(this, 0, QLOCK)) {
1089                     if (top == s + 1 && array == a &&
1090                         QA.compareAndSet(a, index, task, null)) {
1091                         popped = true;
1092                         top = s;
1093                     }
1094                     PHASE.setRelease(this, 0);
1095                 }
1096             }
1097             return popped;
1098         }
1099 
        /**
         * Shared version of localHelpCC. A matching task is removed
         * only while the phase lock is held and after rechecking
         * that top and array are unchanged; it is executed after the
         * lock has been released.
         *
         * @param task root of CountedCompleter computation
         * @param limit max runs, or zero for no limit
         * @return task status on exit
         */
        final int sharedHelpCC(CountedCompleter<?> task, int limit) {
            int status = 0;
            if (task != null && (status = task.status) >= 0) {
                for (;;) {
                    boolean help = false; // set when a task was claimed this pass
                    int b = base, s = top, al; ForkJoinTask<?>[] a;
                    if ((a = array) != null && b != s && (al = a.length) > 0) {
                        int index = (al - 1) & (s - 1);
                        ForkJoinTask<?> o = (ForkJoinTask<?>)
                            QA.get(a, index);
                        if (o instanceof CountedCompleter) {
                            CountedCompleter<?> t = (CountedCompleter<?>)o;
                            // walk the completer chain from t looking for task
                            for (CountedCompleter<?> f = t;;) {
                                if (f != task) {
                                    if ((f = f.completer) == null)
                                        break;
                                }
                                else {
                                    // lock by CASing phase from 0 to QLOCK
                                    if (PHASE.compareAndSet(this, 0, QLOCK)) {
                                        if (top == s && array == a &&
                                            QA.compareAndSet(a, index, t, null)) {
                                            help = true;
                                            top = s - 1;
                                        }
                                        PHASE.setRelease(this, 0); // unlock
                                        if (help)
                                            t.doExec();
                                    }
                                    break;
                                }
                            }
                        }
                    }
                    // stop when task has a negative status, nothing
                    // was claimed this pass, or the limit is used up
                    if ((status = task.status) < 0 || !help ||
                        (limit != 0 && --limit == 0))
                        break;
                }
            }
            return status;
        }
1143 
1144         /**
1145          * Returns true if owned and not known to be blocked.
1146          */
1147         final boolean isApparentlyUnblocked() {
1148             Thread wt; Thread.State s;
1149             return ((wt = owner) != null &&
1150                     (s = wt.getState()) != Thread.State.BLOCKED &&
1151                     s != Thread.State.WAITING &&
1152                     s != Thread.State.TIMED_WAITING);
1153         }
1154 
1155         // VarHandle mechanics.
1156         private static final VarHandle PHASE;
1157         static {
1158             try {
1159                 MethodHandles.Lookup l = MethodHandles.lookup();
1160                 PHASE = l.findVarHandle(WorkQueue.class, "phase", int.class);
1161             } catch (ReflectiveOperationException e) {
1162                 throw new Error(e);
1163             }
1164         }
1165     }
1166 
1167     // static fields (initialized in static initializer below)
1168 
    /**
     * Creates a new ForkJoinWorkerThread. This factory is used unless
     * overridden in ForkJoinPool constructors.
     */
    public static final ForkJoinWorkerThreadFactory
        defaultForkJoinWorkerThreadFactory;

    /**
     * Permission required for callers of methods that may start or
     * kill threads. Checked by checkPermission when a security
     * manager is present.
     */
    static final RuntimePermission modifyThreadPermission;

    /**
     * Common (static) pool. Non-null for public use unless a static
     * construction exception occurs, but internal usages null-check
     * on use to defensively avoid potential initialization
     * circularities as well as to simplify generated code.
     */
    static final ForkJoinPool common;

    /**
     * Common pool parallelism. To allow simpler use and management
     * when common pool threads are disabled, we allow the underlying
     * common.parallelism field to be zero, but in that case still report
     * parallelism as 1 to reflect resulting caller-runs mechanics.
     */
    static final int COMMON_PARALLELISM;

    /**
     * Limit on spare thread construction in tryCompensate. Its
     * default value is DEFAULT_COMMON_MAX_SPARES.
     */
    private static final int COMMON_MAX_SPARES;
1202 
1203     /**
1204      * Sequence number for creating workerNamePrefix.
1205      */
1206     private static int poolNumberSequence;
1207 
1208     /**
1209      * Returns the next sequence number. We don't expect this to
1210      * ever contend, so use simple builtin sync.
1211      */
1212     private static final synchronized int nextPoolId() {
1213         return ++poolNumberSequence;
1214     }
1215 
1216     // static configuration constants
1217 
1218     /**
1219      * Default idle timeout value (in milliseconds) for the thread
1220      * triggering quiescence to park waiting for new work.
1221      */
1222     private static final long DEFAULT_KEEPALIVE = 60_000L;
1223 
1224     /**
1225      * Undershoot tolerance (in milliseconds) for idle timeouts: a
          * timed park in runWorker that wakes within this margin of its
          * deadline is still treated as having timed out.
1226      */
1227     private static final long TIMEOUT_SLOP = 20L;
1228 
1229     /**
1230      * The default value for COMMON_MAX_SPARES.  Overridable using the
1231      * "java.util.concurrent.ForkJoinPool.common.maximumSpares" system
1232      * property.  The default value is far in excess of normal
1233      * requirements, but also far short of MAX_CAP and typical OS
1234      * thread limits, so allows JVMs to catch misuse/abuse before
1235      * running out of resources needed to do so.
1236      */
1237     private static final int DEFAULT_COMMON_MAX_SPARES = 256;
1238 
1239     /**
1240      * Increment for seed generators. See class ThreadLocal for
1241      * explanation. Added to indexSeed each time registerWorker
          * picks a slot for a new worker.
1242      */
1243     private static final int SEED_INCREMENT = 0x9e3779b9;
1244 
1245     /*
1246      * Bits and masks for field ctl, packed with 4 16 bit subfields:
1247      * RC: Number of released (unqueued) workers minus target parallelism
1248      * TC: Number of total workers minus target parallelism
1249      * SS: version count and status of top waiting thread
1250      * ID: poolIndex of top of Treiber stack of waiters
1251      *
1252      * When convenient, we can extract the lower 32 stack top bits
1253      * (including version bits) as sp=(int)ctl.  The offsets of counts
1254      * by the target parallelism and the positionings of fields makes
1255      * it possible to perform the most common checks via sign tests of
1256      * fields: When rc is negative, there are not enough unqueued
1257      * workers, when tc is negative, there are not enough total
1258      * workers.  When sp is non-zero, there are waiting workers.  To
1259      * deal with possibly negative fields, we use casts in and out of
1260      * "short" and/or signed shifts to maintain signedness.
1261      *
1262      * Because it occupies uppermost bits, we can add one release count
1263      * using getAndAdd of RC_UNIT, rather than CAS, when returning
1264      * from a blocked join.  Other updates entail multiple subfields
1265      * and masking, requiring CAS.
1266      *
1267      * The limits packed in field "bounds" are also offset by the
1268      * parallelism level to make them comparable to the ctl rc and tc
1269      * fields.
1270      */
1271 
1272     // Lower and upper word masks
1273     private static final long SP_MASK    = 0xffffffffL; // low 32 bits: SS and ID
1274     private static final long UC_MASK    = ~SP_MASK;    // high 32 bits: RC and TC
1275 
1276     // Release counts (bits 48-63 of ctl)
1277     private static final int  RC_SHIFT   = 48;
1278     private static final long RC_UNIT    = 0x0001L << RC_SHIFT;
1279     private static final long RC_MASK    = 0xffffL << RC_SHIFT;
1280 
1281     // Total counts (bits 32-47 of ctl)
1282     private static final int  TC_SHIFT   = 32;
1283     private static final long TC_UNIT    = 0x0001L << TC_SHIFT;
1284     private static final long TC_MASK    = 0xffffL << TC_SHIFT;
1285     private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign bit of TC subfield
1286 
1287     // Instance fields
1288 
1289     volatile long stealCount;            // collects worker nsteals (added in deregisterWorker)
1290     final long keepAlive;                // milliseconds before dropping if idle
1291     int indexSeed;                       // seed for next worker index; advanced in registerWorker
1292     final int bounds;                    // min, max threads packed as shorts
1293     volatile int mode;                   // parallelism, runstate, queue mode
1294     WorkQueue[] workQueues;              // main registry; workers at odd indices
1295     final String workerNamePrefix;       // for worker thread string; sync lock
1296     final ForkJoinWorkerThreadFactory factory;
1297     final UncaughtExceptionHandler ueh;  // per-worker UEH
1298     final Predicate<? super ForkJoinPool> saturate; // consulted by tryCompensate at thread limit
1299 
1300     @jdk.internal.vm.annotation.Contended("fjpctl") // segregate
1301     volatile long ctl;                   // main pool control
1302 
1303     // Creating, registering and deregistering workers
1304 
1305     /**
1306      * Attempts to build and start a single worker thread. The caller
1307      * must already have reserved it by incrementing the total count;
1308      * any failure is handed over to deregisterWorker.
1309      * @return true if successful
1310      */
1311     private boolean createWorker() {
1312         ForkJoinWorkerThread thread = null;
1313         Throwable failure = null;
1314         try {
1315             ForkJoinWorkerThreadFactory f = factory;
1316             thread = (f == null) ? null : f.newThread(this);
1317             if (thread != null) {
1318                 thread.start();
1319                 return true;
1320             }
1321         } catch (Throwable t) {
1322             failure = t;
1323         }
1324         deregisterWorker(thread, failure);
1325         return false;
1326     }
1327 
1328     /**
1329      * Tries to add one worker, incrementing ctl counts before doing
1330      * so, relying on createWorker to back out on failure.
1331      *
1332      * @param c incoming ctl value, with total count negative and no
1333      * idle workers.  On CAS failure, c is refreshed and retried if
1334      * this holds (otherwise, a new worker is not needed).
1335      */
1336     private void tryAddWorker(long c) {
1337         do {
                 // Add one to both RC and TC, each sum masked to its own
                 // subfield. The low 32 bits of nc are zero, consistent
                 // with the precondition that there are no idle workers.
1338             long nc = ((RC_MASK & (c + RC_UNIT)) |
1339                        (TC_MASK & (c + TC_UNIT)));
1340             if (ctl == c && CTL.compareAndSet(this, c, nc)) { // cheap recheck before CAS
1341                 createWorker();
1342                 break;
1343             }
                 // Retry only while TC is still negative and the wait stack is empty.
1344         } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
1345     }
1346 
1347     /**
1348      * Callback from ForkJoinWorkerThread constructor to establish and
1349      * record its WorkQueue.
          *
          * <p>Marks the thread as a daemon, installs the pool's uncaught
          * exception handler if one is set, and creates a new WorkQueue.
          * If workerNamePrefix is non-null, then while locking on it the
          * queue is placed at an odd index of workQueues (doubling the
          * array when no usable slot is found within n/2 probes), and
          * the thread is renamed to the prefix followed by a seed-derived
          * number. If the array is null or has fewer than two slots the
          * queue is created but not recorded.
1350      *
1351      * @param wt the worker thread
1352      * @return the worker's queue
1353      */
1354     final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
1355         UncaughtExceptionHandler handler;
1356         wt.setDaemon(true);                             // configure thread
1357         if ((handler = ueh) != null)
1358             wt.setUncaughtExceptionHandler(handler);
1359         WorkQueue w = new WorkQueue(this, wt);
1360         int tid = 0;                                    // for thread name
1361         int fifo = mode & FIFO;
1362         String prefix = workerNamePrefix;
1363         if (prefix != null) {
1364             synchronized (prefix) {
1365                 WorkQueue[] ws = workQueues; int n;
1366                 int s = indexSeed += SEED_INCREMENT;    // advance seed while locked
1367                 if (ws != null && (n = ws.length) > 1) {
1368                     int m = n - 1;
1369                     tid = s & m;
1370                     int i = m & ((s << 1) | 1);         // odd-numbered indices
1371                     for (int probes = n >>> 1;;) {      // find empty slot
1372                         WorkQueue q;
                             // a slot is usable if empty or if its queue's phase is QUIET
1373                         if ((q = ws[i]) == null || q.phase == QUIET)
1374                             break;
1375                         else if (--probes == 0) {
1376                             i = n | 1;                  // resize below
1377                             break;
1378                         }
1379                         else
1380                             i = (i + 2) & m;
1381                     }
1382 
                         // id combines the slot index, the queue mode bit and
                         // the seed bits outside SMASK, FIFO and DORMANT
1383                     int id = i | fifo | (s & ~(SMASK | FIFO | DORMANT));
1384                     w.phase = w.id = id;                // now publishable
1385 
1386                     if (i < n)
1387                         ws[i] = w;
1388                     else {                              // expand array
1389                         int an = n << 1;
1390                         WorkQueue[] as = new WorkQueue[an];
1391                         as[i] = w;
1392                         int am = an - 1;
                             // each pass handles an even slot j, then the odd slot j+1
1393                         for (int j = 0; j < n; ++j) {
1394                             WorkQueue v;                // copy external queue
1395                             if ((v = ws[j]) != null)    // position may change
1396                                 as[v.id & am & SQMASK] = v;
1397                             if (++j >= n)
1398                                 break;
1399                             as[j] = ws[j];              // copy worker
1400                         }
1401                         workQueues = as;
1402                     }
1403                 }
1404             }
1405             wt.setName(prefix.concat(Integer.toString(tid)));
1406         }
1407         return w;
1408     }
1409 
1410     /**
1411      * Final callback from terminating worker, as well as upon failure
1412      * to construct or start a worker.  Removes record of worker from
1413      * array, and adjusts counts. If pool is shutting down, tries to
1414      * complete termination.
          *
          * <p>If {@code ex} is non-null it is rethrown via
          * ForkJoinTask.rethrow as the last step, after all cleanup.
1415      *
1416      * @param wt the worker thread, or null if construction failed
1417      * @param ex the exception causing failure, or null if none
1418      */
1419     final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
1420         WorkQueue w = null;
1421         int phase = 0;
1422         if (wt != null && (w = wt.workQueue) != null) {
1423             Object lock = workerNamePrefix;
1424             long ns = (long)w.nsteals & 0xffffffffL;  // nsteals as unsigned 32-bit value
1425             int idx = w.id & SMASK;
1426             if (lock != null) {
1427                 WorkQueue[] ws;                       // remove index from array
1428                 synchronized (lock) {
1429                     if ((ws = workQueues) != null && ws.length > idx &&
1430                         ws[idx] == w)
1431                         ws[idx] = null;
1432                     stealCount += ns;
1433                 }
1434             }
1435             phase = w.phase;
1436         }
             // A QUIET phase is set by runWorker's timeout path, which has
             // already adjusted ctl itself; every other case adjusts here.
1437         if (phase != QUIET) {                         // else pre-adjusted
1438             long c;                                   // decrement counts
1439             do {} while (!CTL.weakCompareAndSet
1440                          (this, c = ctl, ((RC_MASK & (c - RC_UNIT)) |
1441                                           (TC_MASK & (c - TC_UNIT)) |
1442                                           (SP_MASK & c))));
1443         }
1444         if (w != null)
1445             w.cancelAll();                            // cancel remaining tasks
1446 
1447         if (!tryTerminate(false, false) &&            // possibly replace worker
1448             w != null && w.array != null)             // avoid repeated failures
1449             signalWork();
1450 
1451         if (ex == null)                               // help clean on way out
1452             ForkJoinTask.helpExpungeStaleExceptions();
1453         else                                          // rethrow
1454             ForkJoinTask.rethrow(ex);
1455     }
1456 
1457     /**
1458      * Tries to create or release a worker if too few are running.
          * Loops until ctl shows enough released workers, a new worker
          * has been requested via tryAddWorker, or the idle worker on top
          * of the wait stack has been released; gives up if the queue
          * array is missing or does not hold the indicated waiter.
1459      */
1460     final void signalWork() {
1461         for (;;) {
1462             long c; int sp; WorkQueue[] ws; int i; WorkQueue v;
1463             if ((c = ctl) >= 0L)                      // enough workers
1464                 break;
1465             else if ((sp = (int)c) == 0) {            // no idle workers
1466                 if ((c & ADD_WORKER) != 0L)           // too few workers
1467                     tryAddWorker(c);
1468                 break;
1469             }
1470             else if ((ws = workQueues) == null)
1471                 break;                                // unstarted/terminated
1472             else if (ws.length <= (i = sp & SMASK))
1473                 break;                                // terminated
1474             else if ((v = ws[i]) == null)
1475                 break;                                // terminating
1476             else {
1477                 int np = sp & ~UNSIGNALLED;           // v's phase once signalled
1478                 int vp = v.phase;
                     // pop v: its stackPred becomes the new stack top, RC gains one
1479                 long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + RC_UNIT));
1480                 Thread vt = v.owner;
1481                 if (sp == vp && CTL.compareAndSet(this, c, nc)) {
1482                     v.phase = np;
                         // unpark only when source is negative; presumably the
                         // DORMANT value runWorker stores before parking
1483                     if (v.source < 0)
1484                         LockSupport.unpark(vt);
1485                     break;
1486                 }
1487             }
1488         }
1489     }
1490 
1491     /**
1492      * Tries to decrement counts (sometimes implicitly) and possibly
1493      * arrange for a compensating worker in preparation for blocking:
1494      * If not all core workers yet exist, creates one, else if any are
1495      * unreleased (possibly including caller) releases one, else if
1496      * fewer than the minimum allowed number of workers running,
1497      * checks to see that they are all active, and if so creates an
1498      * extra worker unless over maximum limit and policy is to
1499      * saturate.  Most of these steps can fail due to interference, in
1500      * which case 0 is returned so caller will retry. A negative
1501      * return value indicates that the caller doesn't need to
1502      * re-adjust counts when later unblocked.
1503      *
          * @param w the caller's work queue; if it is null, or the queue
          * array is null or empty, while the total count is non-negative,
          * 0 is returned
1504      * @return 1: block then adjust, -1: block without adjust, 0 : retry
          * @throws RejectedExecutionException if a thread limit is reached,
          * the saturate predicate is absent or returns false, and the
          * number of blocked or waiting workers is not below the
          * parallelism level
1505      */
1506     private int tryCompensate(WorkQueue w) {
1507         int t, n, sp;
1508         long c = ctl;
1509         WorkQueue[] ws = workQueues;
             // t is the signed TC subfield; when negative, skip to pool expansion below
1510         if ((t = (short)(c >>> TC_SHIFT)) >= 0) {
1511             if (ws == null || (n = ws.length) <= 0 || w == null)
1512                 return 0;                        // disabled
1513             else if ((sp = (int)c) != 0) {       // replace or release
1514                 WorkQueue v = ws[sp & (n - 1)];
1515                 int wp = w.phase;
                     // if the caller itself is unreleased (negative phase), add a release
1516                 long uc = UC_MASK & ((wp < 0) ? c + RC_UNIT : c);
1517                 int np = sp & ~UNSIGNALLED;
1518                 if (v != null) {
1519                     int vp = v.phase;
1520                     Thread vt = v.owner;
1521                     long nc = ((long)v.stackPred & SP_MASK) | uc;
1522                     if (vp == sp && CTL.compareAndSet(this, c, nc)) {
1523                         v.phase = np;
1524                         if (v.source < 0)
1525                             LockSupport.unpark(vt);
1526                         return (wp < 0) ? -1 : 1;
1527                     }
1528                 }
1529                 return 0;
1530             }
1531             else if ((int)(c >> RC_SHIFT) -      // reduce parallelism
1532                      (short)(bounds & SMASK) > 0) {
1533                 long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c));
1534                 return CTL.compareAndSet(this, c, nc) ? 1 : 0;
1535             }
1536             else {                               // validate
                     // pc: parallelism from mode; tc: expected worker total, counted
                     // down per worker found; bc: workers seen BLOCKED or WAITING
1537                 int md = mode, pc = md & SMASK, tc = pc + t, bc = 0;
1538                 boolean unstable = false;
1539                 for (int i = 1; i < n; i += 2) {
1540                     WorkQueue q; Thread wt; Thread.State ts;
1541                     if ((q = ws[i]) != null) {
1542                         if (q.source == 0) {
1543                             unstable = true;
1544                             break;
1545                         }
1546                         else {
1547                             --tc;
1548                             if ((wt = q.owner) != null &&
1549                                 ((ts = wt.getState()) == Thread.State.BLOCKED ||
1550                                  ts == Thread.State.WAITING))
1551                                 ++bc;            // worker is blocking
1552                         }
1553                     }
1554                 }
1555                 if (unstable || tc != 0 || ctl != c)
1556                     return 0;                    // inconsistent
1557                 else if (t + pc >= MAX_CAP || t >= (bounds >>> SWIDTH)) {
1558                     Predicate<? super ForkJoinPool> sat;
1559                     if ((sat = saturate) != null && sat.test(this))
1560                         return -1;
1561                     else if (bc < pc) {          // lagging
1562                         Thread.yield();          // for retry spins
1563                         return 0;
1564                     }
1565                     else
1566                         throw new RejectedExecutionException(
1567                             "Thread limit exceeded replacing blocked worker");
1568                 }
1569             }
1570         }
1571 
             // add one to TC only (RC unchanged), then start the extra worker
1572         long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); // expand pool
1573         return CTL.compareAndSet(this, c, nc) && createWorker() ? 1 : 0;
1574     }
1575 
1576     /**
1577      * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1578      * See above for explanation.
          *
          * <p>Each pass scans workQueues from a pseudo-random index and
          * tries to take a task from the base of a non-empty queue. A
          * pass that finds nothing first pushes this worker onto the
          * wait stack kept in ctl; a later empty pass parks until the
          * worker's phase becomes non-negative. Returns when workQueues
          * is null, when mode is negative, when tryTerminate succeeds, or
          * after a keep-alive timeout removes this worker from the pool.
          *
          * @param w the calling worker's queue
1579      */
1580     final void runWorker(WorkQueue w) {
1581         WorkQueue[] ws;
1582         w.growArray();                                  // allocate queue
1583         int r = w.id ^ ThreadLocalRandom.nextSecondarySeed();
1584         if (r == 0)                                     // initial nonzero seed
1585             r = 1;
1586         int lastSignalId = 0;                           // avoid unneeded signals
1587         while ((ws = workQueues) != null) {
1588             boolean nonempty = false;                   // scan
1589             for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
1590                 WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
1591                 if ((i = r & m) >= 0 && i < n &&        // always true
1592                     (q = ws[i]) != null && (b = q.base) - q.top < 0 &&
1593                     (a = q.array) != null && (al = a.length) > 0) {
1594                     int qid = q.id;                     // (never zero)
1595                     int index = (al - 1) & b;
1596                     ForkJoinTask<?> t = (ForkJoinTask<?>)
1597                         QA.getAcquire(a, index);
                         // claim the slot only if base has not moved since it was read
1598                     if (t != null && b++ == q.base &&
1599                         QA.compareAndSet(a, index, t, null)) {
1600                         if ((q.base = b) - q.top < 0 && qid != lastSignalId)
1601                             signalWork();               // propagate signal
1602                         w.source = lastSignalId = qid;
1603                         t.doExec();
1604                         if ((w.id & FIFO) != 0)         // run remaining locals
1605                             w.localPollAndExec(POLL_LIMIT);
1606                         else
1607                             w.localPopAndExec(POLL_LIMIT);
1608                         ForkJoinWorkerThread thread = w.owner;
1609                         ++w.nsteals;
1610                         w.source = 0;                   // now idle
1611                         if (thread != null)
1612                             thread.afterTopLevelExec();
1613                     }
1614                     nonempty = true;
1615                 }
1616                 else if (nonempty)
1617                     break;
1618                 else
1619                     ++r;
1620             }
1621 
1622             if (nonempty) {                             // move (xorshift)
1623                 r ^= r << 13; r ^= r >>> 17; r ^= r << 5;
1624             }
1625             else {
1626                 int phase;
1627                 lastSignalId = 0;                       // clear for next scan
1628                 if ((phase = w.phase) >= 0) {           // enqueue
                         // new phase: next sequence number with UNSIGNALLED set
1629                     int np = w.phase = (phase + SS_SEQ) | UNSIGNALLED;
1630                     long c, nc;
1631                     do {
                             // remember the old stack top, then push self and drop one release
1632                         w.stackPred = (int)(c = ctl);
1633                         nc = ((c - RC_UNIT) & UC_MASK) | (SP_MASK & np);
1634                     } while (!CTL.weakCompareAndSet(this, c, nc));
1635                 }
1636                 else {                                  // already queued
1637                     int pred = w.stackPred;
1638                     w.source = DORMANT;                 // enable signal
1639                     for (int steps = 0;;) {
1640                         int md, rc; long c;
1641                         if (w.phase >= 0) {             // released by a signaller
1642                             w.source = 0;
1643                             break;
1644                         }
1645                         else if ((md = mode) < 0)       // shutting down
1646                             return;
1647                         else if ((rc = ((md & SMASK) +  // possibly quiescent
1648                                         (int)((c = ctl) >> RC_SHIFT))) <= 0 &&
1649                                  (md & SHUTDOWN) != 0 &&
1650                                  tryTerminate(false, false))
1651                             return;                     // help terminate
1652                         else if ((++steps & 1) == 0)
1653                             Thread.interrupted();       // clear between parks
                             // timed wait only when no workers are released, a
                             // predecessor exists, and this worker is the stack top
1654                         else if (rc <= 0 && pred != 0 && phase == (int)c) {
1655                             long d = keepAlive + System.currentTimeMillis();
1656                             LockSupport.parkUntil(this, d);
1657                             if (ctl == c &&
1658                                 d - System.currentTimeMillis() <= TIMEOUT_SLOP) {
                                     // pop self (restore pred) and give up one total count
1659                                 long nc = ((UC_MASK & (c - TC_UNIT)) |
1660                                            (SP_MASK & pred));
1661                                 if (CTL.compareAndSet(this, c, nc)) {
1662                                     w.phase = QUIET;
1663                                     return;             // drop on timeout
1664                                 }
1665                             }
1666                         }
1667                         else
1668                             LockSupport.park(this);
1669                     }
1670                 }
1671             }
1672         }
1673     }
1674 
1675     /**
1676      * Helps and/or blocks until the given task is done or timeout.
1677      * First tries locally helping, then scans other queues for a task
1678      * produced by one of w's stealers; compensating and blocking if
1679      * none are found (rescanning if tryCompensate fails).
1680      *
1681      * @param w caller
1682      * @param task the task
1683      * @param deadline for timed waits, if nonzero (compared against
          * System.nanoTime())
1684      * @return task status on exit; 0 if w or task is null
1685      */
1686     final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
1687         int s = 0;
1688         if (w != null && task != null &&
1689             (!(task instanceof CountedCompleter) ||
1690              (s = w.localHelpCC((CountedCompleter<?>)task, 0)) >= 0)) {
1691             w.tryRemoveAndExec(task);
1692             int src = w.source, id = w.id;
1693             s = task.status;
1694             while (s >= 0) {
1695                 WorkQueue[] ws;
1696                 boolean nonempty = false;
1697                 int r = ThreadLocalRandom.nextSecondarySeed() | 1; // odd indices
1698                 if ((ws = workQueues) != null) {       // scan for matching id
1699                     for (int n = ws.length, m = n - 1, j = -n; j < n; j += 2) {
1700                         WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
                             // only queues whose source is this worker's id qualify
1701                         if ((i = (r + j) & m) >= 0 && i < n &&
1702                             (q = ws[i]) != null && q.source == id &&
1703                             (b = q.base) - q.top < 0 &&
1704                             (a = q.array) != null && (al = a.length) > 0) {
1705                             int qid = q.id;
1706                             int index = (al - 1) & b;
1707                             ForkJoinTask<?> t = (ForkJoinTask<?>)
1708                                 QA.getAcquire(a, index);
1709                             if (t != null && b++ == q.base && id == q.source &&
1710                                 QA.compareAndSet(a, index, t, null)) {
1711                                 q.base = b;
1712                                 w.source = qid;
1713                                 t.doExec();
1714                                 w.source = src;     // restore caller's previous source
1715                             }
1716                             nonempty = true;
1717                             break;
1718                         }
1719                     }
1720                 }
1721                 if ((s = task.status) < 0)
1722                     break;
1723                 else if (!nonempty) {
1724                     long ms, ns; int block;
1725                     if (deadline == 0L)
1726                         ms = 0L;                       // untimed
1727                     else if ((ns = deadline - System.nanoTime()) <= 0L)
1728                         break;                         // timeout
1729                     else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
1730                         ms = 1L;                       // avoid 0 for timed wait
1731                     if ((block = tryCompensate(w)) != 0) {
1732                         task.internalWait(ms);
                             // re-add the release count only if tryCompensate returned 1
1733                         CTL.getAndAdd(this, (block > 0) ? RC_UNIT : 0L);
1734                     }
1735                     s = task.status;
1736                 }
1737             }
1738         }
1739         return s;
1740     }
1741 
1742     /**
1743      * Runs tasks until {@code isQuiescent()}. Rather than blocking
1744      * when tasks cannot be found, rescans until all others cannot
1745      * find tasks either.
          *
          * <p>While no work is found, the caller's source is set to QUIET
          * and its release count is withdrawn from ctl; the count is
          * restored once work is seen again or before returning, and
          * the original source value is reinstated on exit.
          *
          * @param w the calling worker's queue
1746      */
1747     final void helpQuiescePool(WorkQueue w) {
1748         int prevSrc = w.source, fifo = w.id & FIFO;
             // released: -1 not yet known, 1 counted in RC, 0 withdrawn from RC
1749         for (int source = prevSrc, released = -1;;) { // -1 until known
1750             WorkQueue[] ws;
1751             if (fifo != 0)
1752                 w.localPollAndExec(0);
1753             else
1754                 w.localPopAndExec(0);
1755             if (released == -1 && w.phase >= 0)
1756                 released = 1;
1757             boolean quiet = true, empty = true;
1758             int r = ThreadLocalRandom.nextSecondarySeed();
1759             if ((ws = workQueues) != null) {
1760                 for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
1761                     WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
1762                     if ((i = (r - j) & m) >= 0 && i < n && (q = ws[i]) != null) {
1763                         if ((b = q.base) - q.top < 0 &&
1764                             (a = q.array) != null && (al = a.length) > 0) {
1765                             int qid = q.id;         // NOTE(review): not used below; q.id is re-read
1766                             if (released == 0) {    // increment
1767                                 released = 1;
1768                                 CTL.getAndAdd(this, RC_UNIT);
1769                             }
1770                             int index = (al - 1) & b;
1771                             ForkJoinTask<?> t = (ForkJoinTask<?>)
1772                                 QA.getAcquire(a, index);
1773                             if (t != null && b++ == q.base &&
1774                                 QA.compareAndSet(a, index, t, null)) {
1775                                 q.base = b;
1776                                 w.source = source = q.id;
1777                                 t.doExec();
1778                                 w.source = source = prevSrc;
1779                             }
1780                             quiet = empty = false;
1781                             break;
1782                         }
                             // an empty queue whose source lacks the QUIET bits may still produce work
1783                         else if ((q.source & QUIET) == 0)
1784                             quiet = false;
1785                     }
1786                 }
1787             }
1788             if (quiet) {
1789                 if (released == 0)                   // restore withdrawn release count
1790                     CTL.getAndAdd(this, RC_UNIT);
1791                 w.source = prevSrc;
1792                 break;
1793             }
1794             else if (empty) {
1795                 if (source != QUIET)
1796                     w.source = source = QUIET;
1797                 if (released == 1) {                 // decrement
1798                     released = 0;
                         // adding RC_MASK & -RC_UNIT subtracts one from the RC subfield
1799                     CTL.getAndAdd(this, RC_MASK & -RC_UNIT);
1800                 }
1801             }
1802         }
1803     }
1804 
1805     /**
1806      * Scans for and returns a polled task, if available.
1807      * Used only for untracked polls.
          *
          * <p>Starting from a random origin, steps through the queue
          * array and tries to take a task from the base of the first
          * non-empty queue. A lost race restarts the scan with a new
          * origin. The scan stops once two consecutive full sweeps see
          * the same sum of queue bases.
1808      *
1809      * @param submissionsOnly if true, only scan submission queues
          * @return a task, or null if none was found, the STOP bit of
          * mode is set, or the queue array is null or empty
1810      */
1811     private ForkJoinTask<?> pollScan(boolean submissionsOnly) {
1812         WorkQueue[] ws; int n;
1813         rescan: while ((mode & STOP) == 0 && (ws = workQueues) != null &&
1814                       (n = ws.length) > 0) {
1815             int m = n - 1;
1816             int r = ThreadLocalRandom.nextSecondarySeed();
1817             int h = r >>> 16;
1818             int origin, step;
1819             if (submissionsOnly) {
1820                 origin = (r & ~1) & m;         // even indices and steps
1821                 step = (h & ~1) | 2;
1822             }
1823             else {
1824                 origin = r & m;
1825                 step = h | 1;                  // odd step
1826             }
1827             for (int k = origin, oldSum = 0, checkSum = 0;;) {
1828                 WorkQueue q; int b, al; ForkJoinTask<?>[] a;
1829                 if ((q = ws[k]) != null) {
1830                     checkSum += b = q.base;
1831                     if (b - q.top < 0 &&
1832                         (a = q.array) != null && (al = a.length) > 0) {
1833                         int index = (al - 1) & b;
1834                         ForkJoinTask<?> t = (ForkJoinTask<?>)
1835                             QA.getAcquire(a, index);
1836                         if (t != null && b++ == q.base &&
1837                             QA.compareAndSet(a, index, t, null)) {
1838                             q.base = b;
1839                             return t;
1840                         }
1841                         else
1842                             break; // restart
1843                     }
1844                 }
1845                 if ((k = (k + step) & m) == origin) {
                         // compares the previous sweep's sum with this one and stores the new sum
1846                     if (oldSum == (oldSum = checkSum))
1847                         break rescan;
1848                     checkSum = 0;
1849                 }
1850             }
1851         }
1852         return null;
1853     }
1854 
1855     /**
1856      * Gets and removes a local or stolen task for the given worker.
1857      *
1858      * @return a task, if available
1859      */
1860     final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
1861         ForkJoinTask<?> t;
1862         if (w != null &&
1863             (t = (w.id & FIFO) != 0 ? w.poll() : w.pop()) != null)
1864             return t;
1865         else
1866             return pollScan(false);
1867     }
1868 
1869     // External operations
1870 
    /**
     * Adds the given task to a submission queue at submitter's
     * current queue, creating one if null or contended.
     *
     * <p>The slot is chosen from the caller's ThreadLocalRandom probe.
     * Each pass of the loop takes one of three paths: build and
     * install a new queue when the slot is empty, push into the
     * existing queue when its lock can be acquired, or otherwise
     * advance the probe and retry.
     *
     * @param task the task. Caller must ensure non-null.
     * @throws RejectedExecutionException if the SHUTDOWN mode bit is
     *         set, or the workQueues array is null or empty
     */
    final void externalPush(ForkJoinTask<?> task) {
        int r;                                // initialize caller's probe
        if ((r = ThreadLocalRandom.getProbe()) == 0) {
            ThreadLocalRandom.localInit();
            r = ThreadLocalRandom.getProbe();
        }
        for (;;) {
            // re-read mode and the queue array on every attempt
            int md = mode, n;
            WorkQueue[] ws = workQueues;
            if ((md & SHUTDOWN) != 0 || ws == null || (n = ws.length) <= 0)
                throw new RejectedExecutionException();
            else {
                WorkQueue q;
                // push: the task was (or will be) added on this pass;
                // grow: the task must be added via the growArray path
                boolean push = false, grow = false;
                if ((q = ws[(n - 1) & r & SQMASK]) == null) {
                    // empty slot: prepare a queue and try to install it
                    Object lock = workerNamePrefix;
                    // id taken from the probe, with QUIET set and the
                    // FIFO and OWNED bits cleared
                    int qid = (r | QUIET) & ~(FIFO | OWNED);
                    q = new WorkQueue(this, null);
                    q.id = qid;
                    q.source = QUIET;
                    q.phase = QLOCK;          // lock queue
                    if (lock != null) {
                        synchronized (lock) { // lock pool to install
                            int i;
                            // re-read the array under the lock and
                            // install only if the slot is still empty
                            if ((ws = workQueues) != null &&
                                (n = ws.length) > 0 &&
                                ws[i = qid & (n - 1) & SQMASK] == null) {
                                ws[i] = q;
                                push = grow = true;
                            }
                        }
                    }
                    // if not installed, push stays false: the new queue
                    // is dropped and the probe is advanced below
                }
                else if (q.tryLockSharedQueue()) {
                    int b = q.base, s = q.top, al, d; ForkJoinTask<?>[] a;
                    // d = base - top, i.e. minus the number of queued
                    // tasks; store directly only while fewer than
                    // al - 1 tasks are queued
                    if ((a = q.array) != null && (al = a.length) > 0 &&
                        al - 1 + (d = b - s) > 0) {
                        a[(al - 1) & s] = task;
                        q.top = s + 1;        // relaxed writes OK here
                        // NOTE(review): presumably releases the lock taken
                        // by tryLockSharedQueue -- confirm in WorkQueue
                        q.phase = 0;
                        // queue was non-empty before the push (d < 0) and
                        // a fresh read of base is still more than one
                        // slot behind the old top
                        if (d < 0 && q.base - s < -1)
                            break;            // no signal needed
                    }
                    else
                        grow = true;          // no array yet, or not enough room
                    push = true;
                }
                if (push) {
                    if (grow) {
                        try {
                            q.growArray();
                            // store the task using the array and top as
                            // they are after growArray returns
                            int s = q.top, al; ForkJoinTask<?>[] a;
                            if ((a = q.array) != null && (al = a.length) > 0) {
                                a[(al - 1) & s] = task;
                                q.top = s + 1;
                            }
                        } finally {
                            // reset phase even if growArray throws
                            q.phase = 0;
                        }
                    }
                    signalWork();
                    break;
                }
                else                          // move if busy
                    r = ThreadLocalRandom.advanceProbe(r);
            }
        }
    }
1945 
1946     /**
1947      * Pushes a possibly-external submission.
1948      */
1949     private <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) {
1950         Thread t; ForkJoinWorkerThread w; WorkQueue q;
1951         if (task == null)
1952             throw new NullPointerException();
1953         if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
1954             (w = (ForkJoinWorkerThread)t).pool == this &&
1955             (q = w.workQueue) != null)
1956             q.push(task);
1957         else
1958             externalPush(task);
1959         return task;
1960     }
1961 
1962     /**
1963      * Returns common pool queue for an external thread.
1964      */
1965     static WorkQueue commonSubmitterQueue() {
1966         ForkJoinPool p = common;
1967         int r = ThreadLocalRandom.getProbe();
1968         WorkQueue[] ws; int n;
1969         return (p != null && (ws = p.workQueues) != null &&
1970                 (n = ws.length) > 0) ?
1971             ws[(n - 1) & r & SQMASK] : null;
1972     }
1973 
1974     /**
1975      * Performs tryUnpush for an external submitter.
1976      */
1977     final boolean tryExternalUnpush(ForkJoinTask<?> task) {
1978         int r = ThreadLocalRandom.getProbe();
1979         WorkQueue[] ws; WorkQueue w; int n;
1980         return ((ws = workQueues) != null &&
1981                 (n = ws.length) > 0 &&
1982                 (w = ws[(n - 1) & r & SQMASK]) != null &&
1983                 w.trySharedUnpush(task));
1984     }
1985 
1986     /**
1987      * Performs helpComplete for an external submitter.
1988      */
1989     final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) {
1990         int r = ThreadLocalRandom.getProbe();
1991         WorkQueue[] ws; WorkQueue w; int n;
1992         return ((ws = workQueues) != null && (n = ws.length) > 0 &&
1993                 (w = ws[(n - 1) & r & SQMASK]) != null) ?
1994             w.sharedHelpCC(task, maxTasks) : 0;
1995     }
1996 
1997     /**
1998      * Tries to steal and run tasks within the target's computation.
1999      * The maxTasks argument supports external usages; internal calls
2000      * use zero, allowing unbounded steps (external calls trap
2001      * non-positive values).
2002      *
2003      * @param w caller
2004      * @param maxTasks if non-zero, the maximum number of other tasks to run
2005      * @return task status on exit
2006      */
2007     final int helpComplete(WorkQueue w, CountedCompleter<?> task,
2008                            int maxTasks) {
2009         return (w == null) ? 0 : w.localHelpCC(task, maxTasks);
2010     }
2011 
2012     /**
2013      * Returns a cheap heuristic guide for task partitioning when
2014      * programmers, frameworks, tools, or languages have little or no
2015      * idea about task granularity.  In essence, by offering this
2016      * method, we ask users only about tradeoffs in overhead vs
2017      * expected throughput and its variance, rather than how finely to
2018      * partition tasks.
2019      *
2020      * In a steady state strict (tree-structured) computation, each
2021      * thread makes available for stealing enough tasks for other
2022      * threads to remain active. Inductively, if all threads play by
2023      * the same rules, each thread should make available only a
2024      * constant number of tasks.
2025      *
2026      * The minimum useful constant is just 1. But using a value of 1
2027      * would require immediate replenishment upon each steal to
2028      * maintain enough tasks, which is infeasible.  Further,
2029      * partitionings/granularities of offered tasks should minimize
2030      * steal rates, which in general means that threads nearer the top
2031      * of computation tree should generate more than those nearer the
2032      * bottom. In perfect steady state, each thread is at
2033      * approximately the same level of computation tree. However,
2034      * producing extra tasks amortizes the uncertainty of progress and
2035      * diffusion assumptions.
2036      *
2037      * So, users will want to use values larger (but not much larger)
2038      * than 1 to both smooth over transient shortages and hedge
2039      * against uneven progress; as traded off against the cost of
2040      * extra task overhead. We leave the user to pick a threshold
2041      * value to compare with the results of this call to guide
2042      * decisions, but recommend values such as 3.
2043      *
2044      * When all threads are active, it is on average OK to estimate
2045      * surplus strictly locally. In steady-state, if one thread is
2046      * maintaining say 2 surplus tasks, then so are others. So we can
2047      * just use estimated queue length.  However, this strategy alone
2048      * leads to serious mis-estimates in some non-steady-state
2049      * conditions (ramp-up, ramp-down, other stalls). We can detect
2050      * many of these by further considering the number of "idle"
2051      * threads, that are known to have zero queued tasks, so
2052      * compensate by a factor of (#idle/#active) threads.
2053      */
2054     static int getSurplusQueuedTaskCount() {
2055         Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q;
2056         if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
2057             (pool = (wt = (ForkJoinWorkerThread)t).pool) != null &&
2058             (q = wt.workQueue) != null) {
2059             int p = pool.mode & SMASK;
2060             int a = p + (int)(pool.ctl >> RC_SHIFT);
2061             int n = q.top - q.base;
2062             return n - (a > (p >>>= 1) ? 0 :
2063                         a > (p >>>= 1) ? 1 :
2064                         a > (p >>>= 1) ? 2 :
2065                         a > (p >>>= 1) ? 4 :
2066                         8);
2067         }
2068         return 0;
2069     }
2070 
2071     // Termination
2072 
    /**
     * Possibly initiates and/or completes termination.
     *
     * <p>Works through three mode bits in order: SHUTDOWN, STOP and
     * TERMINATED. Returns false without changing anything when
     * SHUTDOWN is not yet set and either {@code enable} is false or
     * this is the common pool, and also when {@code now} is false and
     * the pool is found to be still running.
     *
     * @param now if true, unconditionally terminate, else only
     * if no work and no active workers
     * @param enable if true, terminate when next possible
     * @return true if terminating or terminated
     */
    private boolean tryTerminate(boolean now, boolean enable) {
        int md; // 3 phases: try to set SHUTDOWN, then STOP, then TERMINATED

        // Phase 1: loop until the SHUTDOWN bit is observed; a failed CAS
        // simply re-reads mode and tries again
        while (((md = mode) & SHUTDOWN) == 0) {
            if (!enable || this == common)        // cannot shutdown
                return false;
            else
                MODE.compareAndSet(this, md, md | SHUTDOWN);
        }

        // Phase 2: set STOP, after a quiescence check unless "now"
        while (((md = mode) & STOP) == 0) {       // try to initiate termination
            if (!now) {                           // check if quiescent & empty
                for (long oldSum = 0L;;) {        // repeat until stable
                    boolean running = false;
                    long checkSum = ctl;
                    WorkQueue[] ws = workQueues;
                    // low bits of mode plus the signed RC part of ctl
                    // being positive counts as "still running"
                    if ((md & SMASK) + (int)(checkSum >> RC_SHIFT) > 0)
                        running = true;
                    else if (ws != null) {
                        WorkQueue w; int b;
                        for (int i = 0; i < ws.length; ++i) {
                            if ((w = ws[i]) != null) {
                                checkSum += (b = w.base) + w.id;
                                // running if the queue holds tasks, or an
                                // odd-indexed queue has a non-negative
                                // source
                                // NOTE(review): odd slots presumably hold
                                // worker-owned queues -- confirm
                                if (b != w.top ||
                                    ((i & 1) == 1 && w.source >= 0)) {
                                    running = true;
                                    break;
                                }
                            }
                        }
                    }
                    if (((md = mode) & STOP) != 0)
                        break;                 // already triggered
                    else if (running)
                        return false;
                    // quiescent only when the same array yields the same
                    // checksum on two consecutive passes
                    else if (workQueues == ws && oldSum == (oldSum = checkSum))
                        break;
                }
            }
            if ((md & STOP) == 0)
                MODE.compareAndSet(this, md, md | STOP);
        }

        // Phase 3: cancel queued tasks, interrupt owners, then try to
        // set TERMINATED
        while (((md = mode) & TERMINATED) == 0) { // help terminate others
            for (long oldSum = 0L;;) {            // repeat until stable
                WorkQueue[] ws; WorkQueue w;
                long checkSum = ctl;
                if ((ws = workQueues) != null) {
                    for (int i = 0; i < ws.length; ++i) {
                        if ((w = ws[i]) != null) {
                            ForkJoinWorkerThread wt = w.owner;
                            w.cancelAll();        // clear queues
                            if (wt != null) {
                                try {             // unblock join or park
                                    wt.interrupt();
                                } catch (Throwable ignore) {
                                    // deliberately ignored: a failed
                                    // interrupt must not abort the sweep
                                }
                            }
                            checkSum += w.base + w.id;
                        }
                    }
                }
                // stop sweeping once TERMINATED is seen or two consecutive
                // sweeps over the same array give the same checksum
                if (((md = mode) & TERMINATED) != 0 ||
                    (workQueues == ws && oldSum == (oldSum = checkSum)))
                    break;
            }
            if ((md & TERMINATED) != 0)
                break;
            // same expression as getPoolSize(): a positive value means
            // TERMINATED is not set by this call
            else if ((md & SMASK) + (short)(ctl >>> TC_SHIFT) > 0)
                break;
            else if (MODE.compareAndSet(this, md, md | TERMINATED)) {
                synchronized (this) {
                    notifyAll();                  // for awaitTermination
                }
                break;
            }
        }
        return true;
    }
2160 
2161     // Exported methods
2162 
2163     // Constructors
2164 
2165     /**
2166      * Creates a {@code ForkJoinPool} with parallelism equal to {@link
2167      * java.lang.Runtime#availableProcessors}, using defaults for all
2168      * other parameters (see {@link #ForkJoinPool(int,
2169      * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean,
2170      * int, int, int, Predicate, long, TimeUnit)}).
2171      *
2172      * @throws SecurityException if a security manager exists and
2173      *         the caller is not permitted to modify threads
2174      *         because it does not hold {@link
2175      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2176      */
    public ForkJoinPool() {
        // Delegates to the ten-argument constructor.
        // parallelism = available processors, capped at MAX_CAP;
        // default factory; no handler; asyncMode off
        this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
             defaultForkJoinWorkerThreadFactory, null, false,
             // corePoolSize, maximumPoolSize, minimumRunnable, saturate,
             // keepAliveTime, unit
             0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
    }
2182 
2183     /**
2184      * Creates a {@code ForkJoinPool} with the indicated parallelism
2185      * level, using defaults for all other parameters (see {@link
2186      * #ForkJoinPool(int, ForkJoinWorkerThreadFactory,
2187      * UncaughtExceptionHandler, boolean, int, int, int, Predicate,
2188      * long, TimeUnit)}).
2189      *
2190      * @param parallelism the parallelism level
2191      * @throws IllegalArgumentException if parallelism less than or
2192      *         equal to zero, or greater than implementation limit
2193      * @throws SecurityException if a security manager exists and
2194      *         the caller is not permitted to modify threads
2195      *         because it does not hold {@link
2196      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2197      */
    public ForkJoinPool(int parallelism) {
        // Delegates to the ten-argument constructor, which validates
        // parallelism. Default factory; no handler; asyncMode off.
        this(parallelism, defaultForkJoinWorkerThreadFactory, null, false,
             // corePoolSize, maximumPoolSize, minimumRunnable, saturate,
             // keepAliveTime, unit
             0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
    }
2202 
2203     /**
2204      * Creates a {@code ForkJoinPool} with the given parameters (using
2205      * defaults for others -- see {@link #ForkJoinPool(int,
2206      * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean,
2207      * int, int, int, Predicate, long, TimeUnit)}).
2208      *
2209      * @param parallelism the parallelism level. For default value,
2210      * use {@link java.lang.Runtime#availableProcessors}.
2211      * @param factory the factory for creating new threads. For default value,
2212      * use {@link #defaultForkJoinWorkerThreadFactory}.
2213      * @param handler the handler for internal worker threads that
2214      * terminate due to unrecoverable errors encountered while executing
2215      * tasks. For default value, use {@code null}.
2216      * @param asyncMode if true,
2217      * establishes local first-in-first-out scheduling mode for forked
2218      * tasks that are never joined. This mode may be more appropriate
2219      * than default locally stack-based mode in applications in which
2220      * worker threads only process event-style asynchronous tasks.
2221      * For default value, use {@code false}.
2222      * @throws IllegalArgumentException if parallelism less than or
2223      *         equal to zero, or greater than implementation limit
2224      * @throws NullPointerException if the factory is null
2225      * @throws SecurityException if a security manager exists and
2226      *         the caller is not permitted to modify threads
2227      *         because it does not hold {@link
2228      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2229      */
    public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        // Delegates to the ten-argument constructor, which performs all
        // argument validation (including the null check on factory).
        this(parallelism, factory, handler, asyncMode,
             // corePoolSize, maximumPoolSize, minimumRunnable, saturate,
             // keepAliveTime, unit
             0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
    }
2237 
2238     /**
2239      * Creates a {@code ForkJoinPool} with the given parameters.
2240      *
2241      * @param parallelism the parallelism level. For default value,
2242      * use {@link java.lang.Runtime#availableProcessors}.
2243      *
2244      * @param factory the factory for creating new threads. For
2245      * default value, use {@link #defaultForkJoinWorkerThreadFactory}.
2246      *
2247      * @param handler the handler for internal worker threads that
2248      * terminate due to unrecoverable errors encountered while
2249      * executing tasks. For default value, use {@code null}.
2250      *
2251      * @param asyncMode if true, establishes local first-in-first-out
2252      * scheduling mode for forked tasks that are never joined. This
2253      * mode may be more appropriate than default locally stack-based
2254      * mode in applications in which worker threads only process
2255      * event-style asynchronous tasks.  For default value, use {@code
2256      * false}.
2257      *
2258      * @param corePoolSize the number of threads to keep in the pool
2259      * (unless timed out after an elapsed keep-alive). Normally (and
2260      * by default) this is the same value as the parallelism level,
2261      * but may be set to a larger value to reduce dynamic overhead if
2262      * tasks regularly block. Using a smaller value (for example
2263      * {@code 0}) has the same effect as the default.
2264      *
2265      * @param maximumPoolSize the maximum number of threads allowed.
2266      * When the maximum is reached, attempts to replace blocked
2267      * threads fail.  (However, because creation and termination of
2268      * different threads may overlap, and may be managed by the given
2269      * thread factory, this value may be transiently exceeded.)  To
2270      * arrange the same value as is used by default for the common
2271      * pool, use {@code 256} plus the {@code parallelism} level. (By
2272      * default, the common pool allows a maximum of 256 spare
2273      * threads.)  Using a value (for example {@code
2274      * Integer.MAX_VALUE}) larger than the implementation's total
2275      * thread limit has the same effect as using this limit (which is
2276      * the default).
2277      *
2278      * @param minimumRunnable the minimum allowed number of core
2279      * threads not blocked by a join or {@link ManagedBlocker}.  To
2280      * ensure progress, when too few unblocked threads exist and
2281      * unexecuted tasks may exist, new threads are constructed, up to
2282      * the given maximumPoolSize.  For the default value, use {@code
2283      * 1}, that ensures liveness.  A larger value might improve
2284      * throughput in the presence of blocked activities, but might
2285      * not, due to increased overhead.  A value of zero may be
2286      * acceptable when submitted tasks cannot have dependencies
2287      * requiring additional threads.
2288      *
2289      * @param saturate if non-null, a predicate invoked upon attempts
2290      * to create more than the maximum total allowed threads.  By
2291      * default, when a thread is about to block on a join or {@link
2292      * ManagedBlocker}, but cannot be replaced because the
2293      * maximumPoolSize would be exceeded, a {@link
2294      * RejectedExecutionException} is thrown.  But if this predicate
2295      * returns {@code true}, then no exception is thrown, so the pool
2296      * continues to operate with fewer than the target number of
2297      * runnable threads, which might not ensure progress.
2298      *
2299      * @param keepAliveTime the elapsed time since last use before
2300      * a thread is terminated (and then later replaced if needed).
2301      * For the default value, use {@code 60, TimeUnit.SECONDS}.
2302      *
2303      * @param unit the time unit for the {@code keepAliveTime} argument
2304      *
2305      * @throws IllegalArgumentException if parallelism is less than or
2306      *         equal to zero, or is greater than implementation limit,
2307      *         or if maximumPoolSize is less than parallelism,
     *         or if the keepAliveTime is less than or equal to zero.
2309      * @throws NullPointerException if the factory is null
2310      * @throws SecurityException if a security manager exists and
2311      *         the caller is not permitted to modify threads
2312      *         because it does not hold {@link
2313      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2314      * @since 9
2315      */
2316     public ForkJoinPool(int parallelism,
2317                         ForkJoinWorkerThreadFactory factory,
2318                         UncaughtExceptionHandler handler,
2319                         boolean asyncMode,
2320                         int corePoolSize,
2321                         int maximumPoolSize,
2322                         int minimumRunnable,
2323                         Predicate<? super ForkJoinPool> saturate,
2324                         long keepAliveTime,
2325                         TimeUnit unit) {
2326         // check, encode, pack parameters
2327         if (parallelism <= 0 || parallelism > MAX_CAP ||
2328             maximumPoolSize < parallelism || keepAliveTime <= 0L)
2329             throw new IllegalArgumentException();
2330         if (factory == null)
2331             throw new NullPointerException();
2332         long ms = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP);
2333 
2334         int corep = Math.min(Math.max(corePoolSize, parallelism), MAX_CAP);
2335         long c = ((((long)(-corep)       << TC_SHIFT) & TC_MASK) |
2336                   (((long)(-parallelism) << RC_SHIFT) & RC_MASK));
2337         int m = parallelism | (asyncMode ? FIFO : 0);
2338         int maxSpares = Math.min(maximumPoolSize, MAX_CAP) - parallelism;
2339         int minAvail = Math.min(Math.max(minimumRunnable, 0), MAX_CAP);
2340         int b = ((minAvail - parallelism) & SMASK) | (maxSpares << SWIDTH);
2341         int n = (parallelism > 1) ? parallelism - 1 : 1; // at least 2 slots
2342         n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
2343         n = (n + 1) << 1; // power of two, including space for submission queues
2344 
2345         this.workerNamePrefix = "ForkJoinPool-" + nextPoolId() + "-worker-";
2346         this.workQueues = new WorkQueue[n];
2347         this.factory = factory;
2348         this.ueh = handler;
2349         this.saturate = saturate;
2350         this.keepAlive = ms;
2351         this.bounds = b;
2352         this.mode = m;
2353         this.ctl = c;
2354         checkPermission();
2355     }
2356 
2357     private static Object newInstanceFromSystemProperty(String property)
2358         throws ReflectiveOperationException {
2359         String className = System.getProperty(property);
2360         return (className == null)
2361             ? null
2362             : ClassLoader.getSystemClassLoader().loadClass(className)
2363             .getConstructor().newInstance();
2364     }
2365 
2366     /**
2367      * Constructor for common pool using parameters possibly
2368      * overridden by system properties
2369      */
2370     private ForkJoinPool(byte forCommonPoolOnly) {
2371         int parallelism = -1;
2372         ForkJoinWorkerThreadFactory fac = null;
2373         UncaughtExceptionHandler handler = null;
2374         try {  // ignore exceptions in accessing/parsing properties
2375             String pp = System.getProperty
2376                 ("java.util.concurrent.ForkJoinPool.common.parallelism");
2377             if (pp != null)
2378                 parallelism = Integer.parseInt(pp);
2379             fac = (ForkJoinWorkerThreadFactory) newInstanceFromSystemProperty(
2380                 "java.util.concurrent.ForkJoinPool.common.threadFactory");
2381             handler = (UncaughtExceptionHandler) newInstanceFromSystemProperty(
2382                 "java.util.concurrent.ForkJoinPool.common.exceptionHandler");
2383         } catch (Exception ignore) {
2384         }
2385 
2386         if (fac == null) {
2387             if (System.getSecurityManager() == null)
2388                 fac = defaultForkJoinWorkerThreadFactory;
2389             else // use security-managed default
2390                 fac = new InnocuousForkJoinWorkerThreadFactory();
2391         }
2392         if (parallelism < 0 && // default 1 less than #cores
2393             (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
2394             parallelism = 1;
2395         if (parallelism > MAX_CAP)
2396             parallelism = MAX_CAP;
2397 
2398         long c = ((((long)(-parallelism) << TC_SHIFT) & TC_MASK) |
2399                   (((long)(-parallelism) << RC_SHIFT) & RC_MASK));
2400         int b = ((1 - parallelism) & SMASK) | (COMMON_MAX_SPARES << SWIDTH);
2401         int n = (parallelism > 1) ? parallelism - 1 : 1;
2402         n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
2403         n = (n + 1) << 1;
2404 
2405         this.workerNamePrefix = "ForkJoinPool.commonPool-worker-";
2406         this.workQueues = new WorkQueue[n];
2407         this.factory = fac;
2408         this.ueh = handler;
2409         this.saturate = null;
2410         this.keepAlive = DEFAULT_KEEPALIVE;
2411         this.bounds = b;
2412         this.mode = parallelism;
2413         this.ctl = c;
2414     }
2415 
2416     /**
2417      * Returns the common pool instance. This pool is statically
2418      * constructed; its run state is unaffected by attempts to {@link
2419      * #shutdown} or {@link #shutdownNow}. However this pool and any
2420      * ongoing processing are automatically terminated upon program
2421      * {@link System#exit}.  Any program that relies on asynchronous
2422      * task processing to complete before program termination should
2423      * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
2424      * before exit.
2425      *
2426      * @return the common pool instance
2427      * @since 1.8
2428      */
    public static ForkJoinPool commonPool() {
        // Returns the static "common" field directly. No null check is
        // made here: the assertion below exists only as a comment.
        // assert common != null : "static init error";
        return common;
    }
2433 
2434     // Execution methods
2435 
2436     /**
2437      * Performs the given task, returning its result upon completion.
2438      * If the computation encounters an unchecked Exception or Error,
2439      * it is rethrown as the outcome of this invocation.  Rethrown
2440      * exceptions behave in the same way as regular exceptions, but,
2441      * when possible, contain stack traces (as displayed for example
2442      * using {@code ex.printStackTrace()}) of both the current thread
2443      * as well as the thread actually encountering the exception;
2444      * minimally only the latter.
2445      *
2446      * @param task the task
2447      * @param <T> the type of the task's result
2448      * @return the task's result
2449      * @throws NullPointerException if the task is null
2450      * @throws RejectedExecutionException if the task cannot be
2451      *         scheduled for execution
2452      */
2453     public <T> T invoke(ForkJoinTask<T> task) {
2454         if (task == null)
2455             throw new NullPointerException();
2456         externalSubmit(task);
2457         return task.join();
2458     }
2459 
2460     /**
2461      * Arranges for (asynchronous) execution of the given task.
2462      *
2463      * @param task the task
2464      * @throws NullPointerException if the task is null
2465      * @throws RejectedExecutionException if the task cannot be
2466      *         scheduled for execution
2467      */
    public void execute(ForkJoinTask<?> task) {
        // externalSubmit performs the null check and chooses between the
        // calling worker's own queue and externalPush; its return value
        // (the task itself) is not needed here
        externalSubmit(task);
    }
2471 
2472     // AbstractExecutorService methods
2473 
2474     /**
2475      * @throws NullPointerException if the task is null
2476      * @throws RejectedExecutionException if the task cannot be
2477      *         scheduled for execution
2478      */
2479     public void execute(Runnable task) {
2480         if (task == null)
2481             throw new NullPointerException();
2482         ForkJoinTask<?> job;
2483         if (task instanceof ForkJoinTask<?>) // avoid re-wrap
2484             job = (ForkJoinTask<?>) task;
2485         else
2486             job = new ForkJoinTask.RunnableExecuteAction(task);
2487         externalSubmit(job);
2488     }
2489 
2490     /**
2491      * Submits a ForkJoinTask for execution.
2492      *
2493      * @param task the task to submit
2494      * @param <T> the type of the task's result
2495      * @return the task
2496      * @throws NullPointerException if the task is null
2497      * @throws RejectedExecutionException if the task cannot be
2498      *         scheduled for execution
2499      */
    public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
        // the null check is done inside externalSubmit, which returns
        // the task it was given
        return externalSubmit(task);
    }
2503 
2504     /**
2505      * @throws NullPointerException if the task is null
2506      * @throws RejectedExecutionException if the task cannot be
2507      *         scheduled for execution
2508      */
2509     public <T> ForkJoinTask<T> submit(Callable<T> task) {
2510         return externalSubmit(new ForkJoinTask.AdaptedCallable<T>(task));
2511     }
2512 
2513     /**
2514      * @throws NullPointerException if the task is null
2515      * @throws RejectedExecutionException if the task cannot be
2516      *         scheduled for execution
2517      */
2518     public <T> ForkJoinTask<T> submit(Runnable task, T result) {
2519         return externalSubmit(new ForkJoinTask.AdaptedRunnable<T>(task, result));
2520     }
2521 
2522     /**
2523      * @throws NullPointerException if the task is null
2524      * @throws RejectedExecutionException if the task cannot be
2525      *         scheduled for execution
2526      */
2527     @SuppressWarnings("unchecked")
2528     public ForkJoinTask<?> submit(Runnable task) {
2529         if (task == null)
2530             throw new NullPointerException();
2531         return externalSubmit((task instanceof ForkJoinTask<?>)
2532             ? (ForkJoinTask<Void>) task // avoid re-wrap
2533             : new ForkJoinTask.AdaptedRunnableAction(task));
2534     }
2535 
2536     /**
2537      * @throws NullPointerException       {@inheritDoc}
2538      * @throws RejectedExecutionException {@inheritDoc}
2539      */
2540     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
2541         // In previous versions of this class, this method constructed
2542         // a task to run ForkJoinTask.invokeAll, but now external
2543         // invocation of multiple tasks is at least as efficient.
2544         ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
2545 
2546         try {
2547             for (Callable<T> t : tasks) {
2548                 ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t);
2549                 futures.add(f);
2550                 externalSubmit(f);
2551             }
2552             for (int i = 0, size = futures.size(); i < size; i++)
2553                 ((ForkJoinTask<?>)futures.get(i)).quietlyJoin();
2554             return futures;
2555         } catch (Throwable t) {
2556             for (int i = 0, size = futures.size(); i < size; i++)
2557                 futures.get(i).cancel(false);
2558             throw t;
2559         }
2560     }
2561 
2562     /**
2563      * Returns the factory used for constructing new workers.
2564      *
2565      * @return the factory used for constructing new workers
2566      */
    public ForkJoinWorkerThreadFactory getFactory() {
        // plain read of the field assigned by the constructors
        return factory;
    }
2570 
2571     /**
2572      * Returns the handler for internal worker threads that terminate
2573      * due to unrecoverable errors encountered while executing tasks.
2574      *
2575      * @return the handler, or {@code null} if none
2576      */
2577     public UncaughtExceptionHandler getUncaughtExceptionHandler() {
2578         return ueh;
2579     }
2580 
2581     /**
2582      * Returns the targeted parallelism level of this pool.
2583      *
2584      * @return the targeted parallelism level of this pool
2585      */
2586     public int getParallelism() {
2587         int par = mode & SMASK;
2588         return (par > 0) ? par : 1;
2589     }
2590 
2591     /**
2592      * Returns the targeted parallelism level of the common pool.
2593      *
2594      * @return the targeted parallelism level of the common pool
2595      * @since 1.8
2596      */
2597     public static int getCommonPoolParallelism() {
2598         return COMMON_PARALLELISM;
2599     }
2600 
2601     /**
2602      * Returns the number of worker threads that have started but not
2603      * yet terminated.  The result returned by this method may differ
2604      * from {@link #getParallelism} when threads are created to
2605      * maintain parallelism when others are cooperatively blocked.
2606      *
2607      * @return the number of worker threads
2608      */
2609     public int getPoolSize() {
2610         return ((mode & SMASK) + (short)(ctl >>> TC_SHIFT));
2611     }
2612 
2613     /**
2614      * Returns {@code true} if this pool uses local first-in-first-out
2615      * scheduling mode for forked tasks that are never joined.
2616      *
2617      * @return {@code true} if this pool uses async mode
2618      */
2619     public boolean getAsyncMode() {
2620         return (mode & FIFO) != 0;
2621     }
2622 
2623     /**
2624      * Returns an estimate of the number of worker threads that are
2625      * not blocked waiting to join tasks or for other managed
2626      * synchronization. This method may overestimate the
2627      * number of running threads.
2628      *
2629      * @return the number of worker threads
2630      */
2631     public int getRunningThreadCount() {
2632         int rc = 0;
2633         WorkQueue[] ws; WorkQueue w;
2634         if ((ws = workQueues) != null) {
2635             for (int i = 1; i < ws.length; i += 2) {
2636                 if ((w = ws[i]) != null && w.isApparentlyUnblocked())
2637                     ++rc;
2638             }
2639         }
2640         return rc;
2641     }
2642 
2643     /**
2644      * Returns an estimate of the number of threads that are currently
2645      * stealing or executing tasks. This method may overestimate the
2646      * number of active threads.
2647      *
2648      * @return the number of active threads
2649      */
2650     public int getActiveThreadCount() {
2651         int r = (mode & SMASK) + (int)(ctl >> RC_SHIFT);
2652         return (r <= 0) ? 0 : r; // suppress momentarily negative values
2653     }
2654 
    /**
     * Returns {@code true} if all worker threads are currently idle.
     * An idle worker is one that cannot obtain a task to execute
     * because none are available to steal from other threads, and
     * there are no pending submissions to the pool. This method is
     * conservative; it might not return {@code true} immediately upon
     * idleness of all threads, but will eventually become true if
     * threads remain inactive.
     *
     * <p>A pool whose mode has the STOP or TERMINATED bit set is
     * always reported as quiescent.
     *
     * @return {@code true} if all threads are currently idle
     */
    public boolean isQuiescent() {
        // Retry until one pass produces a definite answer.
        for (;;) {
            long c = ctl;                 // snapshot; re-validated before answering true
            int md = mode, pc = md & SMASK;
            int tc = pc + (short)(c >>> TC_SHIFT); // same formula as getPoolSize()
            int rc = pc + (int)(c >> RC_SHIFT);    // same formula as getActiveThreadCount()
            if ((md & (STOP | TERMINATED)) != 0)
                return true;
            else if (rc > 0)
                return false;             // active count is positive
            else {
                WorkQueue[] ws; WorkQueue v;
                if ((ws = workQueues) != null) {
                    // Only odd-indexed slots are inspected.
                    for (int i = 1; i < ws.length; i += 2) {
                        if ((v = ws[i]) != null) {
                            // NOTE(review): QUIET presumably marks a worker
                            // that has found nothing to run -- confirm
                            // against the WorkQueue.source documentation.
                            if ((v.source & QUIET) == 0)
                                return false;
                            --tc;         // one expected thread accounted for
                        }
                    }
                }
                // Answer true only if every counted thread had a QUIET
                // queue and ctl did not change during the scan;
                // otherwise loop and take a fresh snapshot.
                if (tc == 0 && ctl == c)
                    return true;
            }
        }
    }
2692 
2693     /**
2694      * Returns an estimate of the total number of tasks stolen from
2695      * one thread's work queue by another. The reported value
2696      * underestimates the actual total number of steals when the pool
2697      * is not quiescent. This value may be useful for monitoring and
2698      * tuning fork/join programs: in general, steal counts should be
2699      * high enough to keep threads busy, but low enough to avoid
2700      * overhead and contention across threads.
2701      *
2702      * @return the number of steals
2703      */
2704     public long getStealCount() {
2705         long count = stealCount;
2706         WorkQueue[] ws; WorkQueue w;
2707         if ((ws = workQueues) != null) {
2708             for (int i = 1; i < ws.length; i += 2) {
2709                 if ((w = ws[i]) != null)
2710                     count += (long)w.nsteals & 0xffffffffL;
2711             }
2712         }
2713         return count;
2714     }
2715 
2716     /**
2717      * Returns an estimate of the total number of tasks currently held
2718      * in queues by worker threads (but not including tasks submitted
2719      * to the pool that have not begun executing). This value is only
2720      * an approximation, obtained by iterating across all threads in
2721      * the pool. This method may be useful for tuning task
2722      * granularities.
2723      *
2724      * @return the number of queued tasks
2725      */
2726     public long getQueuedTaskCount() {
2727         long count = 0;
2728         WorkQueue[] ws; WorkQueue w;
2729         if ((ws = workQueues) != null) {
2730             for (int i = 1; i < ws.length; i += 2) {
2731                 if ((w = ws[i]) != null)
2732                     count += w.queueSize();
2733             }
2734         }
2735         return count;
2736     }
2737 
2738     /**
2739      * Returns an estimate of the number of tasks submitted to this
2740      * pool that have not yet begun executing.  This method may take
2741      * time proportional to the number of submissions.
2742      *
2743      * @return the number of queued submissions
2744      */
2745     public int getQueuedSubmissionCount() {
2746         int count = 0;
2747         WorkQueue[] ws; WorkQueue w;
2748         if ((ws = workQueues) != null) {
2749             for (int i = 0; i < ws.length; i += 2) {
2750                 if ((w = ws[i]) != null)
2751                     count += w.queueSize();
2752             }
2753         }
2754         return count;
2755     }
2756 
2757     /**
2758      * Returns {@code true} if there are any tasks submitted to this
2759      * pool that have not yet begun executing.
2760      *
2761      * @return {@code true} if there are any queued submissions
2762      */
2763     public boolean hasQueuedSubmissions() {
2764         WorkQueue[] ws; WorkQueue w;
2765         if ((ws = workQueues) != null) {
2766             for (int i = 0; i < ws.length; i += 2) {
2767                 if ((w = ws[i]) != null && !w.isEmpty())
2768                     return true;
2769             }
2770         }
2771         return false;
2772     }
2773 
2774     /**
2775      * Removes and returns the next unexecuted submission if one is
2776      * available.  This method may be useful in extensions to this
2777      * class that re-assign work in systems with multiple pools.
2778      *
2779      * @return the next submission, or {@code null} if none
2780      */
2781     protected ForkJoinTask<?> pollSubmission() {
2782         return pollScan(true);
2783     }
2784 
2785     /**
2786      * Removes all available unexecuted submitted and forked tasks
2787      * from scheduling queues and adds them to the given collection,
2788      * without altering their execution status. These may include
2789      * artificially generated or wrapped tasks. This method is
2790      * designed to be invoked only when the pool is known to be
2791      * quiescent. Invocations at other times may not remove all
2792      * tasks. A failure encountered while attempting to add elements
2793      * to collection {@code c} may result in elements being in
2794      * neither, either or both collections when the associated
2795      * exception is thrown.  The behavior of this operation is
2796      * undefined if the specified collection is modified while the
2797      * operation is in progress.
2798      *
2799      * @param c the collection to transfer elements into
2800      * @return the number of elements transferred
2801      */
2802     protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
2803         int count = 0;
2804         WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t;
2805         if ((ws = workQueues) != null) {
2806             for (int i = 0; i < ws.length; ++i) {
2807                 if ((w = ws[i]) != null) {
2808                     while ((t = w.poll()) != null) {
2809                         c.add(t);
2810                         ++count;
2811                     }
2812                 }
2813             }
2814         }
2815         return count;
2816     }
2817 
2818     /**
2819      * Returns a string identifying this pool, as well as its state,
2820      * including indications of run state, parallelism level, and
2821      * worker and task counts.
2822      *
2823      * @return a string identifying this pool, as well as its state
2824      */
2825     public String toString() {
2826         // Use a single pass through workQueues to collect counts
2827         long qt = 0L, qs = 0L; int rc = 0;
2828         long st = stealCount;
2829         WorkQueue[] ws; WorkQueue w;
2830         if ((ws = workQueues) != null) {
2831             for (int i = 0; i < ws.length; ++i) {
2832                 if ((w = ws[i]) != null) {
2833                     int size = w.queueSize();
2834                     if ((i & 1) == 0)
2835                         qs += size;
2836                     else {
2837                         qt += size;
2838                         st += (long)w.nsteals & 0xffffffffL;
2839                         if (w.isApparentlyUnblocked())
2840                             ++rc;
2841                     }
2842                 }
2843             }
2844         }
2845 
2846         int md = mode;
2847         int pc = (md & SMASK);
2848         long c = ctl;
2849         int tc = pc + (short)(c >>> TC_SHIFT);
2850         int ac = pc + (int)(c >> RC_SHIFT);
2851         if (ac < 0) // ignore transient negative
2852             ac = 0;
2853         String level = ((md & TERMINATED) != 0 ? "Terminated" :
2854                         (md & STOP)       != 0 ? "Terminating" :
2855                         (md & SHUTDOWN)   != 0 ? "Shutting down" :
2856                         "Running");
2857         return super.toString() +
2858             "[" + level +
2859             ", parallelism = " + pc +
2860             ", size = " + tc +
2861             ", active = " + ac +
2862             ", running = " + rc +
2863             ", steals = " + st +
2864             ", tasks = " + qt +
2865             ", submissions = " + qs +
2866             "]";
2867     }
2868 
2869     /**
2870      * Possibly initiates an orderly shutdown in which previously
2871      * submitted tasks are executed, but no new tasks will be
2872      * accepted. Invocation has no effect on execution state if this
2873      * is the {@link #commonPool()}, and no additional effect if
2874      * already shut down.  Tasks that are in the process of being
2875      * submitted concurrently during the course of this method may or
2876      * may not be rejected.
2877      *
2878      * @throws SecurityException if a security manager exists and
2879      *         the caller is not permitted to modify threads
2880      *         because it does not hold {@link
2881      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2882      */
2883     public void shutdown() {
2884         checkPermission();
2885         tryTerminate(false, true);
2886     }
2887 
2888     /**
2889      * Possibly attempts to cancel and/or stop all tasks, and reject
2890      * all subsequently submitted tasks.  Invocation has no effect on
2891      * execution state if this is the {@link #commonPool()}, and no
2892      * additional effect if already shut down. Otherwise, tasks that
2893      * are in the process of being submitted or executed concurrently
2894      * during the course of this method may or may not be
2895      * rejected. This method cancels both existing and unexecuted
2896      * tasks, in order to permit termination in the presence of task
2897      * dependencies. So the method always returns an empty list
2898      * (unlike the case for some other Executors).
2899      *
2900      * @return an empty list
2901      * @throws SecurityException if a security manager exists and
2902      *         the caller is not permitted to modify threads
2903      *         because it does not hold {@link
2904      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2905      */
2906     public List<Runnable> shutdownNow() {
2907         checkPermission();
2908         tryTerminate(true, true);
2909         return Collections.emptyList();
2910     }
2911 
2912     /**
2913      * Returns {@code true} if all tasks have completed following shut down.
2914      *
2915      * @return {@code true} if all tasks have completed following shut down
2916      */
2917     public boolean isTerminated() {
2918         return (mode & TERMINATED) != 0;
2919     }
2920 
2921     /**
2922      * Returns {@code true} if the process of termination has
2923      * commenced but not yet completed.  This method may be useful for
2924      * debugging. A return of {@code true} reported a sufficient
2925      * period after shutdown may indicate that submitted tasks have
2926      * ignored or suppressed interruption, or are waiting for I/O,
2927      * causing this executor not to properly terminate. (See the
2928      * advisory notes for class {@link ForkJoinTask} stating that
2929      * tasks should not normally entail blocking operations.  But if
2930      * they do, they must abort them on interrupt.)
2931      *
2932      * @return {@code true} if terminating but not yet terminated
2933      */
2934     public boolean isTerminating() {
2935         int md = mode;
2936         return (md & STOP) != 0 && (md & TERMINATED) == 0;
2937     }
2938 
2939     /**
2940      * Returns {@code true} if this pool has been shut down.
2941      *
2942      * @return {@code true} if this pool has been shut down
2943      */
2944     public boolean isShutdown() {
2945         return (mode & SHUTDOWN) != 0;
2946     }
2947 
2948     /**
2949      * Blocks until all tasks have completed execution after a
2950      * shutdown request, or the timeout occurs, or the current thread
2951      * is interrupted, whichever happens first. Because the {@link
2952      * #commonPool()} never terminates until program shutdown, when
2953      * applied to the common pool, this method is equivalent to {@link
2954      * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}.
2955      *
2956      * @param timeout the maximum time to wait
2957      * @param unit the time unit of the timeout argument
2958      * @return {@code true} if this executor terminated and
2959      *         {@code false} if the timeout elapsed before termination
2960      * @throws InterruptedException if interrupted while waiting
2961      */
2962     public boolean awaitTermination(long timeout, TimeUnit unit)
2963         throws InterruptedException {
2964         if (Thread.interrupted())
2965             throw new InterruptedException();
2966         if (this == common) {
2967             awaitQuiescence(timeout, unit);
2968             return false;
2969         }
2970         long nanos = unit.toNanos(timeout);
2971         if (isTerminated())
2972             return true;
2973         if (nanos <= 0L)
2974             return false;
2975         long deadline = System.nanoTime() + nanos;
2976         synchronized (this) {
2977             for (;;) {
2978                 if (isTerminated())
2979                     return true;
2980                 if (nanos <= 0L)
2981                     return false;
2982                 long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
2983                 wait(millis > 0L ? millis : 1L);
2984                 nanos = deadline - System.nanoTime();
2985             }
2986         }
2987     }
2988 
2989     /**
2990      * If called by a ForkJoinTask operating in this pool, equivalent
2991      * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise,
2992      * waits and/or attempts to assist performing tasks until this
2993      * pool {@link #isQuiescent} or the indicated timeout elapses.
2994      *
2995      * @param timeout the maximum time to wait
2996      * @param unit the time unit of the timeout argument
2997      * @return {@code true} if quiescent; {@code false} if the
2998      * timeout elapsed.
2999      */
3000     public boolean awaitQuiescence(long timeout, TimeUnit unit) {
3001         long nanos = unit.toNanos(timeout);
3002         ForkJoinWorkerThread wt;
3003         Thread thread = Thread.currentThread();
3004         if ((thread instanceof ForkJoinWorkerThread) &&
3005             (wt = (ForkJoinWorkerThread)thread).pool == this) {
3006             helpQuiescePool(wt.workQueue);
3007             return true;
3008         }
3009         else {
3010             for (long startTime = System.nanoTime();;) {
3011                 ForkJoinTask<?> t;
3012                 if ((t = pollScan(false)) != null)
3013                     t.doExec();
3014                 else if (isQuiescent())
3015                     return true;
3016                 else if ((System.nanoTime() - startTime) > nanos)
3017                     return false;
3018                 else
3019                     Thread.yield(); // cannot block
3020             }
3021         }
3022     }
3023 
3024     /**
3025      * Waits and/or attempts to assist performing tasks indefinitely
3026      * until the {@link #commonPool()} {@link #isQuiescent}.
3027      */
3028     static void quiesceCommonPool() {
3029         common.awaitQuiescence(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
3030     }
3031 
    /**
     * Interface for extending managed parallelism for tasks running
     * in {@link ForkJoinPool}s.
     *
     * <p>A {@code ManagedBlocker} provides two methods.  Method
     * {@link #isReleasable} must return {@code true} if blocking is
     * not necessary. Method {@link #block} blocks the current thread
     * if necessary (perhaps internally invoking {@code isReleasable}
     * before actually blocking). These actions are performed by any
     * thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}.
     * The unusual methods in this API accommodate synchronizers that
     * may, but don't usually, block for long periods. Similarly, they
     * allow more efficient internal handling of cases in which
     * additional workers may be, but usually are not, needed to
     * ensure sufficient parallelism.  Toward this end,
     * implementations of method {@code isReleasable} must be amenable
     * to repeated invocation.
     *
     * <p>For example, here is a ManagedBlocker based on a
     * ReentrantLock:
     * <pre> {@code
     * class ManagedLocker implements ManagedBlocker {
     *   final ReentrantLock lock;
     *   boolean hasLock = false;
     *   ManagedLocker(ReentrantLock lock) { this.lock = lock; }
     *   public boolean block() {
     *     if (!hasLock)
     *       lock.lock();
     *     return true;
     *   }
     *   public boolean isReleasable() {
     *     return hasLock || (hasLock = lock.tryLock());
     *   }
     * }}</pre>
     *
     * <p>Here is a class that possibly blocks waiting for an
     * item on a given queue:
     * <pre> {@code
     * class QueueTaker<E> implements ManagedBlocker {
     *   final BlockingQueue<E> queue;
     *   volatile E item = null;
     *   QueueTaker(BlockingQueue<E> q) { this.queue = q; }
     *   public boolean block() throws InterruptedException {
     *     if (item == null)
     *       item = queue.take();
     *     return true;
     *   }
     *   public boolean isReleasable() {
     *     return item != null || (item = queue.poll()) != null;
     *   }
     *   public E getItem() { // call after pool.managedBlock completes
     *     return item;
     *   }
     * }}</pre>
     *
     * @see ForkJoinPool#managedBlock(ManagedBlocker)
     */
    public static interface ManagedBlocker {
        /**
         * Possibly blocks the current thread, for example waiting for
         * a lock or condition.
         *
         * <p>{@link ForkJoinPool#managedBlock(ManagedBlocker)} keeps
         * alternating calls to {@link #isReleasable} and this method
         * until one of them returns {@code true}, so returning
         * {@code false} causes another round.
         *
         * @return {@code true} if no additional blocking is necessary
         * (i.e., if isReleasable would return true)
         * @throws InterruptedException if interrupted while waiting
         * (the method is not required to do so, but is allowed to)
         */
        boolean block() throws InterruptedException;

        /**
         * Returns {@code true} if blocking is unnecessary.
         * Implementations must tolerate being invoked repeatedly.
         *
         * @return {@code true} if blocking is unnecessary
         */
        boolean isReleasable();
    }
3105 
    /**
     * Runs the given possibly blocking task.  When {@linkplain
     * ForkJoinTask#inForkJoinPool() running in a ForkJoinPool}, this
     * method possibly arranges for a spare thread to be activated if
     * necessary to ensure sufficient parallelism while the current
     * thread is blocked in {@link ManagedBlocker#block blocker.block()}.
     *
     * <p>This method repeatedly calls {@code blocker.isReleasable()} and
     * {@code blocker.block()} until either method returns {@code true}.
     * Every call to {@code blocker.block()} is preceded by a call to
     * {@code blocker.isReleasable()} that returned {@code false}.
     *
     * <p>If not running in a ForkJoinPool, this method is
     * behaviorally equivalent to
     * <pre> {@code
     * while (!blocker.isReleasable())
     *   if (blocker.block())
     *     break;}</pre>
     *
     * If running in a ForkJoinPool, the pool may first be expanded to
     * ensure sufficient parallelism available during the call to
     * {@code blocker.block()}.
     *
     * @param blocker the blocker task
     * @throws InterruptedException if {@code blocker.block()} did so
     * @throws NullPointerException if {@code blocker} is null (there
     *         is no explicit check; the first {@code isReleasable()}
     *         call fails)
     */
    public static void managedBlock(ManagedBlocker blocker)
        throws InterruptedException {
        ForkJoinPool p;
        ForkJoinWorkerThread wt;
        WorkQueue w;
        Thread t = Thread.currentThread();
        // Pool-aware path: taken only when the caller is a worker
        // thread that has both a pool and a work queue.
        if ((t instanceof ForkJoinWorkerThread) &&
            (p = (wt = (ForkJoinWorkerThread)t).pool) != null &&
            (w = wt.workQueue) != null) {
            int block;
            while (!blocker.isReleasable()) {
                // A zero result from tryCompensate means "not yet":
                // loop, re-check isReleasable, and ask the pool again.
                if ((block = p.tryCompensate(w)) != 0) {
                    try {
                        do {} while (!blocker.isReleasable() &&
                                     !blocker.block());
                    } finally {
                        // Runs even when block() throws.  A positive
                        // result adds RC_UNIT to ctl; otherwise 0L is
                        // added and ctl's value is unchanged.
                        // NOTE(review): presumably this restores a count
                        // that tryCompensate decremented -- confirm.
                        CTL.getAndAdd(p, (block > 0) ? RC_UNIT : 0L);
                    }
                    break;
                }
            }
        }
        else {
            // Not a pool worker: plain alternation with no compensation.
            do {} while (!blocker.isReleasable() &&
                         !blocker.block());
        }
    }
3159 
    /**
     * If the given executor is a ForkJoinPool, poll and execute
     * AsynchronousCompletionTasks from worker's queue until none are
     * available or blocker is released.
     *
     * <p>Does nothing if {@code blocker} is null, if {@code e} is not
     * a ForkJoinPool, or if no queue can be selected for the calling
     * thread.
     *
     * @param e the executor; only acted upon when it is a ForkJoinPool
     * @param blocker the blocker whose release ends the helping loop
     */
    static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) {
        if (blocker != null && (e instanceof ForkJoinPool)) {
            WorkQueue w; ForkJoinWorkerThread wt; WorkQueue[] ws; int r, n;
            ForkJoinPool p = (ForkJoinPool)e;
            Thread thread = Thread.currentThread();
            // Queue selection: a worker of pool p uses its own queue;
            // any other thread with a nonzero probe uses the slot that
            // its probe maps to; otherwise there is nothing to help with.
            if (thread instanceof ForkJoinWorkerThread &&
                (wt = (ForkJoinWorkerThread)thread).pool == p)
                w = wt.workQueue;
            else if ((r = ThreadLocalRandom.getProbe()) != 0 &&
                     (ws = p.workQueues) != null && (n = ws.length) > 0)
                // NOTE(review): SQMASK presumably restricts the index to
                // the even (submission) slots -- confirm at its declaration.
                w = ws[(n - 1) & r & SQMASK];
            else
                w = null;
            if (w != null) {
                for (;;) {
                    int b = w.base, s = w.top, d, al; ForkJoinTask<?>[] a;
                    // Continue only while an array exists and base is
                    // behind top (d < 0).
                    if ((a = w.array) != null && (d = b - s) < 0 &&
                        (al = a.length) > 0) {
                        int index = (al - 1) & b;   // slot for base; mask assumes power-of-two length -- TODO confirm
                        ForkJoinTask<?> t = (ForkJoinTask<?>)
                            QA.getAcquire(a, index);
                        if (blocker.isReleasable())
                            break;
                        // Act only if base is still what was read above;
                        // otherwise loop and re-read.
                        else if (b++ == w.base) {
                            if (t == null) {
                                // Empty slot: stop if it was the only
                                // apparent element, else retry.
                                if (d == -1)
                                    break;
                            }
                            // Only AsynchronousCompletionTasks are run
                            // here; anything else ends the helping.
                            else if (!(t instanceof CompletableFuture.
                                  AsynchronousCompletionTask))
                                break;
                            // Claim the task by nulling its slot; on
                            // success advance base and run it.
                            else if (QA.compareAndSet(a, index, t, null)) {
                                w.base = b;
                                t.doExec();
                            }
                        }
                    }
                    else
                        break;
                }
            }
        }
    }
3208 
    // AbstractExecutorService overrides.  These rely on the
    // undocumented fact that ForkJoinTask.adapt returns ForkJoinTasks
    // that also implement RunnableFuture.
3212 
3213     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
3214         return new ForkJoinTask.AdaptedRunnable<T>(runnable, value);
3215     }
3216 
3217     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
3218         return new ForkJoinTask.AdaptedCallable<T>(callable);
3219     }
3220 
    // VarHandle mechanics
    // Handles for atomic access to the "ctl" and "mode" fields and to
    // ForkJoinTask[] elements; all three are assigned in the static
    // initializer below.
    private static final VarHandle CTL;
    private static final VarHandle MODE;
    private static final VarHandle QA;

    // Class initialization.  The statements are order-sensitive: later
    // steps read statics assigned by earlier ones.
    static {
        try {
            MethodHandles.Lookup l = MethodHandles.lookup();
            CTL = l.findVarHandle(ForkJoinPool.class, "ctl", long.class);
            MODE = l.findVarHandle(ForkJoinPool.class, "mode", int.class);
            QA = MethodHandles.arrayElementVarHandle(ForkJoinTask[].class);
        } catch (ReflectiveOperationException e) {
            // A failed lookup is unrecoverable; it aborts class
            // initialization.
            throw new Error(e);
        }

        // Reduce the risk of rare disastrous classloading in first call to
        // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
        Class<?> ensureLoaded = LockSupport.class;

        // Optional system-property override of the spare-thread limit.
        // Any exception while reading or parsing it is deliberately
        // ignored, leaving the default in place.
        int commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
        try {
            String p = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.maximumSpares");
            if (p != null)
                commonMaxSpares = Integer.parseInt(p);
        } catch (Exception ignore) {}
        COMMON_MAX_SPARES = commonMaxSpares;

        defaultForkJoinWorkerThreadFactory =
            new DefaultForkJoinWorkerThreadFactory();
        modifyThreadPermission = new RuntimePermission("modifyThread");

        // NOTE(review): the (byte) argument presumably selects the
        // constructor reserved for the common pool, which may rely on
        // the statics assigned above -- confirm before reordering.
        common = AccessController.doPrivileged(new PrivilegedAction<>() {
            public ForkJoinPool run() {
                return new ForkJoinPool((byte)0); }});

        // Reported parallelism is at least 1 even when the common
        // pool's stored level is zero.
        COMMON_PARALLELISM = Math.max(common.mode & SMASK, 1);
    }
3259 
3260     /**
3261      * Factory for innocuous worker threads.
3262      */
3263     private static final class InnocuousForkJoinWorkerThreadFactory
3264         implements ForkJoinWorkerThreadFactory {
3265 
3266         /**
3267          * An ACC to restrict permissions for the factory itself.
3268          * The constructed workers have no permissions set.
3269          */
3270         private static final AccessControlContext ACC = contextWithPermissions(
3271             modifyThreadPermission,
3272             new RuntimePermission("enableContextClassLoaderOverride"),
3273             new RuntimePermission("modifyThreadGroup"),
3274             new RuntimePermission("getClassLoader"),
3275             new RuntimePermission("setContextClassLoader"));
3276 
3277         public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
3278             return AccessController.doPrivileged(
3279                 new PrivilegedAction<>() {
3280                     public ForkJoinWorkerThread run() {
3281                         return new ForkJoinWorkerThread.
3282                             InnocuousForkJoinWorkerThread(pool); }},
3283                 ACC);
3284         }
3285     }
3286 }