src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java

8200520: forkjoin tasks interrupted after shutdown
Reviewed-by: martin, psandoz, chegar, dholmes


 167  * maximum number of running threads to 32767. Attempts to create
 168  * pools with greater than the maximum number result in
 169  * {@code IllegalArgumentException}.
 170  *
 171  * <p>This implementation rejects submitted tasks (that is, by throwing
 172  * {@link RejectedExecutionException}) only when the pool is shut down
 173  * or internal resources have been exhausted.
 174  *
 175  * @since 1.7
 176  * @author Doug Lea
 177  */
 178 public class ForkJoinPool extends AbstractExecutorService {
 179 
 180     /*
 181      * Implementation Overview
 182      *
 183      * This class and its nested classes provide the main
 184      * functionality and control for a set of worker threads:
 185      * Submissions from non-FJ threads enter into submission queues.
 186      * Workers take these tasks and typically split them into subtasks
 187      * that may be stolen by other workers.  Preference rules give
 188      * first priority to processing tasks from their own queues (LIFO
 189      * or FIFO, depending on mode), then to randomized FIFO steals of
 190      * tasks in other queues.  This framework began as a vehicle for
 191      * supporting tree-structured parallelism using work-stealing.
 192      * Over time, its scalability advantages led to extensions and
 193      * changes to better support more diverse usage contexts.  Because
 194      * most internal methods and nested classes are interrelated,
 195      * their main rationale and descriptions are presented here;
 196      * individual methods and nested classes contain only brief
 197      * comments about details.
 198      *
 199      * WorkQueues
 200      * ==========
 201      *
 202      * Most operations occur within work-stealing queues (in nested
 203      * class WorkQueue).  These are special forms of Deques that
 204      * support only three of the four possible end-operations -- push,
 205      * pop, and poll (aka steal), under the further constraints that
 206      * push and pop are called only from the owning thread (or, as
 207      * extended here, under a lock), while poll may be called from
 208      * other threads.  (If you are unfamiliar with them, you probably
 209      * want to read Herlihy and Shavit's book "The Art of
 210      * Multiprocessor Programming", chapter 16 describing these in
 211      * more detail before proceeding.)  The main work-stealing queue
 212      * design is roughly similar to those in the papers "Dynamic
 213      * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
 214      * (http://research.sun.com/scalable/pubs/index.html) and
 215      * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
 216      * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
 217      * The main differences ultimately stem from GC requirements that
 218      * we null out taken slots as soon as we can, to maintain as small
 219      * a footprint as possible even in programs generating huge
 220      * numbers of tasks. To accomplish this, we shift the CAS
 221      * arbitrating pop vs poll (steal) from being on the indices
 222      * ("base" and "top") to the slots themselves.
 223      *
 224      * Adding tasks then takes the form of a classic array push(task)
 225      * in a circular buffer:
 226      *    q.array[q.top++ % length] = task;
 227      *
 228      * (The actual code needs to null-check and size-check the array,
 229      * uses masking, not mod, for indexing a power-of-two-sized array,
 230      * properly fences accesses, and possibly signals waiting workers
 231      * to start scanning -- see below.)  Both a successful pop and
 232      * poll mainly entail a CAS of a slot from non-null to null.
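The masking mentioned above can be checked in isolation. This minimal sketch (the class `MaskIndex` is hypothetical, not part of ForkJoinPool) shows that for a power-of-two length `n`, `(n - 1) & i` always yields a valid slot and agrees with `Math.floorMod(i, n)`, even after an index has wrapped past `Integer.MAX_VALUE`. Plain `%` would instead return negative values, for example `-3 % 8 == -3`.

```java
// Hypothetical helper illustrating power-of-two masking versus mod.
final class MaskIndex {
    /** Slot for logical index i in an array whose length n is a power of two. */
    static int slot(int i, int n) {
        return (n - 1) & i;   // keeps the low log2(n) bits of i; never negative
    }
}
```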
 233      *
 234      * The pop operation (always performed by owner) is:
 235      *   if ((the task at top slot is not null) and
 236      *        (CAS slot to null))
 237      *           decrement top and return task;
 238      *
 239      * And the poll operation (usually by a stealer) is
 240      *    if ((the task at base slot is not null) and
 241      *        (CAS slot to null))
 242      *           increment base and return task;
 243      *
 244      * There are several variants of each of these. In particular,
 245      * almost all uses of poll occur within scan operations that also
 246      * interleave contention tracking (with associated code sprawl.)
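The push/pop/poll rules above can be sketched with `AtomicReferenceArray` standing in for the VarHandle accesses on the task array. This is a simplified illustration, not the class's WorkQueue: `SlotCasDeque` is a hypothetical name, capacity is fixed (no growArray), there is no signalling, `top` is made volatile for simplicity, and each task object is assumed to be pushed at most once. As in the text, the CAS on a slot (not on `base` or `top`) decides which of a racing pop and poll gets the last task.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical fixed-capacity deque: owner pushes/pops at top, others poll at base.
final class SlotCasDeque<T> {
    private final AtomicReferenceArray<T> slots;
    private final int mask;
    private volatile int base;  // next slot to poll; advanced only after a winning CAS
    private volatile int top;   // next slot to push; written only by the owner

    SlotCasDeque(int capacity) {
        if (capacity <= 0 || Integer.bitCount(capacity) != 1)
            throw new IllegalArgumentException("capacity must be a power of two");
        slots = new AtomicReferenceArray<>(capacity);
        mask = capacity - 1;
    }

    /** Owner only. Returns false when full (the real queue would grow instead). */
    boolean push(T task) {
        int s = top;
        if (s - base >= slots.length())
            return false;
        slots.set(s & mask, task);  // publish the task before advancing top
        top = s + 1;
        return true;
    }

    /** Owner only: LIFO. The slot CAS arbitrates a race with a thief for the last task. */
    T pop() {
        int s = top - 1;
        if (s - base < 0)
            return null;            // empty
        int i = s & mask;
        T t = slots.get(i);
        if (t != null && slots.compareAndSet(i, t, null)) {
            top = s;
            return t;
        }
        return null;                // a thief claimed it first
    }

    /** Any thread: FIFO. Retries while another claim is visibly in progress. */
    T poll() {
        for (;;) {
            int b = base;
            if (b - top >= 0)
                return null;        // appears empty
            int i = b & mask;
            T t = slots.get(i);
            if (b != base)
                continue;           // base moved under us; re-read
            if (t != null && slots.compareAndSet(i, t, null)) {
                base = b + 1;
                return t;
            }
            Thread.onSpinWait();    // slot claimed, but base/top not yet updated
        }
    }
}
```

A thief that sees a null slot while the indices still say non-empty simply retries, which is the non-wait-free behavior discussed in the memory-ordering paragraph.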
 247      *
 248      * Memory ordering.  See "Correct and Efficient Work-Stealing for
 249      * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
 250      * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
 251      * analysis of memory ordering requirements in work-stealing
 252      * algorithms similar to (but different than) the one used here.
 253      * Extracting tasks in array slots via (fully fenced) CAS provides
 254      * primary synchronization. The base and top indices imprecisely
 255      * guide where to extract from. We do not always require strict
 256      * orderings of array and index updates, so sometimes let them be
 257      * subject to compiler and processor reorderings. However, the
 258      * volatile "base" index also serves as a basis for memory
 259      * ordering: Slot accesses are preceded by a read of base,
 260      * ensuring happens-before ordering with respect to stealers (so
 261      * the slots themselves can be read via plain array reads.)  The
 262      * only other memory orderings relied on are maintained in the
 263      * course of signalling and activation (see below).  A check that
 264      * base == top indicates (momentary) emptiness, but otherwise may
 265      * err on the side of possibly making the queue appear nonempty
 266      * when a push, pop, or poll has not fully committed, or making
 267      * it appear empty when an update of top has not yet been visibly
 268      * written.  (Method isEmpty() checks the case of a partially
 269      * completed removal of the last element.)  Because of this, the
 270      * poll operation, considered individually, is not wait-free. One
 271      * thief cannot successfully continue until another in-progress
 272      * one (or, if previously empty, a push) visibly completes.
 273      * However, in the aggregate, we ensure at least probabilistic
 274      * non-blockingness.  If an attempted steal fails, a scanning
 275      * thief chooses a different random victim target to try next. So,
 276      * in order for one thief to progress, it suffices for any
 277      * in-progress poll or new push on any empty queue to
 278      * complete.
 279      *
 280      * This approach also enables support of a user mode in which
 281      * local task processing is in FIFO, not LIFO order, simply by
 282      * using poll rather than pop.  This can be useful in
 283      * message-passing frameworks in which tasks are never joined.
 284      *
 285      * WorkQueues are also used in a similar way for tasks submitted
 286      * to the pool. We cannot mix these tasks in the same queues used
 287      * by workers. Instead, we randomly associate submission queues
 288      * with submitting threads, using a form of hashing.  The
 289      * ThreadLocalRandom probe value serves as a hash code for
 290      * choosing existing queues, and may be randomly repositioned upon
 291      * contention with other submitters.  In essence, submitters act
 292      * like workers except that they are restricted to executing local
 293      * tasks that they submitted.  Insertion of tasks in shared mode
 294      * requires a lock, but we use only a simple spinlock (using field
 295      * phase), because submitters encountering a busy queue move to a
 296      * different position to use or create other queues -- they block
 297      * only when creating and registering new queues. Because it is
 298      * used only as a spinlock, unlocking requires only a "releasing"
 299      * store (using setRelease).
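A rough sketch of the two submission-queue mechanics just described: choosing an even slot from a per-thread hash, and a spinlock acquired by CAS and released by a releasing store. The class `SpinLockedSlot` is hypothetical; it reuses the value of `SQMASK` (0x007e) declared later in this file and assumes the probe is simply masked with it (the real code must also respect the current array length). `VarHandle.compareAndSet` and `setRelease` are the standard `java.lang.invoke` APIs.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Hypothetical shared-queue stand-in: even-slot hashing plus a phase spinlock.
final class SpinLockedSlot {
    static final int SQMASK = 0x007e;   // same value as ForkJoinPool.SQMASK
    private volatile int phase;         // 0 = unlocked, 1 = locked (like QLOCK)
    private static final VarHandle PHASE;
    static {
        try {
            PHASE = MethodHandles.lookup()
                .findVarHandle(SpinLockedSlot.class, "phase", int.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    /** Maps a hash probe to one of 64 even indices 0, 2, ..., 126. */
    static int submissionIndex(int probe) { return probe & SQMASK; }

    /** A busy submitter would move to another index instead of spinning here. */
    boolean tryLock() { return PHASE.compareAndSet(this, 0, 1); }

    /** Unlocking needs only a releasing store, since only the holder writes here. */
    void unlock() { PHASE.setRelease(this, 0); }
}
```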
 300      *
 301      * Management
 302      * ==========
 303      *
 304      * The main throughput advantages of work-stealing stem from
 305      * decentralized control -- workers mostly take tasks from
 306      * themselves or each other, at rates that can exceed a billion
 307      * per second.  The pool itself creates, activates (enables
 308      * scanning for and running tasks), deactivates, blocks, and
 309      * terminates threads, all with minimal central information.
 310      * There are only a few properties that we can globally track or
 311      * maintain, so we pack them into a small number of variables,
 312      * often maintaining atomicity without blocking or locking.
 313      * Nearly all essentially atomic control state is held in a few
 314      * volatile variables that are by far most often read (not
 315      * written) as status and consistency checks. We pack as much
 316      * information into them as we can.
 317      *
 318      * Field "ctl" contains 64 bits holding information needed to
 319      * atomically decide to add, enqueue (on an event queue), and
 320      * dequeue (and release)-activate workers.  To enable this
 321      * packing, we restrict maximum parallelism to (1<<15)-1 (which is
 322      * far in excess of normal operating range) to allow ids, counts,
 323      * and their negations (used for thresholding) to fit into 16bit
 324      * subfields.
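To illustrate packing several 16-bit subfields into one 64-bit word, here is a sketch with four anonymous subfields; their order and meaning are assumptions for illustration and do not describe ctl's actual layout. Storing a count minus a target (3 workers against a target of 8 gives -5) works because `(short)` sign-extends each subfield on extraction, and the (1<<15)-1 bound keeps both a count and its negation inside the signed 16-bit range. A signed subfield in the top position also makes its sign visible as the sign of the whole word.

```java
// Hypothetical packing of four signed 16-bit subfields into one long.
final class Packed16 {
    static long pack(int a, int b, int c, int d) {
        return ((long) (a & 0xffff) << 48) | ((long) (b & 0xffff) << 32)
             | ((long) (c & 0xffff) << 16) | (long) (d & 0xffff);
    }

    /** Subfield i (0 = highest, 3 = lowest), sign-extended back to an int. */
    static int field(long word, int i) {
        return (short) (word >>> (48 - 16 * i));
    }
}
```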
 325      *
 326      * Field "mode" holds configuration parameters as well as lifetime
 327      * status, atomically and monotonically setting SHUTDOWN, STOP,
 328      * and finally TERMINATED bits.
 329      *
 330      * Field "workQueues" holds references to WorkQueues.  It is
 331      * updated (only during worker creation and termination) under
 332      * lock (using field workerNamePrefix as lock), but is otherwise
 333      * concurrently readable, and accessed directly. We also ensure
 334      * that uses of the array reference itself never become too stale
 335      * in case of resizing.  To simplify index-based operations, the
 336      * array size is always a power of two, and all readers must
 337      * tolerate null slots. Worker queues are at odd indices. Shared
 338      * (submission) queues are at even indices, up to a maximum of 64
 339      * slots, to limit growth even if array needs to expand to add
 340      * more workers. Grouping them together in this way simplifies and
 341      * speeds up task scanning.
 342      *
 343      * All worker thread creation is on-demand, triggered by task
 344      * submissions, replacement of terminated workers, and/or
 345      * compensation for blocked workers. However, all other support
 346      * code is set up to work with other policies.  To ensure that we
 347      * do not hold on to worker references that would prevent GC, all
 348      * accesses to workQueues are via indices into the workQueues
 349      * array (which is one source of some of the messy code
 350      * constructions here). In essence, the workQueues array serves as
 351      * a weak reference mechanism. Thus for example the stack top
 352      * subfield of ctl stores indices, not references.
 353      *
 354      * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we
 355      * cannot let workers spin indefinitely scanning for tasks when
 356      * none can be found immediately, and we cannot start/resume
 357      * workers unless there appear to be tasks available.  On the
 358      * other hand, we must quickly prod them into action when new
 359      * tasks are submitted or generated. In many usages, ramp-up time
 360      * is the main limiting factor in overall performance, which is
 361      * compounded at program start-up by JIT compilation and


 399      * exception is propagated, generally to some external caller.
 400      * Worker index assignment avoids the bias in scanning that would
 401      * occur if entries were sequentially packed starting at the front
 402      * of the workQueues array. We treat the array as a simple
 403      * power-of-two hash table, expanding as needed. The seedIndex
 404      * increment ensures no collisions until a resize is needed or a
 405      * worker is deregistered and replaced, and thereafter keeps
 406      * probability of collision low. We cannot use
 407      * ThreadLocalRandom.getProbe() for similar purposes here because
 408      * the thread has not started yet, but do so for creating
 409      * submission queues for existing external threads (see
 410      * externalPush).
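A minimal sketch of the index-assignment idea: a power-of-two table in which workers take odd slots, chosen by adding a fixed increment to a seed and probing forward in steps of two. The class `OddSlotTable`, the golden-ratio constant `0x9e3779b9`, and the probing rule are assumptions chosen to mirror the description, not code copied from this file.

```java
// Hypothetical odd-index registration in a power-of-two table.
final class OddSlotTable {
    private static final int SEED_INCREMENT = 0x9e3779b9; // assumed golden-ratio step
    private final Object[] slots;
    private int seed;

    OddSlotTable(int sizePowerOfTwo) { slots = new Object[sizePowerOfTwo]; }

    /** Returns the odd index where w was placed, or -1 if every odd slot is taken. */
    int register(Object w) {
        int m = slots.length - 1;
        seed += SEED_INCREMENT;
        int i = ((seed << 1) | 1) & m;                  // always odd
        for (int probes = 0; probes <= m >>> 1; probes++) { // size/2 odd slots
            if (slots[i] == null) {
                slots[i] = w;
                return i;
            }
            i = (i + 2) & m;                            // next odd slot, wrapping
        }
        return -1;                                      // caller would resize here
    }
}
```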
 411      *
 412      * WorkQueue field "phase" is used by both workers and the pool to
 413      * manage and track whether a worker is UNSIGNALLED (possibly
 414      * blocked waiting for a signal).  When a worker is enqueued its
 415      * phase field is set. Note that phase field updates lag queue CAS
 416      * releases so usage requires care -- seeing a negative phase does
 417      * not guarantee that the worker is available. When queued, the
 418      * lower 16 bits of phase must hold its pool index. So we
 419      * place the index there upon initialization (see registerWorker)
 420      * and otherwise keep it there or restore it when necessary.
 421      *
 422      * The ctl field also serves as the basis for memory
 423      * synchronization surrounding activation. This uses a more
 424      * efficient version of a Dekker-like rule that task producers and
 425      * consumers sync with each other by both writing/CASing ctl (even
 426      * if to its current value).  This would be extremely costly. So
 427      * we relax it in several ways: (1) Producers only signal when
 428      * their queue is empty. Other workers propagate this signal (in
 429      * method scan) when they find tasks; to further reduce flailing,
 430      * each worker signals only one other per activation. (2) Workers
 431      * only enqueue after scanning (see below) and not finding any
 432      * tasks.  (3) Rather than CASing ctl to its current value in the
 433      * common case where no action is required, we reduce write
 434      * contention by equivalently prefacing signalWork when called by
 435      * an external task producer using a memory access with
 436      * full-volatile semantics or a "fullFence".
 437      *
 438      * Almost always, too many signals are issued. A task producer
 439      * cannot in general tell if some existing worker is in the midst
 440      * of finishing one task (or already scanning) and ready to take
 441      * another without being signalled. So the producer might instead
 442      * activate a different worker that does not find any work, and
 443      * then inactivates. This scarcely matters in steady-state
 444      * computations involving all workers, but can create contention
 445      * and bookkeeping bottlenecks during ramp-up, ramp-down, and small
 446      * computations involving only a few workers.
 447      *
 448      * Scanning. Method runWorker performs top-level scanning for
 449      * tasks.  Each scan traverses and tries to poll from each queue
 450      * starting at a random index and circularly stepping. Scans are
 451      * not performed in ideal random permutation order, to reduce
 452      * cacheline contention.  The pseudorandom generator need not have
 453      * high-quality statistical properties in the long term, but just
 454      * within computations; we use Marsaglia XorShifts (often via
 455      * ThreadLocalRandom.nextSecondarySeed), which are cheap and
 456      * suffice. Scanning also employs contention reduction: When
 457      * scanning workers fail to extract an apparently existing task,
 458      * they soon restart at a different pseudorandom index.  This
 459      * improves throughput when many threads are trying to take tasks
 460      * from few queues, which can be common in some usages.  Scans do
 461      * not otherwise explicitly take into account core affinities,
 462      * loads, cache localities, etc. However, they do exploit temporal
 463      * locality (which usually approximates these) by preferring to
 464      * re-poll (at most #workers times) from the same queue after a
 465      * successful poll before trying others.
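The XorShift step and the circular scan can be sketched as follows. `next` uses Marsaglia's (13, 17, 5) shift triple, which is a bijection on 32-bit values that maps 0 to itself, so a nonzero seed never becomes zero. The class `XorShiftScan` is hypothetical, and the step of 1 in `scanOrder` is an assumption; the text only says scans start at a random index and step circularly.

```java
// Hypothetical helpers: one xorshift step and one circular scan order.
final class XorShiftScan {
    /** One Marsaglia xorshift step; nonzero inputs give nonzero outputs. */
    static int next(int r) {
        r ^= r << 13;
        r ^= r >>> 17;
        r ^= r << 5;
        return r;
    }

    /** Queue indices visited by one scan of n (a power of two) queues, starting at r. */
    static int[] scanOrder(int r, int n) {
        int[] order = new int[n];
        for (int k = 0; k < n; k++)
            order[k] = (r + k) & (n - 1);   // circular step of 1 (assumed)
        return order;
    }
}
```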
 466      *
 467      * Trimming workers. To release resources after periods of lack of
 468      * use, a worker starting to wait when the pool is quiescent will
 469      * time out and terminate (see method scan) if the pool has
 470      * remained quiescent for period given by field keepAlive.
 471      *
 472      * Shutdown and Termination. A call to shutdownNow invokes
 473      * tryTerminate to atomically set a mode bit. The calling
 474      * thread, as well as every other worker thereafter terminating,
 475      * helps terminate others by cancelling their unprocessed tasks,
 476      * and waking them up, doing so repeatedly until stable. Calls to
 477      * non-abrupt shutdown() preface this by checking whether
 478      * termination should commence by sweeping through queues (until
 479      * stable) to ensure lack of in-flight submissions and workers
 480      * about to process them before triggering the "STOP" phase of
 481      * termination.
 482      *
 483      * Joining Tasks
 484      * =============
 485      *
 486      * Any of several actions may be taken when one worker is waiting
 487      * to join a task stolen (or always held) by another.  Because we
 488      * are multiplexing many tasks on to a pool of workers, we can't
 489      * always just let them block (as in Thread.join).  We also cannot


 517      * actively joined task.  Thus, the joiner executes a task that
 518      * would be on its own local deque if the to-be-joined task had
 519      * not been stolen. This is a conservative variant of the approach
 520      * described in Wagner & Calder "Leapfrogging: a portable
 521      * technique for implementing efficient futures" SIGPLAN Notices,
 522      * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
 523      * mainly in that we only record queue ids, not full dependency
 524      * links.  This requires a linear scan of the workQueues array to
 525      * locate stealers, but isolates cost to when it is needed, rather
 526      * than adding to per-task overhead. Searches can fail to locate
 527      * stealers when GC stalls and the like delay recording sources.
 528      * Further, even when accurately identified, stealers might not
 529      * ever produce a task that the joiner can in turn help with. So,
 530      * compensation is tried upon failure to find tasks to run.
 531      *
 532      * Compensation does not by default aim to keep exactly the target
 533      * parallelism number of unblocked threads running at any given
 534      * time. Some previous versions of this class employed immediate
 535      * compensations for any blocked join. However, in practice, the
 536      * vast majority of blockages are transient byproducts of GC and
 537      * other JVM or OS activities that are made worse by replacement.
 538      * Rather than impose arbitrary policies, we allow users to
 539      * override the default of only adding threads upon apparent
 540      * starvation.  The compensation mechanism may also be bounded.
 541      * Bounds for the commonPool (see COMMON_MAX_SPARES) better enable
 542      * JVMs to cope with programming errors and abuse before running
 543      * out of resources to do so.
 544      *
 545      * Common Pool
 546      * ===========
 547      *
 548      * The static common pool always exists after static
 549      * initialization.  Since it (or any other created pool) need
 550      * never be used, we minimize initial construction overhead and
 551      * footprint to the setup of about a dozen fields.
 552      *
 553      * When external threads submit to the common pool, they can
 554      * perform subtask processing (see externalHelpComplete and
 555      * related methods) upon joins.  This caller-helps policy makes it
 556      * sensible to set common pool parallelism level to one (or more)
 557      * less than the total number of available cores, or even zero for
 558      * pure caller-runs.  We do not need to record whether external
 559      * submissions are to the common pool -- if not, external help
 560      * methods return quickly. These submitters would otherwise be
 561      * blocked waiting for completion, so the extra effort (with
 562      * liberally sprinkled task status checks) in inapplicable cases
 563      * amounts to an odd form of limited spin-wait before blocking in
 564      * ForkJoinTask.join.
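From the user's side, the caller-helps policy is reached through the public API: an external thread that calls `invoke` on the common pool waits for (and may help with) the resulting subtasks. The task below is a conventional divide-and-conquer sum using the real `RecursiveTask`, `fork`, and `join` methods; the class name `RangeSum` and the cutoff of 1,000 elements are arbitrary choices. The common pool's parallelism can be set with the system property `java.util.concurrent.ForkJoinPool.common.parallelism`.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Divide-and-conquer sum over a long[] range, run in the common pool.
final class RangeSum extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000;   // arbitrary sequential cutoff
    private final long[] data;
    private final int lo, hi;

    RangeSum(long[] data, int lo, int hi) {
        this.data = data; this.lo = lo; this.hi = hi;
    }

    @Override
    protected Long compute() {
        if (hi - lo <= THRESHOLD) {
            long s = 0;
            for (int i = lo; i < hi; i++) s += data[i];
            return s;
        }
        int mid = (lo + hi) >>> 1;
        RangeSum left = new RangeSum(data, lo, mid);
        left.fork();                                // queued; may be stolen by another worker
        long right = new RangeSum(data, mid, hi).compute();
        return right + left.join();                 // may run it directly, help, or wait
    }

    static long sum(long[] data) {
        return ForkJoinPool.commonPool().invoke(new RangeSum(data, 0, data.length));
    }
}
```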
 565      *
 566      * As a more appropriate default in managed environments, unless
 567      * overridden by system properties, we use workers of subclass
 568      * InnocuousForkJoinWorkerThread when there is a SecurityManager
 569      * present. These workers have no permissions set, do not belong
 570      * to any user-defined ThreadGroup, and erase all ThreadLocals
 571      * after executing any top-level task (see
 572      * WorkQueue.afterTopLevelExec).  The associated mechanics (mainly
 573      * in ForkJoinWorkerThread) may be JVM-dependent and must access
 574      * particular Thread class fields to achieve this effect.
 575      *
 576      * Style notes
 577      * ===========
 578      *
 579      * Memory ordering relies mainly on VarHandles.  This can be
 580      * awkward and ugly, but also reflects the need to control
 581      * outcomes across the unusual cases that arise in very racy code
 582      * with very few invariants. All fields are read into locals
 583      * before use, and null-checked if they are references.  This is
 584      * usually done in a "C"-like style of listing declarations at the
 585      * heads of methods or blocks, and using inline assignments on
 586      * first encounter.  Nearly all explicit checks lead to
 587      * bypass/return, not exception throws, because they may
 588      * legitimately arise due to cancellation/revocation during
 589      * shutdown.
 590      *
 591      * There is a lot of representation-level coupling among classes
 592      * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask.  The
 593      * fields of WorkQueue maintain data structures managed by
 594      * ForkJoinPool, so are directly accessed.  There is little point
 595      * trying to reduce this, since any associated future changes in
 596      * representations will need to be accompanied by algorithmic
 597      * changes anyway. Several methods intrinsically sprawl because
 598      * they must accumulate sets of consistent reads of fields held in
 599      * local variables.  There are also other coding oddities
 600      * (including several unnecessary-looking hoisted null checks)
 601      * that help some methods perform reasonably even when interpreted
 602      * (not compiled).
 603      *
 604      * The order of declarations in this file is (with a few exceptions):
 605      * (1) Static utility functions
 606      * (2) Nested (static) classes
 607      * (3) Static fields
 608      * (4) Fields, along with constants used when unpacking some of them
 609      * (5) Internal control methods
 610      * (6) Callbacks and other support for ForkJoinTask methods
 611      * (7) Exported methods
 612      * (8) Static block initializing statics in minimally dependent order
 613      */
 614 
 615     // Static utilities
 616 
 617     /**
 618      * If there is a security manager, makes sure caller has
 619      * permission to modify threads.
 620      */
 621     private static void checkPermission() {
 622         SecurityManager security = System.getSecurityManager();


 686     static final int SWIDTH       = 16;            // width of short
 687     static final int SMASK        = 0xffff;        // short bits == max index
 688     static final int MAX_CAP      = 0x7fff;        // max #workers - 1
 689     static final int SQMASK       = 0x007e;        // max 64 (even) slots
 690 
 691     // Masks and units for WorkQueue.phase and ctl sp subfield
 692     static final int UNSIGNALLED  = 1 << 31;       // must be negative
 693     static final int SS_SEQ       = 1 << 16;       // version count
 694     static final int QLOCK        = 1;             // must be 1
 695 
 696     // Mode bits and sentinels, some also used in WorkQueue id and source fields
 697     static final int OWNED        = 1;             // queue has owner thread
 698     static final int FIFO         = 1 << 16;       // fifo queue or access mode
 699     static final int SHUTDOWN     = 1 << 18;
 700     static final int TERMINATED   = 1 << 19;
 701     static final int STOP         = 1 << 31;       // must be negative
 702     static final int QUIET        = 1 << 30;       // not scanning or working
 703     static final int DORMANT      = QUIET | UNSIGNALLED;
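The "must be negative" notes suggest that some states are tested by sign alone. A small sketch using copies of these constants: a mode word with STOP set is negative, and a versioned phase can be advanced by SS_SEQ while its low 16 bits (SMASK) keep the pool index, as the overview requires. The class `ModeBits` and its `enqueue`/`release` helpers are hypothetical illustrations, not the pool's own update code.

```java
// Hypothetical helpers built on copies of the constants above.
final class ModeBits {
    static final int SMASK = 0xffff, SS_SEQ = 1 << 16, UNSIGNALLED = 1 << 31;
    static final int SHUTDOWN = 1 << 18, STOP = 1 << 31, QUIET = 1 << 30;
    static final int DORMANT = QUIET | UNSIGNALLED;

    static boolean isStopping(int mode) { return mode < 0; }   // STOP is the sign bit
    static boolean isShutdown(int mode) { return (mode & SHUTDOWN) != 0; }

    /** Bump the version, mark unsignalled; low 16 bits (pool index) unchanged. */
    static int enqueue(int phase) { return (phase + SS_SEQ) | UNSIGNALLED; }

    /** Clear the unsignalled bit, keeping version and index. */
    static int release(int phase) { return phase & ~UNSIGNALLED; }
}
```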
 704 
 705     /**
 706      * The maximum number of local polls from the same queue before
 707      * checking others. This is a safeguard against infinitely unfair
 708      * looping under unbounded user task recursion, and must be larger
 709      * than plausible cases of intentional bounded task recursion.
 710      */
 711     static final int POLL_LIMIT = 1 << 10;
 712 
 713     /**
 714      * Queues supporting work-stealing as well as external task
 715      * submission. See above for descriptions and algorithms.
 716      * Performance on most platforms is very sensitive to placement of
 717      * instances of both WorkQueues and their arrays -- we absolutely
 718      * do not want multiple WorkQueue instances or multiple queue
 719      * arrays sharing cache lines. The @Contended annotation alerts
 720      * JVMs to try to keep instances apart.
 721      */
 722     @jdk.internal.vm.annotation.Contended
 723     static final class WorkQueue {
 724 
 725         /**
 726          * Capacity of work-stealing queue array upon initialization.
 727          * Must be a power of two; at least 4, but should be larger to
 728          * reduce or eliminate cacheline sharing among queues.
 729          * Currently, it is much larger, as a partial workaround for
 730          * the fact that JVMs often place arrays in locations that
 731          * share GC bookkeeping (especially cardmarks) such that
 732          * per-write accesses encounter serious memory contention.
 733          */
 734         static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
 735 
 736         /**
 737          * Maximum size for queue arrays. Must be a power of two less
 738          * than or equal to 1 << (31 - width of array entry) to ensure
 739          * lack of wraparound of index calculations, but defined to a
 740          * value a bit less than this to help users trap runaway
 741          * programs before saturating systems.
 742          */
 743         static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
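The bound on MAXIMUM_QUEUE_CAPACITY can be checked with a little arithmetic. Assuming "width of array entry" means the base-2 logarithm of the entry size in bytes (2 for 4-byte compressed references, 3 for 8-byte references), the largest byte offset is `(capacity - 1) << shift`, which must stay within `Integer.MAX_VALUE`. The class `CapacityCheck` and its `offsetsFit` helper are hypothetical.

```java
// Hypothetical check that every element's byte offset fits in an int.
final class CapacityCheck {
    static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26;   // value copied from above

    static boolean offsetsFit(int capacity, int shift) {
        long maxOffset = ((long) capacity - 1) << shift;  // computed in long: no overflow
        return maxOffset <= Integer.MAX_VALUE;
    }
}
```

With shift 3, capacities up to 1 << 28 pass and 1 << 29 fails, matching the stated bound 1 << (31 - 3); the chosen 1 << 26 passes with room to spare.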
 744 
 745         // Instance fields
 746         volatile int phase;        // versioned, negative: queued, 1: locked
 747         int stackPred;             // pool stack (ctl) predecessor link
 748         int nsteals;               // number of steals
 749         int id;                    // index, mode, tag
 750         volatile int source;       // source queue id, or sentinel
 751         volatile int base;         // index of next slot for poll
 752         int top;                   // index of next slot for push
 753         ForkJoinTask<?>[] array;   // the elements (initially unallocated)
 754         final ForkJoinPool pool;   // the containing pool (may be null)
 755         final ForkJoinWorkerThread owner; // owning thread or null if shared
 756 
 757         WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
 758             this.pool = pool;
 759             this.owner = owner;
 760             // Place indices in the center of array (that is not yet allocated)
 761             base = top = INITIAL_QUEUE_CAPACITY >>> 1;
 762         }
 763 
 764         /**
 765          * Returns an exportable index (used by ForkJoinWorkerThread).
 766          */
 767         final int getPoolIndex() {
 768             return (id & 0xffff) >>> 1; // ignore odd/even tag bit
 769         }
 770 
 771         /**
 772          * Returns the approximate number of tasks in the queue.
 773          */
 774         final int queueSize() {
 775             int n = base - top;       // read base first
 776             return (n >= 0) ? 0 : -n; // ignore transient negative
 777         }
 778 
 779         /**
 780          * Provides a more accurate estimate of whether this queue has
 781          * any tasks than does queueSize, by checking whether a
 782          * near-empty queue has at least one unclaimed task.
 783          */
 784         final boolean isEmpty() {
 785             ForkJoinTask<?>[] a; int n, al, b;
 786             return ((n = (b = base) - top) >= 0 || // possibly one task
 787                     (n == -1 && ((a = array) == null ||
 788                                  (al = a.length) == 0 ||
 789                                  a[(al - 1) & b] == null)));
 790         }
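Both estimates above work on the difference `base - top` rather than comparing the indices directly, which keeps them correct after the indices overflow. A sketch on plain values (the class `QueueMath` is hypothetical; no concurrency) shows the wraparound case and the one-element case that isEmpty resolves by looking at the slot at `base`.

```java
// Hypothetical single-threaded mirrors of queueSize and isEmpty.
final class QueueMath {
    /** Size from indices; the int difference stays exact across overflow. */
    static int queueSize(int base, int top) {
        int n = base - top;
        return (n >= 0) ? 0 : -n;   // a transiently "negative size" reads as 0
    }

    /** Empty if no indices remain, or one remains but its slot is already null. */
    static boolean isEmpty(Object[] a, int base, int top) {
        int n = base - top;
        return n >= 0 ||
            (n == -1 && (a == null || a.length == 0 || a[(a.length - 1) & base] == null));
    }
}
```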
 791 
 792 
 793         /**
 794          * Pushes a task. Call only by owner in unshared queues.
 795          *
 796          * @param task the task. Caller must ensure non-null.
 797          * @throws RejectedExecutionException if array cannot be resized
 798          */
 799         final void push(ForkJoinTask<?> task) {
 800             int s = top; ForkJoinTask<?>[] a; int al, d;
 801             if ((a = array) != null && (al = a.length) > 0) {
 802                 int index = (al - 1) & s;
 803                 ForkJoinPool p = pool;
 804                 top = s + 1;
 805                 QA.setRelease(a, index, task);
 806                 if ((d = base - s) == 0 && p != null) {
 807                     VarHandle.fullFence();
 808                     p.signalWork();
 809                 }
 810                 else if (d + al == 1)
 811                     growArray();
 812             }
 813         }
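The signalling rule in push (signal only when the queue was empty before this push, detected above as `base - s == 0`) can be isolated in a single-threaded sketch. `SignalOnEmptyQueue` is hypothetical; it counts would-be `signalWork()` calls instead of waking workers, and it omits the `VarHandle.fullFence()` the real method issues before signalling.

```java
import java.util.ArrayDeque;

// Hypothetical queue that records a "signal" only on empty -> non-empty.
final class SignalOnEmptyQueue<T> {
    private final ArrayDeque<T> tasks = new ArrayDeque<>();
    private int signals;   // stand-in for calls to pool.signalWork()

    void push(T task) {
        boolean wasEmpty = tasks.isEmpty();
        tasks.addLast(task);
        if (wasEmpty)
            signals++;     // other pushes rely on already-active workers
    }

    T pop() { return tasks.pollLast(); }   // LIFO, null when empty

    int signals() { return signals; }
}
```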
 814 
 815         /**
 816          * Initializes or doubles the capacity of array. Call either
 817          * by owner or with lock held -- it is OK for base, but not
 818          * top, to move while resizings are in progress.
 819          */
 820         final ForkJoinTask<?>[] growArray() {
 821             ForkJoinTask<?>[] oldA = array;
 822             int oldSize = oldA != null ? oldA.length : 0;
 823             int size = oldSize > 0 ? oldSize << 1 : INITIAL_QUEUE_CAPACITY;
 824             if (size < INITIAL_QUEUE_CAPACITY || size > MAXIMUM_QUEUE_CAPACITY)
 825                 throw new RejectedExecutionException("Queue capacity exceeded");
 826             int oldMask, t, b;
 827             ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
 828             if (oldA != null && (oldMask = oldSize - 1) > 0 &&
 829                 (t = top) - (b = base) > 0) {
 830                 int mask = size - 1;
 831                 do { // emulate poll from old array, push to new array
 832                     int index = b & oldMask;
 833                     ForkJoinTask<?> x = (ForkJoinTask<?>)
 834                         QA.getAcquire(oldA, index);
 835                     if (x != null &&
 836                         QA.compareAndSet(oldA, index, x, null))
 837                         a[b & mask] = x;
 838                 } while (++b != t);
 839                 VarHandle.releaseFence();
 840             }
 841             return a;
 842         }
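The key property of growArray is that logical indices do not change; only the physical slot `index & mask` does. A single-threaded sketch (the class `CircularGrow` is hypothetical and uses plain copies instead of the per-slot CAS above) shows tasks that had wrapped around a 4-slot array landing in new positions of an 8-slot array.

```java
// Hypothetical doubling of a circular array, preserving logical indices.
final class CircularGrow {
    static Object[] grow(Object[] old, int base, int top) {
        int oldMask = old.length - 1;
        Object[] a = new Object[old.length << 1];
        int mask = a.length - 1;
        for (int b = base; b != top; ++b)   // same logical range [base, top)
            a[b & mask] = old[b & oldMask]; // only the physical slot changes
        return a;
    }
}
```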
 843 
 844         /**
 845          * Takes next task, if one exists, in LIFO order.  Call only
 846          * by owner in unshared queues.
 847          */
 848         final ForkJoinTask<?> pop() {
 849             int b = base, s = top, al, i; ForkJoinTask<?>[] a;
 850             if ((a = array) != null && b != s && (al = a.length) > 0) {
 851                 int index = (al - 1) & --s;
 852                 ForkJoinTask<?> t = (ForkJoinTask<?>)
 853                     QA.get(a, index);
 854                 if (t != null &&
 855                     QA.compareAndSet(a, index, t, null)) {
 856                     top = s;
 857                     VarHandle.releaseFence();
 858                     return t;
 859                 }
 860             }
 861             return null;
 862         }
 863 
 864         /**
 865          * Takes next task, if one exists, in FIFO order.
 866          */
 867         final ForkJoinTask<?> poll() {
 868             for (;;) {
 869                 int b = base, s = top, d, al; ForkJoinTask<?>[] a;
 870                 if ((a = array) != null && (d = b - s) < 0 &&
 871                     (al = a.length) > 0) {
 872                     int index = (al - 1) & b;
 873                     ForkJoinTask<?> t = (ForkJoinTask<?>)
 874                         QA.getAcquire(a, index);
 875                     if (b++ == base) {
 876                         if (t != null) {
 877                             if (QA.compareAndSet(a, index, t, null)) {
 878                                 base = b;
 879                                 return t;
 880                             }
 881                         }
 882                         else if (d == -1)
 883                             break; // now empty
 884                     }
 885                 }
 886                 else
 887                     break;
 888             }
 889             return null;
 890         }
 891 
 892         /**
 893          * Takes next task, if one exists, in order specified by mode.
 894          */
 895         final ForkJoinTask<?> nextLocalTask() {
 896             return ((id & FIFO) != 0) ? poll() : pop();
 897         }
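Each queue holds tasks in the logical range `[base, top)`. The owner pushes at `top`, `pop` takes from `top` (LIFO), `poll` takes from `base` (FIFO), and `nextLocalTask` and `peek` pick the end according to the queue's FIFO mode bit. Below is a minimal single-threaded sketch of that index arithmetic, with a fixed capacity of 8 and no growth or memory-ordering concerns. `ModeSketch` is a hypothetical class.

```java
// Hypothetical single-threaded model of one work queue's index arithmetic.
final class ModeSketch {
    private final Object[] array = new Object[8];   // power-of-two capacity
    private int base, top;                           // tasks occupy [base, top)
    private final boolean fifo;

    ModeSketch(boolean fifo) { this.fifo = fifo; }

    void push(Object task) {                         // owner always pushes at top
        array[(array.length - 1) & top++] = task;    // no growth: keep under 8 tasks
    }

    Object pop() {                                   // LIFO: take from top
        if (base == top) return null;
        int index = (array.length - 1) & --top;
        Object t = array[index];
        array[index] = null;
        return t;
    }

    Object poll() {                                  // FIFO: take from base
        if (base == top) return null;
        int index = (array.length - 1) & base++;
        Object t = array[index];
        array[index] = null;
        return t;
    }

    Object nextLocalTask() { return fifo ? poll() : pop(); }

    Object peek() {                                  // the end nextLocalTask would use
        if (base == top) return null;
        return array[(array.length - 1) & (fifo ? base : top - 1)];
    }
}
```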
 898 
 899         /**
 900          * Returns next task, if one exists, in order specified by mode.
 901          */
 902         final ForkJoinTask<?> peek() {
 903             int al; ForkJoinTask<?>[] a;
 904             return ((a = array) != null && (al = a.length) > 0) ?
 905                 a[(al - 1) &
 906                   ((id & FIFO) != 0 ? base : top - 1)] : null;
 907         }
 908 
 909         /**
 910          * Pops the given task only if it is at the current top.
 911          */
 912         final boolean tryUnpush(ForkJoinTask<?> task) {
 913             int b = base, s = top, al; ForkJoinTask<?>[] a;
 914             if ((a = array) != null && b != s && (al = a.length) > 0) {
 915                 int index = (al - 1) & --s;
 916                 if (QA.compareAndSet(a, index, task, null)) {
 917                     top = s;
 918                     VarHandle.releaseFence();
 919                     return true;
 920                 }
 921             }
 922             return false;
 923         }
 924 
 925         /**
 926          * Removes and cancels all known tasks, ignoring any exceptions.
 927          */
 928         final void cancelAll() {
 929             for (ForkJoinTask<?> t; (t = poll()) != null; )
 930                 ForkJoinTask.cancelIgnoringExceptions(t);
 931         }
 932 
 933         // Specialized execution methods
 934 
 935         /**
 936          * Pops and executes up to limit consecutive tasks or until empty.
 937          *
 938          * @param limit max runs, or zero for no limit
 939          */
 940         final void localPopAndExec(int limit) {
 941             for (;;) {
 942                 int b = base, s = top, al; ForkJoinTask<?>[] a;
 943                 if ((a = array) != null && b != s && (al = a.length) > 0) {
 944                     int index = (al - 1) & --s;
 945                     ForkJoinTask<?> t = (ForkJoinTask<?>)
 946                         QA.getAndSet(a, index, null);
 947                     if (t != null) {
 948                         top = s;
 949                         VarHandle.releaseFence();
 950                         t.doExec();
 951                         if (limit != 0 && --limit == 0)
 952                             break;
 953                     }
 954                     else
 955                         break;
 956                 }
 957                 else
 958                     break;
 959             }
 960         }
 961 
 962         /**
 963          * Polls and executes up to limit consecutive tasks or until empty.
 964          *
 965          * @param limit max runs, or zero for no limit
 966          */
 967         final void localPollAndExec(int limit) {
 968             for (int polls = 0;;) {
 969                 int b = base, s = top, d, al; ForkJoinTask<?>[] a;
 970                 if ((a = array) != null && (d = b - s) < 0 &&
 971                     (al = a.length) > 0) {
 972                     int index = (al - 1) & b++;
 973                     ForkJoinTask<?> t = (ForkJoinTask<?>)
 974                         QA.getAndSet(a, index, null);
 975                     if (t != null) {
 976                         base = b;
 977                         t.doExec();
 978                         if (limit != 0 && ++polls == limit)
 979                             break;
 980                     }
 981                     else if (d == -1)
 982                         break;     // now empty
 983                     else
 984                         polls = 0; // stolen; reset
 985                 }
 986                 else
 987                     break;
 988             }
 989         }
 990 
 991         /**
 992          * If present, removes task from queue and executes it.
 993          */
 994         final void tryRemoveAndExec(ForkJoinTask<?> task) {
 995             ForkJoinTask<?>[] wa; int s, wal;
 996             if (base - (s = top) < 0 && // traverse from top
 997                 (wa = array) != null && (wal = wa.length) > 0) {
 998                 for (int m = wal - 1, ns = s - 1, i = ns; ; --i) {
 999                     int index = i & m;
1000                     ForkJoinTask<?> t = (ForkJoinTask<?>)
1001                         QA.get(wa, index);
1002                     if (t == null)
1003                         break;
1004                     else if (t == task) {
1005                         if (QA.compareAndSet(wa, index, t, null)) {
1006                             top = ns;   // safely shift down
1007                             for (int j = i; j != ns; ++j) {
1008                                 ForkJoinTask<?> f;
1009                                 int pindex = (j + 1) & m;
1010                                 f = (ForkJoinTask<?>)QA.get(wa, pindex);
1011                                 QA.setVolatile(wa, pindex, null);
1012                                 int jindex = j & m;
1013                                 QA.setRelease(wa, jindex, f);
1014                             }
1015                             VarHandle.releaseFence();
1016                             t.doExec();
1017                         }
1018                         break;
1019                     }
1020                 }
1021             }
1022         }
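tryRemoveAndExec searches downward from `top` for the joined task and stops at the first empty slot. When it finds the task, it clears the slot and shifts every newer task down one place so the owner's stack stays contiguous. Below is a single-threaded sketch of the search-and-shift, assuming a power-of-two capacity. `RemoveSketch` is hypothetical. It only reports removal instead of running the task, and it bounds the search at `base` explicitly rather than relying on null slots below `base`.

```java
// Hypothetical single-threaded model of tryRemoveAndExec's search-and-shift.
final class RemoveSketch {
    Object[] array;
    int base, top;

    RemoveSketch(int capacity) { array = new Object[capacity]; }

    void push(Object x) { array[(array.length - 1) & top++] = x; }

    /** Removes task if present, shifting newer tasks down; returns true if removed. */
    boolean tryRemove(Object task) {
        int m = array.length - 1, ns = top - 1;
        for (int i = ns; base - i <= 0; --i) {        // from top toward base
            Object t = array[i & m];
            if (t == null)
                break;                                // hole: give up, as the original does
            if (t == task) {
                array[i & m] = null;
                top = ns;
                for (int j = i; j != ns; ++j) {       // close the gap
                    array[j & m] = array[(j + 1) & m];
                    array[(j + 1) & m] = null;
                }
                return true;
            }
        }
        return false;
    }
}
```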
1023 
1024         /**
1025          * Tries to steal and run tasks within the target's
1026          * computation until done, not found, or limit exceeded.
1027          *
1028          * @param task root of CountedCompleter computation
1029          * @param limit max runs, or zero for no limit
1030          * @return task status on exit
1031          */
1032         final int localHelpCC(CountedCompleter<?> task, int limit) {
1033             int status = 0;
1034             if (task != null && (status = task.status) >= 0) {
1035                 for (;;) {
1036                     boolean help = false;
1037                     int b = base, s = top, al; ForkJoinTask<?>[] a;
1038                     if ((a = array) != null && b != s && (al = a.length) > 0) {
1039                         int index = (al - 1) & (s - 1);
1040                         ForkJoinTask<?> o = (ForkJoinTask<?>)
1041                             QA.get(a, index);
1042                         if (o instanceof CountedCompleter) {
1043                             CountedCompleter<?> t = (CountedCompleter<?>)o;
1044                             for (CountedCompleter<?> f = t;;) {
1045                                 if (f != task) {
1046                                     if ((f = f.completer) == null) // try parent
1047                                         break;
1048                                 }
1049                                 else {
1050                                     if (QA.compareAndSet(a, index, t, null)) {
1051                                         top = s - 1;
1052                                         VarHandle.releaseFence();
1053                                         t.doExec();
1054                                         help = true;
1055                                     }
1056                                     break;
1057                                 }
1058                             }
1059                         }
1060                     }
1061                     if ((status = task.status) < 0 || !help ||
1062                         (limit != 0 && --limit == 0))
1063                         break;
1064                 }
1065             }
1066             return status;
1067         }
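localHelpCC (and its shared variant) takes the top task only if following its `completer` links reaches the task being joined, that is, only if the top task belongs to the same CountedCompleter computation. Below is a sketch of that ancestry test. `CompleterNode` is a hypothetical stand-in for CountedCompleter that keeps only the parent link.

```java
// Hypothetical stand-in for CountedCompleter's parent link.
final class CompleterNode {
    final CompleterNode completer;      // parent, or null at the root
    CompleterNode(CompleterNode completer) { this.completer = completer; }
}

final class ChainSketch {
    /** True if t is root or a transitive subtask of root, as checked in localHelpCC. */
    static boolean inComputation(CompleterNode t, CompleterNode root) {
        for (CompleterNode f = t; f != null; f = f.completer)
            if (f == root)
                return true;
        return false;
    }
}
```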
1068 
1069         // Operations on shared queues
1070 
1071         /**
1072          * Tries to lock shared queue by CASing phase field.
1073          */
1074         final boolean tryLockSharedQueue() {
1075             return PHASE.compareAndSet(this, 0, QLOCK);
1076         }
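Shared queues are locked by CASing `phase` from 0 to `QLOCK` and unlocked with a release store of 0. Callers such as trySharedUnpush recheck `top` and `array` after acquiring, because either may have changed before the CAS. The lock never blocks: a failed CAS just means the caller skips the operation. Below is a minimal sketch of the same idiom using the standard `VarHandle` API. `SharedQueueLock` is hypothetical, and the value of `QLOCK` is an assumption (the pool defines its own constant outside this section).

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Hypothetical object using the same CAS-on-int locking idiom as shared queues.
final class SharedQueueLock {
    static final int QLOCK = 1;          // assumed lock value
    volatile int phase;                  // 0 when unlocked

    private static final VarHandle PHASE;
    static {
        try {
            PHASE = MethodHandles.lookup()
                .findVarHandle(SharedQueueLock.class, "phase", int.class);
        } catch (ReflectiveOperationException e) {
            throw new Error(e);
        }
    }

    boolean tryLock() { return PHASE.compareAndSet(this, 0, QLOCK); }

    void unlock() { PHASE.setRelease(this, 0); }   // publishes writes made under the lock
}
```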
1077 
1078         /**
1079          * Shared version of tryUnpush.
1080          */
1081         final boolean trySharedUnpush(ForkJoinTask<?> task) {
1082             boolean popped = false;
1083             int s = top - 1, al; ForkJoinTask<?>[] a;
1084             if ((a = array) != null && (al = a.length) > 0) {
1085                 int index = (al - 1) & s;
1086                 ForkJoinTask<?> t = (ForkJoinTask<?>) QA.get(a, index);
1087                 if (t == task &&
1088                     PHASE.compareAndSet(this, 0, QLOCK)) {
1089                     if (top == s + 1 && array == a &&
1090                         QA.compareAndSet(a, index, task, null)) {
1091                         popped = true;
1092                         top = s;
1093                     }
1094                     PHASE.setRelease(this, 0);
1095                 }
1096             }
1097             return popped;
1098         }
1099 
1100         /**
1101          * Shared version of localHelpCC.
1102          */
1103         final int sharedHelpCC(CountedCompleter<?> task, int limit) {
1104             int status = 0;
1105             if (task != null && (status = task.status) >= 0) {
1106                 for (;;) {
1107                     boolean help = false;
1108                     int b = base, s = top, al; ForkJoinTask<?>[] a;
1109                     if ((a = array) != null && b != s && (al = a.length) > 0) {
1110                         int index = (al - 1) & (s - 1);
1111                         ForkJoinTask<?> o = (ForkJoinTask<?>)
1112                             QA.get(a, index);
1113                         if (o instanceof CountedCompleter) {
1114                             CountedCompleter<?> t = (CountedCompleter<?>)o;
1115                             for (CountedCompleter<?> f = t;;) {
1116                                 if (f != task) {
1117                                     if ((f = f.completer) == null)
1118                                         break;
1119                                 }
1120                                 else {
1121                                     if (PHASE.compareAndSet(this, 0, QLOCK)) {
1122                                         if (top == s && array == a &&
1123                                             QA.compareAndSet(a, index, t, null)) {
1124                                             help = true;
1125                                             top = s - 1;
1126                                         }
1127                                         PHASE.setRelease(this, 0);
1128                                         if (help)
1129                                             t.doExec();
1130                                     }
1131                                     break;
1132                                 }
1133                             }
1134                         }
1135                     }
1136                     if ((status = task.status) < 0 || !help ||
1137                         (limit != 0 && --limit == 0))
1138                         break;
1139                 }
1140             }
1141             return status;
1142         }
1143 
1144         /**
1145          * Returns true if owned and not known to be blocked.
1146          */
1147         final boolean isApparentlyUnblocked() {
1148             Thread wt; Thread.State s;
1149             return ((wt = owner) != null &&
1150                     (s = wt.getState()) != Thread.State.BLOCKED &&
1151                     s != Thread.State.WAITING &&
1152                     s != Thread.State.TIMED_WAITING);
1153         }
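isApparentlyUnblocked is a heuristic snapshot of the owner thread's state. Any state other than BLOCKED, WAITING, or TIMED_WAITING counts as unblocked, and the answer may be stale as soon as it is returned. Below is a free-standing sketch. `BlockCheck` is a hypothetical name. A thread in state NEW or TERMINATED also passes this test.

```java
// Hypothetical free-standing version of WorkQueue.isApparentlyUnblocked.
final class BlockCheck {
    static boolean isApparentlyUnblocked(Thread wt) {
        Thread.State s;
        return wt != null &&
            (s = wt.getState()) != Thread.State.BLOCKED &&   // waiting for a monitor
            s != Thread.State.WAITING &&                      // e.g. park(), wait()
            s != Thread.State.TIMED_WAITING;                  // e.g. parkUntil(), sleep()
    }
}
```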
1154 
1155         // VarHandle mechanics.
1156         private static final VarHandle PHASE;
1157         static {
1158             try {
1159                 MethodHandles.Lookup l = MethodHandles.lookup();
1160                 PHASE = l.findVarHandle(WorkQueue.class, "phase", int.class);
1161             } catch (ReflectiveOperationException e) {
1162                 throw new Error(e);
1163             }
1164         }
1165     }
1166 
1167     // static fields (initialized in static initializer below)
1168 
1169     /**
1170      * Creates a new ForkJoinWorkerThread. This factory is used unless
1171      * overridden in ForkJoinPool constructors.
1172      */
1173     public static final ForkJoinWorkerThreadFactory
1174         defaultForkJoinWorkerThreadFactory;
1175 
1176     /**
1177      * Permission required for callers of methods that may start or
1178      * kill threads.
1179      */
1180     static final RuntimePermission modifyThreadPermission;
1181 
1182     /**


1339                        (TC_MASK & (c + TC_UNIT)));
1340             if (ctl == c && CTL.compareAndSet(this, c, nc)) {
1341                 createWorker();
1342                 break;
1343             }
1344         } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
1345     }
1346 
1347     /**
1348      * Callback from ForkJoinWorkerThread constructor to establish and
1349      * record its WorkQueue.
1350      *
1351      * @param wt the worker thread
1352      * @return the worker's queue
1353      */
1354     final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
1355         UncaughtExceptionHandler handler;
1356         wt.setDaemon(true);                             // configure thread
1357         if ((handler = ueh) != null)
1358             wt.setUncaughtExceptionHandler(handler);
1359         WorkQueue w = new WorkQueue(this, wt);
1360         int tid = 0;                                    // for thread name
1361         int fifo = mode & FIFO;
1362         String prefix = workerNamePrefix;
1363         if (prefix != null) {
1364             synchronized (prefix) {
1365                 WorkQueue[] ws = workQueues; int n;
1366                 int s = indexSeed += SEED_INCREMENT;
1367                 if (ws != null && (n = ws.length) > 1) {
1368                     int m = n - 1;
1369                     tid = s & m;
1370                     int i = m & ((s << 1) | 1);         // odd-numbered indices
1371                     for (int probes = n >>> 1;;) {      // find empty slot
1372                         WorkQueue q;
1373                         if ((q = ws[i]) == null || q.phase == QUIET)
1374                             break;
1375                         else if (--probes == 0) {
1376                             i = n | 1;                  // resize below
1377                             break;
1378                         }
1379                         else
1380                             i = (i + 2) & m;
1381                     }
1382 
1383                     int id = i | fifo | (s & ~(SMASK | FIFO | DORMANT));
1384                     w.phase = w.id = id;                // now publishable
1385 
1386                     if (i < n)
1387                         ws[i] = w;
1388                     else {                              // expand array
1389                         int an = n << 1;
1390                         WorkQueue[] as = new WorkQueue[an];
1391                         as[i] = w;
1392                         int am = an - 1;
1393                         for (int j = 0; j < n; ++j) {
1394                             WorkQueue v;                // copy external queue
1395                             if ((v = ws[j]) != null)    // position may change
1396                                 as[v.id & am & SQMASK] = v;
1397                             if (++j >= n)
1398                                 break;
1399                             as[j] = ws[j];              // copy worker
1400                         }
1401                         workQueues = as;
1402                     }
1403                 }
1404             }
1405             wt.setName(prefix.concat(Integer.toString(tid)));
1406         }
1407         return w;
1408     }
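registerWorker places workers only at odd indices of `workQueues`. It derives an odd start index from the seed, probes every other slot up to `n/2` times, and signals that a resize is needed with an index past the end (`n | 1`). Below is a sketch of just the probe, assuming `n` is a power of two greater than 1. `SlotSketch` is hypothetical and, unlike the original, it does not reuse slots whose queue is QUIET.

```java
// Hypothetical model of registerWorker's slot search.
final class SlotSketch {
    /** Returns a free odd index in ws, or n | 1 (past the end) if none was found. */
    static int findOddSlot(Object[] ws, int seed) {
        int n = ws.length, m = n - 1;                // n assumed a power of two, > 1
        int i = m & ((seed << 1) | 1);               // odd starting index
        for (int probes = n >>> 1;;) {
            if (ws[i] == null)
                return i;
            else if (--probes == 0)
                return n | 1;                        // caller must expand the array
            else
                i = (i + 2) & m;                     // next odd index, wrapping
        }
    }
}
```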
1409 
1410     /**
1411      * Final callback from terminating worker, as well as upon failure
1412      * to construct or start a worker.  Removes record of worker from
1413      * array, and adjusts counts. If pool is shutting down, tries to
1414      * complete termination.
1415      *
1416      * @param wt the worker thread, or null if construction failed
1417      * @param ex the exception causing failure, or null if none
1418      */
1419     final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
1420         WorkQueue w = null;
1421         int phase = 0;
1422         if (wt != null && (w = wt.workQueue) != null) {
1423             Object lock = workerNamePrefix;
1424             long ns = (long)w.nsteals & 0xffffffffL;
1425             int idx = w.id & SMASK;
1426             if (lock != null) {
1427                 WorkQueue[] ws;                       // remove index from array
1428                 synchronized (lock) {
1429                     if ((ws = workQueues) != null && ws.length > idx &&
1430                         ws[idx] == w)
1431                         ws[idx] = null;
1432                     stealCount += ns;
1433                 }
1434             }
1435             phase = w.phase;
1436         }
1437         if (phase != QUIET) {                         // else pre-adjusted
1438             long c;                                   // decrement counts
1439             do {} while (!CTL.weakCompareAndSet
1440                          (this, c = ctl, ((RC_MASK & (c - RC_UNIT)) |
1441                                           (TC_MASK & (c - TC_UNIT)) |
1442                                           (SP_MASK & c))));
1443         }
1444         if (w != null)
1445             w.cancelAll();                            // cancel remaining tasks
1446 
1447         if (!tryTerminate(false, false) &&            // possibly replace worker
1448             w != null && w.array != null)             // avoid repeated failures
1449             signalWork();
1450 
1451         if (ex == null)                               // help clean on way out


1463             if ((c = ctl) >= 0L)                      // enough workers
1464                 break;
1465             else if ((sp = (int)c) == 0) {            // no idle workers
1466                 if ((c & ADD_WORKER) != 0L)           // too few workers
1467                     tryAddWorker(c);
1468                 break;
1469             }
1470             else if ((ws = workQueues) == null)
1471                 break;                                // unstarted/terminated
1472             else if (ws.length <= (i = sp & SMASK))
1473                 break;                                // terminated
1474             else if ((v = ws[i]) == null)
1475                 break;                                // terminating
1476             else {
1477                 int np = sp & ~UNSIGNALLED;
1478                 int vp = v.phase;
1479                 long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + RC_UNIT));
1480                 Thread vt = v.owner;
1481                 if (sp == vp && CTL.compareAndSet(this, c, nc)) {
1482                     v.phase = np;
1483                     if (v.source < 0)
1484                         LockSupport.unpark(vt);
1485                     break;
1486                 }
1487             }
1488         }
1489     }
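deregisterWorker and signalWork update several counters packed into `ctl` with a single CAS by masking each field separately. A borrow or carry from one subtraction into a neighbouring field is harmless because each result is masked back to its own field. The field layout and constants below are assumptions taken from ForkJoinPool's constant definitions, which fall outside this section: RC in bits 48-63, TC in bits 32-47, and SP (the top of the idle-worker stack) in bits 0-31. `CtlSketch` and its helpers are hypothetical. Only the two update expressions are copied from the code above.

```java
// Assumed ctl layout: RC in bits 48-63, TC in bits 32-47, SP in bits 0-31.
final class CtlSketch {
    static final long SP_MASK  = 0xffffffffL;
    static final long UC_MASK  = ~SP_MASK;
    static final int  TC_SHIFT = 32;
    static final long TC_UNIT  = 1L << TC_SHIFT;
    static final long TC_MASK  = 0xffffL << TC_SHIFT;
    static final int  RC_SHIFT = 48;
    static final long RC_UNIT  = 1L << RC_SHIFT;
    static final long RC_MASK  = 0xffffL << RC_SHIFT;

    static long pack(int rc, int tc, int sp) {
        return ((long) (rc & 0xffff) << RC_SHIFT) |
               ((long) (tc & 0xffff) << TC_SHIFT) |
               (sp & SP_MASK);
    }

    static int rc(long c) { return (short) (c >>> RC_SHIFT); }   // signed 16-bit field
    static int tc(long c) { return (short) (c >>> TC_SHIFT); }
    static int sp(long c) { return (int) c; }

    // As in deregisterWorker: decrement RC and TC independently, keep SP.
    static long decrementCounts(long c) {
        return (RC_MASK & (c - RC_UNIT)) |
               (TC_MASK & (c - TC_UNIT)) |
               (SP_MASK & c);
    }

    // As in signalWork: pop the idle worker (new SP = its stackPred), bump RC.
    static long releaseIdle(long c, int stackPred) {
        return (stackPred & SP_MASK) | (UC_MASK & (c + RC_UNIT));
    }
}
```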
1490 
1491     /**
1492      * Tries to decrement counts (sometimes implicitly) and possibly
1493      * arrange for a compensating worker in preparation for blocking:
1494      * If not all core workers yet exist, creates one, else if any are
1495      * unreleased (possibly including caller) releases one, else if
1496      * fewer than the minimum allowed number of workers running,
1497      * checks to see that they are all active, and if so creates an
1498      * extra worker unless over maximum limit and policy is to
1499      * saturate.  Most of these steps can fail due to interference, in
1500      * which case 0 is returned so caller will retry. A negative
1501      * return value indicates that the caller doesn't need to
1502      * re-adjust counts when later unblocked.
1503      *
1504      * @return 1: block then adjust, -1: block without adjust, 0: retry
1505      */
1506     private int tryCompensate(WorkQueue w) {
1507         int t, n, sp;
1508         long c = ctl;
1509         WorkQueue[] ws = workQueues;
1510         if ((t = (short)(c >>> TC_SHIFT)) >= 0) {
1511             if (ws == null || (n = ws.length) <= 0 || w == null)
1512                 return 0;                        // disabled
1513             else if ((sp = (int)c) != 0) {       // replace or release
1514                 WorkQueue v = ws[sp & (n - 1)];
1515                 int wp = w.phase;
1516                 long uc = UC_MASK & ((wp < 0) ? c + RC_UNIT : c);
1517                 int np = sp & ~UNSIGNALLED;
1518                 if (v != null) {
1519                     int vp = v.phase;
1520                     Thread vt = v.owner;
1521                     long nc = ((long)v.stackPred & SP_MASK) | uc;
1522                     if (vp == sp && CTL.compareAndSet(this, c, nc)) {
1523                         v.phase = np;
1524                         if (v.source < 0)
1525                             LockSupport.unpark(vt);
1526                         return (wp < 0) ? -1 : 1;
1527                     }
1528                 }
1529                 return 0;
1530             }
1531             else if ((int)(c >> RC_SHIFT) -      // reduce parallelism
1532                      (short)(bounds & SMASK) > 0) {
1533                 long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c));
1534                 return CTL.compareAndSet(this, c, nc) ? 1 : 0;
1535             }
1536             else {                               // validate
1537                 int md = mode, pc = md & SMASK, tc = pc + t, bc = 0;
1538                 boolean unstable = false;
1539                 for (int i = 1; i < n; i += 2) {
1540                     WorkQueue q; Thread wt; Thread.State ts;
1541                     if ((q = ws[i]) != null) {
1542                         if (q.source == 0) {
1543                             unstable = true;
1544                             break;


1561                     else if (bc < pc) {          // lagging
1562                         Thread.yield();          // for retry spins
1563                         return 0;
1564                     }
1565                     else
1566                         throw new RejectedExecutionException(
1567                             "Thread limit exceeded replacing blocked worker");
1568                 }
1569             }
1570         }
1571 
1572         long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); // expand pool
1573         return CTL.compareAndSet(this, c, nc) && createWorker() ? 1 : 0;
1574     }
1575 
1576     /**
1577      * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1578      * See above for explanation.
1579      */
1580     final void runWorker(WorkQueue w) {
1581         WorkQueue[] ws;
1582         w.growArray();                                  // allocate queue
1583         int r = w.id ^ ThreadLocalRandom.nextSecondarySeed();
1584         if (r == 0)                                     // initial nonzero seed
1585             r = 1;
1586         int lastSignalId = 0;                           // avoid unneeded signals
1587         while ((ws = workQueues) != null) {
1588             boolean nonempty = false;                   // scan
1589             for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
1590                 WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
1591                 if ((i = r & m) >= 0 && i < n &&        // always true
1592                     (q = ws[i]) != null && (b = q.base) - q.top < 0 &&
1593                     (a = q.array) != null && (al = a.length) > 0) {
1594                     int qid = q.id;                     // (never zero)
1595                     int index = (al - 1) & b;
1596                     ForkJoinTask<?> t = (ForkJoinTask<?>)
1597                         QA.getAcquire(a, index);
1598                     if (t != null && b++ == q.base &&
1599                         QA.compareAndSet(a, index, t, null)) {
1600                         if ((q.base = b) - q.top < 0 && qid != lastSignalId)
1601                             signalWork();               // propagate signal
1602                         w.source = lastSignalId = qid;
1603                         t.doExec();
1604                         if ((w.id & FIFO) != 0)         // run remaining locals
1605                             w.localPollAndExec(POLL_LIMIT);
1606                         else
1607                             w.localPopAndExec(POLL_LIMIT);
1608                         ForkJoinWorkerThread thread = w.owner;
1609                         ++w.nsteals;
1610                         w.source = 0;                   // now idle
1611                         if (thread != null)
1612                             thread.afterTopLevelExec();
1613                     }
1614                     nonempty = true;
1615                 }
1616                 else if (nonempty)
1617                     break;
1618                 else
1619                     ++r;
1620             }
1621 
1622             if (nonempty) {                             // move (xorshift)
1623                 r ^= r << 13; r ^= r >>> 17; r ^= r << 5;
1624             }
1625             else {
1626                 int phase;
1627                 lastSignalId = 0;                       // clear for next scan
1628                 if ((phase = w.phase) >= 0) {           // enqueue
1629                     int np = w.phase = (phase + SS_SEQ) | UNSIGNALLED;
1630                     long c, nc;
1631                     do {
1632                         w.stackPred = (int)(c = ctl);
1633                         nc = ((c - RC_UNIT) & UC_MASK) | (SP_MASK & np);
1634                     } while (!CTL.weakCompareAndSet(this, c, nc));
1635                 }
1636                 else {                                  // already queued
1637                     int pred = w.stackPred;
1638                     w.source = DORMANT;                 // enable signal
1639                     for (int steps = 0;;) {
1640                         int md, rc; long c;
1641                         if (w.phase >= 0) {
1642                             w.source = 0;
1643                             break;
1644                         }
1645                         else if ((md = mode) < 0)       // shutting down
1646                             return;
1647                         else if ((rc = ((md & SMASK) +  // possibly quiescent
1648                                         (int)((c = ctl) >> RC_SHIFT))) <= 0 &&
1649                                  (md & SHUTDOWN) != 0 &&
1650                                  tryTerminate(false, false))
1651                             return;                     // help terminate
1652                         else if ((++steps & 1) == 0)
1653                             Thread.interrupted();       // clear between parks
1654                         else if (rc <= 0 && pred != 0 && phase == (int)c) {
1655                             long d = keepAlive + System.currentTimeMillis();
1656                             LockSupport.parkUntil(this, d);
1657                             if (ctl == c &&
1658                                 d - System.currentTimeMillis() <= TIMEOUT_SLOP) {
1659                                 long nc = ((UC_MASK & (c - TC_UNIT)) |
1660                                            (SP_MASK & pred));
1661                                 if (CTL.compareAndSet(this, c, nc)) {
1662                                     w.phase = QUIET;
1663                                     return;             // drop on timeout
1664                                 }
1665                             }
1666                         }
1667                         else
1668                             LockSupport.park(this);
1669                     }
1670                 }
1671             }
1672         }
1673     }
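When a scan finds work, runWorker moves its random origin with a Marsaglia xorshift32 step. Each shift-xor is invertible, so the step is a bijection on 32-bit ints that maps 0 to 0. A nonzero seed therefore never becomes 0, which is why the loop forces an initial seed of 1. Below is a sketch. `XorShift` is a hypothetical name.

```java
// Marsaglia xorshift32 step, as used by runWorker to move its scan origin.
final class XorShift {
    static int next(int r) {
        r ^= r << 13;
        r ^= r >>> 17;
        r ^= r << 5;
        return r;
    }

    // Index the scan starts from, for a power-of-two table of length n.
    static int origin(int r, int n) { return r & (n - 1); }
}
```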
1674 
1675     /**
1676      * Helps and/or blocks until the given task is done or timeout.
1677      * First tries locally helping, then scans other queues for a task
1678      * produced by one of w's stealers; compensating and blocking if
1679      * none are found (rescanning if tryCompensate fails).
1680      *
1681      * @param w caller
1682      * @param task the task
1683      * @param deadline for timed waits, if nonzero
1684      * @return task status on exit
1685      */
1686     final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
1687         int s = 0;
1688         if (w != null && task != null &&
1689             (!(task instanceof CountedCompleter) ||
1690              (s = w.localHelpCC((CountedCompleter<?>)task, 0)) >= 0)) {
1691             w.tryRemoveAndExec(task);
1692             int src = w.source, id = w.id;
1693             s = task.status;
1694             while (s >= 0) {
1695                 WorkQueue[] ws;
1696                 boolean nonempty = false;
1697                 int r = ThreadLocalRandom.nextSecondarySeed() | 1; // odd indices
1698                 if ((ws = workQueues) != null) {       // scan for matching id
1699                     for (int n = ws.length, m = n - 1, j = -n; j < n; j += 2) {
1700                         WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
1701                         if ((i = (r + j) & m) >= 0 && i < n &&
1702                             (q = ws[i]) != null && q.source == id &&
1703                             (b = q.base) - q.top < 0 &&
1704                             (a = q.array) != null && (al = a.length) > 0) {
1705                             int qid = q.id;
1706                             int index = (al - 1) & b;
1707                             ForkJoinTask<?> t = (ForkJoinTask<?>)
1708                                 QA.getAcquire(a, index);
1709                             if (t != null && b++ == q.base && id == q.source &&
1710                                 QA.compareAndSet(a, index, t, null)) {
1711                                 q.base = b;
1712                                 w.source = qid;
1713                                 t.doExec();
1714                                 w.source = src;
1715                             }
1716                             nonempty = true;
1717                             break;
1718                         }
1719                     }
1720                 }
1721                 if ((s = task.status) < 0)
1722                     break;
1723                 else if (!nonempty) {
1724                     long ms, ns; int block;
1725                     if (deadline == 0L)
1726                         ms = 0L;                       // untimed
1727                     else if ((ns = deadline - System.nanoTime()) <= 0L)
1728                         break;                         // timeout
1729                     else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
1730                         ms = 1L;                       // avoid 0 for timed wait
1731                     if ((block = tryCompensate(w)) != 0) {
1732                         task.internalWait(ms);
1733                         CTL.getAndAdd(this, (block > 0) ? RC_UNIT : 0L);
1734                     }
1735                     s = task.status;
1736                 }
1737             }
1738         }
1739         return s;
1740     }
1741 
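The timed branch of awaitJoin turns an absolute System.nanoTime() deadline into a millisecond wait, treating 0 as "untimed" and never passing 0 for a timed wait (a 0 ms wait would be read as untimed, as with Object.wait(0)). A minimal sketch of just that arithmetic, assuming the same conventions; the class, method, and TIMED_OUT sentinel are hypothetical.

```java
import java.util.concurrent.TimeUnit;

// Mirrors the deadline handling in awaitJoin; names are illustrative only.
final class JoinWaitMillis {
    static final long TIMED_OUT = -1L;   // hypothetical sentinel: stop waiting

    /**
     * @param deadline absolute System.nanoTime() deadline, or 0 for an untimed wait
     * @param now      current System.nanoTime() reading
     * @return 0 if untimed, TIMED_OUT if the deadline has passed,
     *         otherwise a strictly positive number of milliseconds
     */
    static long waitMillis(long deadline, long now) {
        if (deadline == 0L)
            return 0L;                              // untimed wait
        long ns = deadline - now;                   // difference tolerates nanoTime wraparound
        if (ns <= 0L)
            return TIMED_OUT;
        long ms = TimeUnit.NANOSECONDS.toMillis(ns);
        return (ms <= 0L) ? 1L : ms;                // round sub-millisecond waits up to 1
    }

    public static void main(String[] args) {
        long now = System.nanoTime();
        System.out.println(waitMillis(0L, now));                   // untimed
        System.out.println(waitMillis(now + 500_000L, now));       // 0.5 ms, rounded up
        System.out.println(waitMillis(now + 2_500_000_000L, now)); // 2.5 s
        System.out.println(waitMillis(now - 1L, now));             // already past
    }
}
```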
1742     /**
1743      * Runs tasks until {@code isQuiescent()}. Rather than blocking
1744      * when tasks cannot be found, rescans until all others cannot
1745      * find tasks either.
1746      */
1747     final void helpQuiescePool(WorkQueue w) {
1748         int prevSrc = w.source, fifo = w.id & FIFO;
1749         for (int source = prevSrc, released = -1;;) { // -1 until known
1750             WorkQueue[] ws;
1751             if (fifo != 0)
1752                 w.localPollAndExec(0);
1753             else
1754                 w.localPopAndExec(0);
1755             if (released == -1 && w.phase >= 0)
1756                 released = 1;
1757             boolean quiet = true, empty = true;
1758             int r = ThreadLocalRandom.nextSecondarySeed();
1759             if ((ws = workQueues) != null) {
1760                 for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
1761                     WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
1762                     if ((i = (r - j) & m) >= 0 && i < n && (q = ws[i]) != null) {
1763                         if ((b = q.base) - q.top < 0 &&
1764                             (a = q.array) != null && (al = a.length) > 0) {
1765                             int qid = q.id;
1766                             if (released == 0) {    // increment
1767                                 released = 1;
1768                                 CTL.getAndAdd(this, RC_UNIT);
1769                             }
1770                             int index = (al - 1) & b;
1771                             ForkJoinTask<?> t = (ForkJoinTask<?>)
1772                                 QA.getAcquire(a, index);
1773                             if (t != null && b++ == q.base &&
1774                                 QA.compareAndSet(a, index, t, null)) {
1775                                 q.base = b;
1776                                 w.source = source = q.id;
1777                                 t.doExec();
1778                                 w.source = source = prevSrc;
1779                             }
1780                             quiet = empty = false;
1781                             break;
1782                         }
1783                         else if ((q.source & QUIET) == 0)
1784                             quiet = false;
1785                     }
1786                 }
1787             }
1788             if (quiet) {
1789                 if (released == 0)
1790                     CTL.getAndAdd(this, RC_UNIT);
1791                 w.source = prevSrc;
1792                 break;
1793             }
1794             else if (empty) {
1795                 if (source != QUIET)
1796                     w.source = source = QUIET;
1797                 if (released == 1) {                 // decrement
1798                     released = 0;
1799                     CTL.getAndAdd(this, RC_MASK & -RC_UNIT);
1800                 }
1801             }
1802         }
1803     }
1804 
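In this version, ForkJoinTask.helpQuiesce() called on a worker thread appears to delegate to helpQuiescePool. A minimal sketch using only the public API: a root task forks leaves it never joins, calls helpQuiesce(), and then checks that every leaf has run. The pool size and class names are arbitrary choices for illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicInteger;

final class QuiesceDemo {
    static final class Leaf extends RecursiveAction {
        private final AtomicInteger done;
        Leaf(AtomicInteger done) { this.done = done; }
        @Override protected void compute() { done.incrementAndGet(); }
    }

    static final class Root extends RecursiveAction {
        private final int leaves;
        private final AtomicInteger done;
        int seenAfterQuiesce;                 // read after invoke() returns
        Root(int leaves, AtomicInteger done) { this.leaves = leaves; this.done = done; }
        @Override protected void compute() {
            for (int i = 0; i < leaves; i++)
                new Leaf(done).fork();        // forked but never joined
            helpQuiesce();                    // run or wait for tasks until the pool is quiescent
            seenAfterQuiesce = done.get();
        }
    }

    static int run(int leaves) {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            Root root = new Root(leaves, new AtomicInteger());
            pool.invoke(root);
            return root.seenAfterQuiesce;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(1_000));
    }
}
```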
1805     /**
1806      * Scans for and returns a polled task, if available.
1807      * Used only for untracked polls.
1808      *
1809      * @param submissionsOnly if true, only scan submission queues
1810      */
1811     private ForkJoinTask<?> pollScan(boolean submissionsOnly) {
1812         WorkQueue[] ws; int n;
1813         rescan: while ((mode & STOP) == 0 && (ws = workQueues) != null &&
1814                       (n = ws.length) > 0) {
1815             int m = n - 1;
1816             int r = ThreadLocalRandom.nextSecondarySeed();
1817             int h = r >>> 16;
1818             int origin, step;
1819             if (submissionsOnly) {
1820                 origin = (r & ~1) & m;         // even indices and steps
1821                 step = (h & ~1) | 2;
1822             }
1823             else {
1824                 origin = r & m;
1825                 step = h | 1;
1826             }
1827             for (int k = origin, oldSum = 0, checkSum = 0;;) {
1828                 WorkQueue q; int b, al; ForkJoinTask<?>[] a;
1829                 if ((q = ws[k]) != null) {
1830                     checkSum += b = q.base;
1831                     if (b - q.top < 0 &&
1832                         (a = q.array) != null && (al = a.length) > 0) {
1833                         int index = (al - 1) & b;
1834                         ForkJoinTask<?> t = (ForkJoinTask<?>)
1835                             QA.getAcquire(a, index);
1836                         if (t != null && b++ == q.base &&
1837                             QA.compareAndSet(a, index, t, null)) {
1838                             q.base = b;
1839                             return t;
1840                         }
1841                         else
1842                             break; // restart
1843                     }
1844                 }
1845                 if ((k = (k + step) & m) == origin) {
1846                     if (oldSum == (oldSum = checkSum))
1847                         break rescan;
1848                     checkSum = 0;
1849                 }
1850             }
1851         }
1852         return null;
1853     }
1854 
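pollScan relies on the table length n being a power of two: an odd step visits every slot exactly once before returning to origin, and for submissionsOnly an even origin with a step of the form 2 * odd visits every even slot exactly once. A small sketch that reproduces the origin/step arithmetic and records one sweep; class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class ScanStride {
    /** Returns {origin, step} computed as pollScan does from seed r. */
    static int[] originAndStep(int r, int n, boolean submissionsOnly) {
        int m = n - 1, h = r >>> 16;
        return submissionsOnly
            ? new int[] { (r & ~1) & m, (h & ~1) | 2 }  // even origin; step = 2 * odd
            : new int[] { r & m, h | 1 };               // any origin; odd step
    }

    /** Slots visited by k = (k + step) & (n - 1) until k returns to the start. */
    static List<Integer> sweep(int n, int origin, int step) {
        if (n <= 0 || (n & (n - 1)) != 0)
            throw new IllegalArgumentException("n must be a power of two");
        int m = n - 1, start = origin & m, k = start;
        List<Integer> visited = new ArrayList<>();
        do {
            visited.add(k);
            k = (k + step) & m;
        } while (k != start);
        return visited;
    }

    public static void main(String[] args) {
        int n = 16, r = 0x12345;
        int[] all = originAndStep(r, n, false);
        int[] subs = originAndStep(r, n, true);
        System.out.println(sweep(n, all[0], all[1]));   // each of the 16 slots once
        System.out.println(sweep(n, subs[0], subs[1])); // each of the 8 even slots once
    }
}
```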
1855     /**
1856      * Gets and removes a local or stolen task for the given worker.
1857      *
1858      * @return a task, if available
1859      */
1860     final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
1861         ForkJoinTask<?> t;
1862         if (w != null &&
1863             (t = (w.id & FIFO) != 0 ? w.poll() : w.pop()) != null)
1864             return t;
1865         else
1866             return pollScan(false);
1867     }
1868 
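nextTaskFor first takes from the worker's own queue, popping the newest task in the default (LIFO) mode or polling the oldest in FIFO mode, and only then falls back to pollScan(false). A simplified model of the local step, assuming an ArrayDeque as a stand-in for the lock-free circular WorkQueue array; the class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// The owner pushes at the tail ("top"); thieves and FIFO mode take from the head ("base").
final class LocalOrder {
    static <T> T nextLocal(Deque<T> queue, boolean fifo) {
        return fifo ? queue.pollFirst() : queue.pollLast();  // null when empty
    }

    public static void main(String[] args) {
        Deque<String> q = new ArrayDeque<>();
        q.addLast("t1"); q.addLast("t2"); q.addLast("t3");
        System.out.println(nextLocal(q, false)); // t3, the newest
        System.out.println(nextLocal(q, true));  // t1, the oldest
    }
}
```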
1869     // External operations
1870 
1871     /**
1872      * Adds the given task to the submitter's current submission queue,
1873      * creating that queue if it is null and moving to another if contended.
1874      *
1875      * @param task the task. Caller must ensure non-null.
1876      */
1877     final void externalPush(ForkJoinTask<?> task) {
1878         int r;                                // initialize caller's probe
1879         if ((r = ThreadLocalRandom.getProbe()) == 0) {
1880             ThreadLocalRandom.localInit();
1881             r = ThreadLocalRandom.getProbe();
1882         }
1883         for (;;) {
1884             int md = mode, n;
1885             WorkQueue[] ws = workQueues;
1886             if ((md & SHUTDOWN) != 0 || ws == null || (n = ws.length) <= 0)
1887                 throw new RejectedExecutionException();
1888             else {
1889                 WorkQueue q;
1890                 boolean push = false, grow = false;
1891                 if ((q = ws[(n - 1) & r & SQMASK]) == null) {
1892                     Object lock = workerNamePrefix;
1893                     int qid = (r | QUIET) & ~(FIFO | OWNED);
1894                     q = new WorkQueue(this, null);
1895                     q.id = qid;
1896                     q.source = QUIET;
1897                     q.phase = QLOCK;          // lock queue
1898                     if (lock != null) {
1899                         synchronized (lock) { // lock pool to install
1900                             int i;
1901                             if ((ws = workQueues) != null &&
1902                                 (n = ws.length) > 0 &&
1903                                 ws[i = qid & (n - 1) & SQMASK] == null) {
1904                                 ws[i] = q;
1905                                 push = grow = true;
1906                             }
1907                         }
1908                     }
1909                 }
1910                 else if (q.tryLockSharedQueue()) {
1911                     int b = q.base, s = q.top, al, d; ForkJoinTask<?>[] a;
1912                     if ((a = q.array) != null && (al = a.length) > 0 &&
1913                         al - 1 + (d = b - s) > 0) {
1914                         a[(al - 1) & s] = task;
1915                         q.top = s + 1;        // relaxed writes OK here
1916                         q.phase = 0;
1917                         if (d < 0 && q.base - s < -1)
1918                             break;            // no signal needed
1919                     }
1920                     else
1921                         grow = true;
1922                     push = true;
1923                 }
1924                 if (push) {
1925                     if (grow) {
1926                         try {
1927                             q.growArray();
1928                             int s = q.top, al; ForkJoinTask<?>[] a;
1929                             if ((a = q.array) != null && (al = a.length) > 0) {
1930                                 a[(al - 1) & s] = task;
1931                                 q.top = s + 1;
1932                             }
1933                         } finally {
1934                             q.phase = 0;
1935                         }
1936                     }
1937                     signalWork();
1938                     break;
1939                 }
1940                 else                          // move if busy
1941                     r = ThreadLocalRandom.advanceProbe(r);
1942             }
1943         }
1944     }
1945 
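externalPush picks its slot as ws[(n - 1) & r & SQMASK]. Assuming SQMASK = 0x007e (its declaration is outside this excerpt), bit 0 is always cleared, so external submissions land only on even indices and at most 64 distinct slots, which leaves odd indices to worker-owned queues; the monitoring loops elsewhere in this file walk those two parities separately. A sketch of the index computation; the class and method names are hypothetical.

```java
// SQMASK value assumed from ForkJoinPool's declarations, not shown in this excerpt.
final class SubmissionSlot {
    static final int SQMASK = 0x007e;  // bit 0 clear: even slots only, at most 64 of them

    /** Slot chosen for probe value r in a table of length n (a power of two). */
    static int slot(int n, int r) {
        return (n - 1) & r & SQMASK;
    }

    public static void main(String[] args) {
        System.out.println(slot(16, 0x7fffffff)); // 14
        System.out.println(slot(1024, -1));       // 126
    }
}
```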
1946     /**
1947      * Pushes a possibly-external submission.
1948      */
1949     private <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) {
1950         Thread t; ForkJoinWorkerThread w; WorkQueue q;
1951         if (task == null)
1952             throw new NullPointerException();
1953         if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
1954             (w = (ForkJoinWorkerThread)t).pool == this &&
1955             (q = w.workQueue) != null)
1956             q.push(task);
1957         else
1958             externalPush(task);
1959         return task;
1960     }
1961 
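externalSubmit pushes onto the caller's own work queue only when the current thread is a ForkJoinWorkerThread of this pool; every other caller goes through externalPush. The sketch below observes the same distinction through the public static methods ForkJoinTask.inForkJoinPool() and ForkJoinTask.getPool(); the class and method names are illustrative, and waiting on a CompletableFuture keeps the external caller from running the probe itself.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;

final class SubmitterKind {
    /** True if the calling thread is a worker of some ForkJoinPool. */
    static boolean callerIsWorker() {
        return ForkJoinTask.inForkJoinPool();
    }

    /** Runs a probe on one of the pool's workers; reports whether it saw that pool. */
    static boolean probeSeesOwnPool() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            CompletableFuture<Boolean> seen = new CompletableFuture<>();
            pool.execute(() -> seen.complete(
                ForkJoinTask.inForkJoinPool() && ForkJoinTask.getPool() == pool));
            return seen.get(10, TimeUnit.SECONDS);   // wait from outside the pool
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```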
1962     /**
1963      * Returns common pool queue for an external thread.
1964      */
1965     static WorkQueue commonSubmitterQueue() {
1966         ForkJoinPool p = common;
1967         int r = ThreadLocalRandom.getProbe();
1968         WorkQueue[] ws; int n;
1969         return (p != null && (ws = p.workQueues) != null &&
1970                 (n = ws.length) > 0) ?
1971             ws[(n - 1) & r & SQMASK] : null;
1972     }
1973 
1974     /**
1975      * Performs tryUnpush for an external submitter.
1976      */
1977     final boolean tryExternalUnpush(ForkJoinTask<?> task) {
1978         int r = ThreadLocalRandom.getProbe();
1979         WorkQueue[] ws; WorkQueue w; int n;
1980         return ((ws = workQueues) != null &&
1981                 (n = ws.length) > 0 &&
1982                 (w = ws[(n - 1) & r & SQMASK]) != null &&
1983                 w.trySharedUnpush(task));
1984     }
1985 
1986     /**
1987      * Performs helpComplete for an external submitter.
1988      */
1989     final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) {
1990         int r = ThreadLocalRandom.getProbe();
1991         WorkQueue[] ws; WorkQueue w; int n;
1992         return ((ws = workQueues) != null && (n = ws.length) > 0 &&
1993                 (w = ws[(n - 1) & r & SQMASK]) != null) ?
1994             w.sharedHelpCC(task, maxTasks) : 0;
1995     }
1996 
1997     /**
1998      * Tries to steal and run tasks within the target's computation.
1999      * The maxTasks argument supports external usages; internal calls
2000      * use zero, allowing unbounded steps (external calls trap
2001      * non-positive values).
2002      *
2003      * @param w caller
2004      * @param maxTasks if non-zero, the maximum number of other tasks to run
2005      * @return task status on exit
2006      */
2007     final int helpComplete(WorkQueue w, CountedCompleter<?> task,
2008                            int maxTasks) {
2009         return (w == null) ? 0 : w.localHelpCC(task, maxTasks);
2010     }
2011 
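helpComplete and the CountedCompleter branch of awaitJoin both delegate to localHelpCC, which serves completer-style computations. A minimal example of that style, following the fork-right-halves pattern from the CountedCompleter documentation; the threshold, pool size, and class name are illustrative.

```java
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicLong;

final class RangeSum extends CountedCompleter<Void> {
    static final int THRESHOLD = 1_000;          // illustrative leaf size
    private final long[] values;
    private final int lo, hi;
    private final AtomicLong total;

    RangeSum(CountedCompleter<?> parent, long[] values, int lo, int hi, AtomicLong total) {
        super(parent);
        this.values = values; this.lo = lo; this.hi = hi; this.total = total;
    }

    @Override public void compute() {
        int l = lo, h = hi;
        while (h - l > THRESHOLD) {              // keep splitting off right halves
            int mid = (l + h) >>> 1;
            addToPendingCount(1);                // one more child must complete first
            new RangeSum(this, values, mid, h, total).fork();
            h = mid;
        }
        long s = 0;
        for (int i = l; i < h; i++)
            s += values[i];
        total.addAndGet(s);
        tryComplete();                           // decrement pending or complete upward
    }

    static long sum(long[] values) {
        AtomicLong total = new AtomicLong();
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            pool.invoke(new RangeSum(null, values, 0, values.length, total));
        } finally {
            pool.shutdown();
        }
        return total.get();
    }
}
```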
2012     /**
2013      * Returns a cheap heuristic guide for task partitioning when
2014      * programmers, frameworks, tools, or languages have little or no
2015      * idea about task granularity.  In essence, by offering this
2016      * method, we ask users only about tradeoffs in overhead vs
2017      * expected throughput and its variance, rather than how finely to
2018      * partition tasks.
2019      *
2020      * In a steady state strict (tree-structured) computation, each
2021      * thread makes available for stealing enough tasks for other
2022      * threads to remain active. Inductively, if all threads play by
2023      * the same rules, each thread should make available only a
2024      * constant number of tasks.
2025      *
2026      * The minimum useful constant is just 1. But using a value of 1
2027      * would require immediate replenishment upon each steal to
2028      * maintain enough tasks, which is infeasible.  Further,
2029      * partitionings/granularities of offered tasks should minimize


2080      */
2081     private boolean tryTerminate(boolean now, boolean enable) {
2082         int md; // 3 phases: try to set SHUTDOWN, then STOP, then TERMINATED
2083 
2084         while (((md = mode) & SHUTDOWN) == 0) {
2085             if (!enable || this == common)        // cannot shutdown
2086                 return false;
2087             else
2088                 MODE.compareAndSet(this, md, md | SHUTDOWN);
2089         }
2090 
2091         while (((md = mode) & STOP) == 0) {       // try to initiate termination
2092             if (!now) {                           // check if quiescent & empty
2093                 for (long oldSum = 0L;;) {        // repeat until stable
2094                     boolean running = false;
2095                     long checkSum = ctl;
2096                     WorkQueue[] ws = workQueues;
2097                     if ((md & SMASK) + (int)(checkSum >> RC_SHIFT) > 0)
2098                         running = true;
2099                     else if (ws != null) {
2100                         WorkQueue w; int b;
2101                         for (int i = 0; i < ws.length; ++i) {
2102                             if ((w = ws[i]) != null) {
2103                                 checkSum += (b = w.base) + w.id;
2104                                 if (b != w.top ||
2105                                     ((i & 1) == 1 && w.source >= 0)) {
2106                                     running = true;
2107                                     break;
2108                                 }
2109                             }
2110                         }
2111                     }
2112                     if (((md = mode) & STOP) != 0)
2113                         break;                 // already triggered
2114                     else if (running)
2115                         return false;
2116                     else if (workQueues == ws && oldSum == (oldSum = checkSum))
2117                         break;
2118                 }
2119             }
2120             if ((md & STOP) == 0)
2121                 MODE.compareAndSet(this, md, md | STOP);
2122         }
2123 
2124         while (((md = mode) & TERMINATED) == 0) { // help terminate others
2125             for (long oldSum = 0L;;) {            // repeat until stable
2126                 WorkQueue[] ws; WorkQueue w;
2127                 long checkSum = ctl;
2128                 if ((ws = workQueues) != null) {
2129                     for (int i = 0; i < ws.length; ++i) {
2130                         if ((w = ws[i]) != null) {
2131                             ForkJoinWorkerThread wt = w.owner;
2132                             w.cancelAll();        // clear queues
2133                             if (wt != null) {
2134                                 try {             // unblock join or park
2135                                     wt.interrupt();
2136                                 } catch (Throwable ignore) {
2137                                 }
2138                             }
2139                             checkSum += w.base + w.id;
2140                         }
2141                     }
2142                 }
2143                 if (((md = mode) & TERMINATED) != 0 ||
2144                     (workQueues == ws && oldSum == (oldSum = checkSum)))
2145                     break;
2146             }
2147             if ((md & TERMINATED) != 0)
2148                 break;
2149             else if ((md & SMASK) + (short)(ctl >>> TC_SHIFT) > 0)
2150                 break;
2151             else if (MODE.compareAndSet(this, md, md | TERMINATED)) {
2152                 synchronized (this) {
2153                     notifyAll();                  // for awaitTermination
2154                 }
2155                 break;
2156             }
2157         }
2158         return true;
2159     }
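This changeset (8200520) concerns tasks being interrupted after shutdown(). With an orderly shutdown, tryTerminate should set only SHUTDOWN while work is still running, reject new submissions, and move on to STOP and TERMINATED once the pool is quiescent. A sketch that checks this through the public API; on a JDK that includes this fix, both checks are expected to pass. Class names and timings are arbitrary.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class ShutdownDemo {
    /** Returns true if a task that is running when shutdown() is called gets interrupted. */
    static boolean runningTaskInterrupted() {
        ForkJoinPool pool = new ForkJoinPool(2);
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean();
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(200);                // still running during shutdown()
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });
        try {
            started.await();
            pool.shutdown();                      // orderly shutdown: no interrupts expected
            if (!pool.awaitTermination(10, TimeUnit.SECONDS))
                throw new IllegalStateException("pool did not terminate");
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return interrupted.get();
    }

    /** Returns true if execute() is rejected once the pool is shut down. */
    static boolean rejectsAfterShutdown() {
        ForkJoinPool pool = new ForkJoinPool(1);
        pool.shutdown();
        try {
            pool.execute(() -> { });
            return false;
        } catch (RejectedExecutionException expected) {
            return true;
        }
    }
}
```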


2612 
2613     /**
2614      * Returns {@code true} if this pool uses local first-in-first-out
2615      * scheduling mode for forked tasks that are never joined.
2616      *
2617      * @return {@code true} if this pool uses async mode
2618      */
2619     public boolean getAsyncMode() {
2620         return (mode & FIFO) != 0;
2621     }
2622 
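The FIFO bit tested by getAsyncMode() is fixed at construction through the asyncMode argument of the four-argument ForkJoinPool constructor. A short check; the class and method names are illustrative.

```java
import java.util.concurrent.ForkJoinPool;

final class AsyncModeDemo {
    static boolean asyncMode(boolean requested) {
        ForkJoinPool pool = new ForkJoinPool(
            2, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, requested);
        try {
            return pool.getAsyncMode();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(asyncMode(true));   // FIFO scheduling for never-joined forks
        System.out.println(asyncMode(false));  // default LIFO
    }
}
```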
2623     /**
2624      * Returns an estimate of the number of worker threads that are
2625      * not blocked waiting to join tasks or for other managed
2626      * synchronization. This method may overestimate the
2627      * number of running threads.
2628      *
2629      * @return the number of worker threads
2630      */
2631     public int getRunningThreadCount() {
2632         int rc = 0;
2633         WorkQueue[] ws; WorkQueue w;
2634         if ((ws = workQueues) != null) {
2635             for (int i = 1; i < ws.length; i += 2) {
2636                 if ((w = ws[i]) != null && w.isApparentlyUnblocked())
2637                     ++rc;
2638             }
2639         }
2640         return rc;
2641     }
2642 
2643     /**
2644      * Returns an estimate of the number of threads that are currently
2645      * stealing or executing tasks. This method may overestimate the
2646      * number of active threads.
2647      *
2648      * @return the number of active threads
2649      */
2650     public int getActiveThreadCount() {
2651         int r = (mode & SMASK) + (int)(ctl >> RC_SHIFT);
2652         return (r <= 0) ? 0 : r; // suppress momentarily negative values
2653     }
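getActiveThreadCount and isQuiescent decode two signed 16-bit fields of ctl: the top field holds the released (active) worker count minus parallelism, and the next holds the total worker count minus parallelism, which is why both are added back to pc. A sketch assuming RC_SHIFT = 48 and TC_SHIFT = 32 from declarations outside this excerpt; the encode helper is hypothetical. It also shows why adding RC_MASK & -RC_UNIT, as helpQuiescePool does, decrements only the top field.

```java
// Shift values assumed from ForkJoinPool's declarations (not shown here).
final class CtlFields {
    static final int  RC_SHIFT = 48;
    static final long RC_UNIT  = 0x0001L << RC_SHIFT;
    static final long RC_MASK  = 0xffffL << RC_SHIFT;
    static final int  TC_SHIFT = 32;

    /** Hypothetical helper: packs active/total counts relative to parallelism. */
    static long encode(int parallelism, int active, int total) {
        long rc = (long) (active - parallelism) << RC_SHIFT;           // top 16 bits
        long tc = ((long) (total - parallelism) & 0xffffL) << TC_SHIFT; // next 16 bits
        return rc | tc;
    }

    static int active(int parallelism, long c) { return parallelism + (int) (c >> RC_SHIFT); }
    static int total(int parallelism, long c)  { return parallelism + (short) (c >>> TC_SHIFT); }

    public static void main(String[] args) {
        int pc = 4;
        long c = encode(pc, 1, 3);
        System.out.println(active(pc, c) + " " + total(pc, c)); // 1 3
        c += RC_UNIT;                                           // one more active worker
        System.out.println(active(pc, c) + " " + total(pc, c)); // 2 3
        c += RC_MASK & -RC_UNIT;                                // one fewer, no borrow below
        System.out.println(active(pc, c) + " " + total(pc, c)); // 1 3
    }
}
```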


2661      * idleness of all threads, but will eventually become true if
2662      * threads remain inactive.
2663      *
2664      * @return {@code true} if all threads are currently idle
2665      */
2666     public boolean isQuiescent() {
2667         for (;;) {
2668             long c = ctl;
2669             int md = mode, pc = md & SMASK;
2670             int tc = pc + (short)(c >>> TC_SHIFT);
2671             int rc = pc + (int)(c >> RC_SHIFT);
2672             if ((md & (STOP | TERMINATED)) != 0)
2673                 return true;
2674             else if (rc > 0)
2675                 return false;
2676             else {
2677                 WorkQueue[] ws; WorkQueue v;
2678                 if ((ws = workQueues) != null) {
2679                     for (int i = 1; i < ws.length; i += 2) {
2680                         if ((v = ws[i]) != null) {
2681                             if ((v.source & QUIET) == 0)
2682                                 return false;
2683                             --tc;
2684                         }
2685                     }
2686                 }
2687                 if (tc == 0 && ctl == c)
2688                     return true;
2689             }
2690         }
2691     }
2692 
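From outside the pool, the usual way to wait for the state isQuiescent() reports is awaitQuiescence(long, TimeUnit), which returns true once the pool is quiescent. A sketch that runs one recursive computation, waits for quiescence, and checks that no submissions remain queued; the Snapshot holder and Fib task are illustrative.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;

final class QuiescenceCheck {
    static final class Fib extends RecursiveTask<Integer> {
        final int n;
        Fib(int n) { this.n = n; }
        @Override protected Integer compute() {
            if (n < 2) return n;
            Fib left = new Fib(n - 1);
            left.fork();                               // may be stolen by another worker
            return new Fib(n - 2).compute() + left.join();
        }
    }

    static final class Snapshot {
        final int result; final boolean quiescent; final int queuedSubmissions;
        Snapshot(int result, boolean quiescent, int queuedSubmissions) {
            this.result = result; this.quiescent = quiescent;
            this.queuedSubmissions = queuedSubmissions;
        }
    }

    static Snapshot runAndInspect() {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            int fib = pool.invoke(new Fib(20));
            boolean quiet = pool.awaitQuiescence(10, TimeUnit.SECONDS);
            return new Snapshot(fib, quiet, pool.getQueuedSubmissionCount());
        } finally {
            pool.shutdown();
        }
    }
}
```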
2693     /**
2694      * Returns an estimate of the total number of tasks stolen from
2695      * one thread's work queue by another. The reported value
2696      * underestimates the actual total number of steals when the pool
2697      * is not quiescent. This value may be useful for monitoring and
2698      * tuning fork/join programs: in general, steal counts should be
2699      * high enough to keep threads busy, but low enough to avoid
2700      * overhead and contention across threads.
2701      *


2707         if ((ws = workQueues) != null) {
2708             for (int i = 1; i < ws.length; i += 2) {
2709                 if ((w = ws[i]) != null)
2710                     count += (long)w.nsteals & 0xffffffffL;
2711             }
2712         }
2713         return count;
2714     }
2715 
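getStealCount widens each worker's int nsteals with (long) nsteals & 0xffffffffL, treating it as an unsigned 32-bit value, so a counter that has wrapped past Integer.MAX_VALUE still contributes a non-negative amount. A sketch of that widening; the class and method names are hypothetical.

```java
final class UnsignedSteals {
    static long widen(int nsteals) {
        return (long) nsteals & 0xffffffffL;   // same as Integer.toUnsignedLong(nsteals)
    }

    static long total(int... perWorker) {
        long count = 0L;
        for (int n : perWorker)
            count += widen(n);
        return count;
    }

    public static void main(String[] args) {
        int wrapped = Integer.MAX_VALUE + 1;  // overflows to Integer.MIN_VALUE
        System.out.println(widen(wrapped));   // 2147483648
        System.out.println(widen(wrapped) == Integer.toUnsignedLong(wrapped)); // true
    }
}
```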
2716     /**
2717      * Returns an estimate of the total number of tasks currently held
2718      * in queues by worker threads (but not including tasks submitted
2719      * to the pool that have not begun executing). This value is only
2720      * an approximation, obtained by iterating across all threads in
2721      * the pool. This method may be useful for tuning task
2722      * granularities.
2723      *
2724      * @return the number of queued tasks
2725      */
2726     public long getQueuedTaskCount() {
2727         long count = 0;
2728         WorkQueue[] ws; WorkQueue w;
2729         if ((ws = workQueues) != null) {
2730             for (int i = 1; i < ws.length; i += 2) {
2731                 if ((w = ws[i]) != null)
2732                     count += w.queueSize();
2733             }
2734         }
2735         return count;
2736     }
2737 
2738     /**
2739      * Returns an estimate of the number of tasks submitted to this
2740      * pool that have not yet begun executing.  This method may take
2741      * time proportional to the number of submissions.
2742      *
2743      * @return the number of queued submissions
2744      */
2745     public int getQueuedSubmissionCount() {
2746         int count = 0;
2747         WorkQueue[] ws; WorkQueue w;
2748         if ((ws = workQueues) != null) {
2749             for (int i = 0; i < ws.length; i += 2) {
2750                 if ((w = ws[i]) != null)
2751                     count += w.queueSize();
2752             }
2753         }
2754         return count;
2755     }
2756 
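getQueuedTaskCount walks odd indices (worker-owned queues) and getQueuedSubmissionCount walks even indices (shared submission queues). A sketch of the same partition over a plain array, where queueSizes is a hypothetical stand-in for queueSize() of each slot (0 for empty or null slots).

```java
final class QueueParity {
    static long workerTasks(int[] queueSizes) {      // odd slots
        long count = 0;
        for (int i = 1; i < queueSizes.length; i += 2)
            count += queueSizes[i];
        return count;
    }

    static int submissions(int[] queueSizes) {       // even slots
        int count = 0;
        for (int i = 0; i < queueSizes.length; i += 2)
            count += queueSizes[i];
        return count;
    }

    public static void main(String[] args) {
        int[] sizes = { 5, 1, 0, 2, 3, 4 };
        System.out.println(workerTasks(sizes));  // 1 + 2 + 4 = 7
        System.out.println(submissions(sizes));  // 5 + 0 + 3 = 8
    }
}
```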
2757     /**
2758      * Returns {@code true} if there are any tasks submitted to this
2759      * pool that have not yet begun executing.
2760      *
2761      * @return {@code true} if there are any queued submissions
2762      */
2763     public boolean hasQueuedSubmissions() {
2764         WorkQueue[] ws; WorkQueue w;
2765         if ((ws = workQueues) != null) {
2766             for (int i = 0; i < ws.length; i += 2) {
2767                 if ((w = ws[i]) != null && !w.isEmpty())
2768                     return true;
2769             }
2770         }
2771         return false;
2772     }
2773 
2774     /**
2775      * Removes and returns the next unexecuted submission if one is
2776      * available.  This method may be useful in extensions to this
2777      * class that re-assign work in systems with multiple pools.
2778      *
2779      * @return the next submission, or {@code null} if none
2780      */
2781     protected ForkJoinTask<?> pollSubmission() {
2782         return pollScan(true);
2783     }
2784 
2785     /**
2786      * Removes all available unexecuted submitted and forked tasks
2787      * from scheduling queues and adds them to the given collection,
2788      * without altering their execution status. These may include
2789      * artificially generated or wrapped tasks. This method is
2790      * designed to be invoked only when the pool is known to be
2791      * quiescent. Invocations at other times may not remove all
2792      * tasks. A failure encountered while attempting to add elements
2793      * to collection {@code c} may result in elements being in
2794      * neither, either or both collections when the associated
2795      * exception is thrown.  The behavior of this operation is
2796      * undefined if the specified collection is modified while the
2797      * operation is in progress.
2798      *
2799      * @param c the collection to transfer elements into
2800      * @return the number of elements transferred
2801      */
2802     protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
2803         int count = 0;
2804         WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t;
2805         if ((ws = workQueues) != null) {
2806             for (int i = 0; i < ws.length; ++i) {
2807                 if ((w = ws[i]) != null) {
2808                     while ((t = w.poll()) != null) {
2809                         c.add(t);
2810                         ++count;
2811                     }
2812                 }
2813             }
2814         }
2815         return count;
2816     }
2817 
2818     /**
2819      * Returns a string identifying this pool, as well as its state,
2820      * including indications of run state, parallelism level, and
2821      * worker and task counts.
2822      *
2823      * @return a string identifying this pool, as well as its state
2824      */
2825     public String toString() {
2826         // Use a single pass through workQueues to collect counts
2827         long qt = 0L, qs = 0L; int rc = 0;
2828         long st = stealCount;
2829         WorkQueue[] ws; WorkQueue w;
2830         if ((ws = workQueues) != null) {
2831             for (int i = 0; i < ws.length; ++i) {
2832                 if ((w = ws[i]) != null) {
2833                     int size = w.queueSize();
2834                     if ((i & 1) == 0)
2835                         qs += size;
2836                     else {
2837                         qt += size;
2838                         st += (long)w.nsteals & 0xffffffffL;
2839                         if (w.isApparentlyUnblocked())
2840                             ++rc;
2841                     }
2842                 }
2843             }
2844         }
2845 
2846         int md = mode;
2847         int pc = (md & SMASK);
2848         long c = ctl;
2849         int tc = pc + (short)(c >>> TC_SHIFT);
2850         int ac = pc + (int)(c >> RC_SHIFT);
2851         if (ac < 0) // ignore transient negative
2852             ac = 0;
2853         String level = ((md & TERMINATED) != 0 ? "Terminated" :
2854                         (md & STOP)       != 0 ? "Terminating" :
2855                         (md & SHUTDOWN)   != 0 ? "Shutting down" :
2856                         "Running");
2857         return super.toString() +
2858             "[" + level +
2859             ", parallelism = " + pc +
2860             ", size = " + tc +
2861             ", active = " + ac +
2862             ", running = " + rc +
2863             ", steals = " + st +
2864             ", tasks = " + qt +
2865             ", submissions = " + qs +
2866             "]";
2867     }
2868 


3114      * {@code blocker.block()} until either method returns {@code true}.
3115      * Every call to {@code blocker.block()} is preceded by a call to
3116      * {@code blocker.isReleasable()} that returned {@code false}.
3117      *
3118      * <p>If not running in a ForkJoinPool, this method is
3119      * behaviorally equivalent to
3120      * <pre> {@code
3121      * while (!blocker.isReleasable())
3122      *   if (blocker.block())
3123      *     break;}</pre>
3124      *
3125      * If running in a ForkJoinPool, the pool may first be expanded to
3126      * ensure sufficient parallelism available during the call to
3127      * {@code blocker.block()}.
3128      *
3129      * @param blocker the blocker task
3130      * @throws InterruptedException if {@code blocker.block()} did so
3131      */
3132     public static void managedBlock(ManagedBlocker blocker)
3133         throws InterruptedException {
3134         ForkJoinPool p;
3135         ForkJoinWorkerThread wt;
3136         WorkQueue w;
3137         Thread t = Thread.currentThread();
3138         if ((t instanceof ForkJoinWorkerThread) &&
3139             (p = (wt = (ForkJoinWorkerThread)t).pool) != null &&
3140             (w = wt.workQueue) != null) {
3141             int block;
3142             while (!blocker.isReleasable()) {
3143                 if ((block = p.tryCompensate(w)) != 0) {
3144                     try {
3145                         do {} while (!blocker.isReleasable() &&
3146                                      !blocker.block());
3147                     } finally {
3148                         CTL.getAndAdd(p, (block > 0) ? RC_UNIT : 0L);
3149                     }
3150                     break;
3151                 }
3152             }
3153         }
3154         else {
3155             do {} while (!blocker.isReleasable() &&
3156                          !blocker.block());
3157         }
3158     }
3159 
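A ManagedBlocker around BlockingQueue.take(), following the pattern shown in the ForkJoinPool.ManagedBlocker documentation: isReleasable() first tries a non-blocking poll, and block() is reached only when that fails, giving the pool a chance to compensate before the thread blocks. Outside a pool, managedBlock reduces to the simple loop in the javadoc above. The class and the take helper are illustrative.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ForkJoinPool;

final class QueueTaker<E> implements ForkJoinPool.ManagedBlocker {
    private final BlockingQueue<E> queue;
    private volatile E item;

    QueueTaker(BlockingQueue<E> queue) { this.queue = queue; }

    @Override public boolean block() throws InterruptedException {
        if (item == null)
            item = queue.take();      // may block; the pool may add a spare worker first
        return true;                  // no further blocking is necessary
    }

    @Override public boolean isReleasable() {
        return item != null || (item = queue.poll()) != null;
    }

    E getItem() { return item; }      // valid once managedBlock has returned

    static <E> E take(BlockingQueue<E> queue) {
        QueueTaker<E> taker = new QueueTaker<>(queue);
        try {
            ForkJoinPool.managedBlock(taker);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // preserve interrupt status
            throw new IllegalStateException("interrupted while taking", e);
        }
        return taker.getItem();
    }
}
```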
3160     /**
3161      * If the given executor is a ForkJoinPool, polls and executes
3162      * AsynchronousCompletionTasks from the caller's queue until none
3163      * are available or the blocker is released.
3164      */
3165     static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) {
3166         if (blocker != null && (e instanceof ForkJoinPool)) {
3167             WorkQueue w; ForkJoinWorkerThread wt; WorkQueue[] ws; int r, n;
3168             ForkJoinPool p = (ForkJoinPool)e;
3169             Thread thread = Thread.currentThread();
3170             if (thread instanceof ForkJoinWorkerThread &&
3171                 (wt = (ForkJoinWorkerThread)thread).pool == p)
3172                 w = wt.workQueue;
3173             else if ((r = ThreadLocalRandom.getProbe()) != 0 &&
3174                      (ws = p.workQueues) != null && (n = ws.length) > 0)
3175                 w = ws[(n - 1) & r & SQMASK];
3176             else
3177                 w = null;
3178             if (w != null) {
3179                 for (;;) {
3180                     int b = w.base, s = w.top, d, al; ForkJoinTask<?>[] a;
3181                     if ((a = w.array) != null && (d = b - s) < 0 &&
3182                         (al = a.length) > 0) {
3183                         int index = (al - 1) & b;
3184                         ForkJoinTask<?> t = (ForkJoinTask<?>)
3185                             QA.getAcquire(a, index);
3186                         if (blocker.isReleasable())
3187                             break;
3188                         else if (b++ == w.base) {
3189                             if (t == null) {
3190                                 if (d == -1)
3191                                     break;
3192                             }
3193                             else if (!(t instanceof CompletableFuture.
3194                                   AsynchronousCompletionTask))
3195                                 break;
3196                             else if (QA.compareAndSet(a, index, t, null)) {
3197                                 w.base = b;
3198                                 t.doExec();
3199                             }
3200                         }
3201                     }
3202                     else
3203                         break;
3204                 }
3205             }
3206         }
3207     }
3208 
3209     // AbstractExecutorService overrides.  These rely on the undocumented
3210     // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
3211     // implement RunnableFuture.
3212 
3213     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
3214         return new ForkJoinTask.AdaptedRunnable<T>(runnable, value);
3215     }
3216 
3217     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
3218         return new ForkJoinTask.AdaptedCallable<T>(callable);
3219     }
3220 
3221     // VarHandle mechanics
3222     private static final VarHandle CTL;
3223     private static final VarHandle MODE;
3224     private static final VarHandle QA;
3225 
3226     static {
3227         try {
3228             MethodHandles.Lookup l = MethodHandles.lookup();
3229             CTL = l.findVarHandle(ForkJoinPool.class, "ctl", long.class);
3230             MODE = l.findVarHandle(ForkJoinPool.class, "mode", int.class);
3231             QA = MethodHandles.arrayElementVarHandle(ForkJoinTask[].class);
3232         } catch (ReflectiveOperationException e) {
3233             throw new Error(e);
3234         }
3235 
3236         // Reduce the risk of rare disastrous classloading in first call to
3237         // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
3238         Class<?> ensureLoaded = LockSupport.class;
3239 
3240         int commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
3241         try {
3242             String p = System.getProperty
3243                 ("java.util.concurrent.ForkJoinPool.common.maximumSpares");
3244             if (p != null)
3245                 commonMaxSpares = Integer.parseInt(p);
3246         } catch (Exception ignore) {}
3247         COMMON_MAX_SPARES = commonMaxSpares;
3248 
3249         defaultForkJoinWorkerThreadFactory =
3250             new DefaultForkJoinWorkerThreadFactory();
3251         modifyThreadPermission = new RuntimePermission("modifyThread");
3252 
3253         common = AccessController.doPrivileged(new PrivilegedAction<>() {




 167  * maximum number of running threads to 32767. Attempts to create
 168  * pools with greater than the maximum number result in
 169  * {@code IllegalArgumentException}.
 170  *
 171  * <p>This implementation rejects submitted tasks (that is, by throwing
 172  * {@link RejectedExecutionException}) only when the pool is shut down
 173  * or internal resources have been exhausted.
 174  *
 175  * @since 1.7
 176  * @author Doug Lea
 177  */
 178 public class ForkJoinPool extends AbstractExecutorService {
 179 
 180     /*
 181      * Implementation Overview
 182      *
 183      * This class and its nested classes provide the main
 184      * functionality and control for a set of worker threads:
 185      * Submissions from non-FJ threads enter into submission queues.
 186      * Workers take these tasks and typically split them into subtasks
 187      * that may be stolen by other workers. Work-stealing based on
 188      * randomized scans generally leads to better throughput than
 189      * "work dealing" in which producers assign tasks to idle threads,
 190      * in part because threads that have finished other tasks before
 191      * the signalled thread wakes up (which can be a long time) can
 192      * take the task instead.  Preference rules give first priority to
 193      * processing tasks from their own queues (LIFO or FIFO, depending
 194      * on mode), then to randomized FIFO steals of tasks in other
 195      * queues.  This framework began as a vehicle for supporting
 196      * tree-structured parallelism using work-stealing.  Over time,
 197      * its scalability advantages led to extensions and changes to
 198      * better support more diverse usage contexts.  Because most
 199      * internal methods and nested classes are interrelated, their
 200      * main rationale and descriptions are presented here; individual
 201      * methods and nested classes contain only brief comments about
 202      * details.
 203      *
 204      * WorkQueues
 205      * ==========
 206      *
 207      * Most operations occur within work-stealing queues (in nested
 208      * class WorkQueue).  These are special forms of Deques that
 209      * support only three of the four possible end-operations -- push,
 210      * pop, and poll (aka steal), under the further constraints that
 211      * push and pop are called only from the owning thread (or, as
 212      * extended here, under a lock), while poll may be called from
 213      * other threads.  (If you are unfamiliar with them, you probably
 214      * want to read Herlihy and Shavit's book "The Art of
 215      * Multiprocessor Programming", chapter 16 describing these in
 216      * more detail before proceeding.)  The main work-stealing queue
 217      * design is roughly similar to those in the papers "Dynamic
 218      * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
 219      * (http://research.sun.com/scalable/pubs/index.html) and
 220      * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
 221      * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
 222      * The main differences ultimately stem from GC requirements that
 223      * we null out taken slots as soon as we can, to maintain as small
 224      * a footprint as possible even in programs generating huge
 225      * numbers of tasks. To accomplish this, we shift the CAS
 226      * arbitrating pop vs poll (steal) from being on the indices
 227      * ("base" and "top") to the slots themselves.
 228      *
 229      * Adding tasks then takes the form of a classic array push(task)
 230      * in a circular buffer:
 231      *    q.array[q.top++ % length] = task;
 232      *
 233      * (The actual code needs to null-check and size-check the array,
 234      * uses masking, not mod, for indexing a power-of-two-sized array,
 235      * adds a release fence for publication, and possibly signals
 236      * waiting workers to start scanning -- see below.)  Both a
 237      * successful pop and poll mainly entail a CAS of a slot from
 238      * non-null to null.
 239      *
 240      * The pop operation (always performed by owner) is:
 241      *   if ((the task at top slot is not null) and
 242      *        (CAS slot to null))
 243      *           decrement top and return task;
 244      *
 245      * And the poll operation (usually by a stealer) is
 246      *    if ((the task at base slot is not null) and
 247      *        (CAS slot to null))
 248      *           increment base and return task;
 249      *
 250      * There are several variants of each of these. Most uses occur
 251      * within operations that also interleave contention or emptiness
 252      * tracking or inspection of elements before extracting them, so
 253      * must interleave these with the above code. When performed by
 254      * owner, getAndSet is used instead of CAS (see for example method
 255      * nextLocalTask) which is usually more efficient, and possible
 256      * because the top index cannot independently change during the
 257      * operation.
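The push, pop, and poll outlines above can be sketched as a small standalone class. This is an illustration under stated assumptions, not the WorkQueue code. The name `SlotCasDeque` is hypothetical, and the capacity is fixed: a real queue would grow its array instead of refusing a push. `AtomicReferenceArray` stands in for the `QA` VarHandle. To keep the sketch simple, the owner's pop also uses CAS rather than getAndSet. As in the outlines, pop and poll arbitrate by CASing a slot from non-null to null, while `base` and `top` only guide where to look.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical sketch, not ForkJoinPool.WorkQueue: a fixed-capacity circular
// deque in which pop (owner) and poll (any thread) race by CASing the slot itself.
final class SlotCasDeque<T> {
    private final AtomicReferenceArray<T> array;
    private final int mask;        // capacity - 1; capacity is a power of two
    private volatile int base;     // index of next slot for poll
    private volatile int top;      // index of next slot for push (written by owner only)

    SlotCasDeque(int capacity) {
        if (capacity < 2 || Integer.bitCount(capacity) != 1)
            throw new IllegalArgumentException("capacity must be a power of two >= 2");
        array = new AtomicReferenceArray<>(capacity);
        mask = capacity - 1;
    }

    /** Owner only. Returns false when full (a real queue would grow the array). */
    boolean push(T task) {
        int s = top;
        if (s - base > mask)               // size == capacity: full
            return false;
        array.set(s & mask, task);         // masking, not mod; volatile write publishes the task
        top = s + 1;
        return true;
    }

    /** Owner only: LIFO removal at the top end. */
    T pop() {
        int s = top - 1;
        if (s - base < 0)                  // appears empty
            return null;
        int i = s & mask;
        T t = array.get(i);
        if (t != null && array.compareAndSet(i, t, null)) {
            top = s;                       // only the owner moves top
            return t;
        }
        return null;                       // a thief claimed the last task first
    }

    /** Any thread: FIFO removal at the base end. */
    T poll() {
        int b = base;
        if (b - top >= 0)                  // appears empty
            return null;
        int i = b & mask;
        T t = array.get(i);
        // Re-check base so a stale index does not claim a recycled slot.
        if (t != null && b == base && array.compareAndSet(i, t, null)) {
            base = b + 1;
            return t;
        }
        return null;                       // lost a race; a scanner would try another queue
    }
}
```

In a single-threaded walk-through on a capacity-4 deque, after pushing 1 through 4 a fifth push is refused, `pop()` yields 4 and `poll()` yields 1. The `b == base` re-check in `poll` plays a role similar to the `b++ == w.base` test in `helpAsyncBlocker` above.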
 258      *
 259      * Memory ordering.  See "Correct and Efficient Work-Stealing for
 260      * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
 261      * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
 262      * analysis of memory ordering requirements in work-stealing
 263      * algorithms similar to (but different than) the one used here.
 264      * Extracting tasks in array slots via (fully fenced) CAS provides
 265      * primary synchronization. The base and top indices imprecisely
 266      * guide where to extract from. We do not usually require strict
 267      * orderings of array and index updates. Many index accesses use
 268      * plain mode, with ordering constrained by surrounding context
 269      * (usually with respect to element CASes or the two WorkQueue
 270      * volatile fields source and phase). When not otherwise already
 271      * constrained, reads of "base" by queue owners use acquire-mode,
 272      * and some externally callable methods preface accesses with
 273      * acquire fences.  Additionally, to ensure that index update
 274      * writes are not coalesced or postponed in loops etc, "opaque"
 275      * mode is used in a few cases where timely writes are not
 276      * otherwise ensured. The "locked" versions of push- and pop-
 277      * based methods for shared queues differ from owned versions
 278      * because locking already forces some of the ordering.
 279      *
 280      * Because indices and slot contents cannot always be consistent,
 281      * a check that base == top indicates (momentary) emptiness, but
 282      * otherwise may err on the side of possibly making the queue
 283      * appear nonempty when a push, pop, or poll has not fully
 284      * committed, or making it appear empty when an update of top has
 285      * not yet been visibly written.  (Method isEmpty() checks the
 286      * case of a partially completed removal of the last element.)
 287      * Because of this, the poll operation, considered individually,
 288      * is not wait-free. One thief cannot successfully continue until
 289      * another in-progress one (or, if previously empty, a push)
 290      * visibly completes.  This can stall threads when required to
 291      * consume from a given queue (see method poll()).  However, in
 292      * the aggregate, we ensure at least probabilistic
 293      * non-blockingness.  If an attempted steal fails, a scanning
 294      * thief chooses a different random victim target to try next. So,
 295      * in order for one thief to progress, it suffices for any
 296      * in-progress poll or new push on any empty queue to complete.
 297      *
 298      * This approach also enables support of a user mode in which
 299      * local task processing is in FIFO, not LIFO order, simply by
 300      * using poll rather than pop.  This can be useful in
 301      * message-passing frameworks in which tasks are never joined.
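In the public API, this user mode is selected by the `asyncMode` argument of the four-argument `ForkJoinPool` constructor. The sketch below assumes fire-and-forget events that are forked from inside a worker. Forks from a non-worker thread would go to the common pool instead. `AsyncModeDemo` and its counting latch are illustrative and not part of the pool.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;

// Illustrative only: forked tasks that are never joined, in a pool created with asyncMode = true.
final class AsyncModeDemo {
    /** Forks `events` fire-and-forget subtasks from one worker; returns how many ran. */
    static long runEvents(int events) {
        ForkJoinPool pool = new ForkJoinPool(
                2,                                               // parallelism
                ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                null,                                            // no UncaughtExceptionHandler
                true);                                           // asyncMode: local FIFO order
        CountDownLatch done = new CountDownLatch(events);
        try {
            pool.execute(() -> {                                 // runs in a worker thread
                for (int i = 0; i < events; i++)                 // fork() pushes onto that worker's queue
                    ForkJoinTask.adapt(() -> { done.countDown(); }).fork();
            });
            done.await(10, TimeUnit.SECONDS);                    // the events are never joined
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return events - done.getCount();
    }
}
```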
 302      *
 303      * WorkQueues are also used in a similar way for tasks submitted
 304      * to the pool. We cannot mix these tasks in the same queues used
 305      * by workers. Instead, we randomly associate submission queues
 306      * with submitting threads, using a form of hashing.  The
 307      * ThreadLocalRandom probe value serves as a hash code for
 308      * choosing existing queues, and may be randomly repositioned upon
 309      * contention with other submitters.  In essence, submitters act
 310      * like workers except that they are restricted to executing local
 311      * tasks that they submitted.  Insertion of tasks in shared mode
 312      * requires a lock but we use only a simple spinlock (using field
 313      * phase), because submitters encountering a busy queue move to a
 314      * different position to use or create other queues -- they block
 315      * only when creating and registering new queues. Because it is
 316      * used only as a spinlock, unlocking requires only a "releasing"
 317      * store (using setRelease) unless otherwise signalling.
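A sketch of the submitter side described above, under several simplifying assumptions. `SubmissionQueues` is hypothetical. Each even slot pairs a `java.util.ArrayDeque` with an `AtomicInteger` that stands in for the `phase` field. The caller passes a nonzero probe in place of ThreadLocalRandom's internal probe, and an xorshift step stands in for repositioning on contention. Locking is a CAS from 0 to 1, and unlocking is a release-mode store, mirroring `tryLockPhase` and `releasePhaseLock` below. A submitter that finds a busy slot moves on rather than blocking.

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of shared submission queues guarded by per-queue spinlocks.
final class SubmissionQueues {
    static final int SQMASK = 0x007e;                 // even slots only, at most 64 (value from the source)

    static final class Slot {
        final AtomicInteger phase = new AtomicInteger();  // 0 = unlocked, 1 = locked
        final ArrayDeque<Runnable> tasks = new ArrayDeque<>();
        boolean tryLock() { return phase.compareAndSet(0, 1); }
        void unlock()     { phase.setRelease(0); }        // a releasing store suffices
    }

    final Slot[] slots;

    SubmissionQueues(int powerOfTwoLength) {          // assumed: power of two, at least 2
        slots = new Slot[powerOfTwoLength];
        for (int i = 0; i < slots.length; i += 2)
            slots[i] = new Slot();                    // odd indices are left for worker queues
    }

    /** Pushes r using a nonzero probe; on contention, rehashes instead of blocking. Returns the slot used. */
    int submit(Runnable r, int probe) {
        for (;;) {
            int i = (slots.length - 1) & probe & SQMASK;
            Slot q = slots[i];
            if (q.tryLock()) {
                try {
                    q.tasks.addLast(r);
                } finally {
                    q.unlock();
                }
                return i;
            }
            probe ^= probe << 13; probe ^= probe >>> 17; probe ^= probe << 5; // try another slot
        }
    }
}
```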
 318      *
 319      * Management
 320      * ==========
 321      *
 322      * The main throughput advantages of work-stealing stem from
 323      * decentralized control -- workers mostly take tasks from
 324      * themselves or each other, at rates that can exceed a billion
 325      * per second.  The pool itself creates, activates (enables
 326      * scanning for and running tasks), deactivates, blocks, and
 327      * terminates threads, all with minimal central information.
 328      * There are only a few properties that we can globally track or
 329      * maintain, so we pack them into a small number of variables,
 330      * often maintaining atomicity without blocking or locking.
 331      * Nearly all essentially atomic control state is held in a few
 332      * volatile variables that are by far most often read (not
 333      * written) as status and consistency checks. We pack as much
 334      * information into them as we can.
 335      *
 336      * Field "ctl" contains 64 bits holding information needed to
 337      * atomically decide to add, enqueue (on an event queue), and
 338      * dequeue and release workers.  To enable this packing, we
 339      * restrict maximum parallelism to (1<<15)-1 (which is far in
 340      * excess of normal operating range) to allow ids, counts, and
 341      * their negations (used for thresholding) to fit into 16bit
 342      * subfields.
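The packing idea can be illustrated with a hypothetical layout of two signed 16-bit counts above a 32-bit stack-top field. The real ctl layout and its constants are defined elsewhere in this file and are not reproduced here. `PackedCtl`, its shifts, and its field order are assumptions. Each count is stored as (count minus target parallelism), so a negative value means "below target". The (1<<15)-1 parallelism limit keeps such values inside a signed 16-bit subfield.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical layout: [ rc : 16 | tc : 16 | sp : 32 ], with rc and tc stored as
// (count - parallelism) so that a negative value means "below target".
final class PackedCtl {
    static final int RC_SHIFT = 48;
    static final int TC_SHIFT = 32;
    static final long RC_UNIT = 1L << RC_SHIFT;

    static long pack(int rc, int tc, int sp) {
        return ((long) (rc & 0xffff) << RC_SHIFT)
             | ((long) (tc & 0xffff) << TC_SHIFT)
             | (sp & 0xffffffffL);
    }

    static int rc(long c) { return (short) (c >>> RC_SHIFT); }  // sign-extends the 16-bit field
    static int tc(long c) { return (short) (c >>> TC_SHIFT); }
    static int sp(long c) { return (int) c; }

    /** Atomically counts one more active worker if still below target; false otherwise. */
    static boolean tryAddActive(AtomicLong ctl) {
        for (long c; rc(c = ctl.get()) < 0; ) {
            if (ctl.compareAndSet(c, c + RC_UNIT))     // only the rc subfield changes
                return true;
        }
        return false;
    }
}
```

Adding `RC_UNIT` to the whole word leaves the lower subfields untouched. When rc goes from -1 to 0, the carry out of bit 63 is discarded by long overflow, which is exactly the intended result.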
 343      *
 344      * Field "mode" holds configuration parameters as well as lifetime
 345      * status, atomically and monotonically setting SHUTDOWN, STOP,
 346      * and finally TERMINATED bits.
 347      *
 348      * Field "workQueues" holds references to WorkQueues.  It is
 349      * updated (only during worker creation and termination) under
 350      * lock (using field workerNamePrefix as lock), but is otherwise
 351      * concurrently readable, and accessed directly. We also ensure
 352      * that uses of the array reference itself never become too stale
 353      * in case of resizing, by arranging that (re-)reads are separated
 354      * by at least one acquiring read access.  To simplify index-based
 355      * operations, the array size is always a power of two, and all
 356      * readers must tolerate null slots. Worker queues are at odd
 357      * indices. Shared (submission) queues are at even indices, up to
 358      * a maximum of 64 slots, to limit growth even if the array needs
 359      * to expand to add more workers. Grouping them together in this
 360      * way simplifies and speeds up task scanning.
 361      *
 362      * All worker thread creation is on-demand, triggered by task
 363      * submissions, replacement of terminated workers, and/or
 364      * compensation for blocked workers. However, all other support
 365      * code is set up to work with other policies.  To ensure that we
 366      * do not hold on to worker references that would prevent GC, all
 367      * accesses to workQueues are via indices into the workQueues
 368      * array (which is one source of some of the messy code
 369      * constructions here). In essence, the workQueues array serves as
 370      * a weak reference mechanism. Thus for example the stack top
 371      * subfield of ctl stores indices, not references.
 372      *
 373      * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we
 374      * cannot let workers spin indefinitely scanning for tasks when
 375      * none can be found immediately, and we cannot start/resume
 376      * workers unless there appear to be tasks available.  On the
 377      * other hand, we must quickly prod them into action when new
 378      * tasks are submitted or generated. In many usages, ramp-up time
 379      * is the main limiting factor in overall performance, which is
 380      * compounded at program start-up by JIT compilation and


 418      * exception is propagated, generally to some external caller.
 419      * Worker index assignment avoids the bias in scanning that would
 420      * occur if entries were sequentially packed starting at the front
 421      * of the workQueues array. We treat the array as a simple
 422      * power-of-two hash table, expanding as needed. The seedIndex
 423      * increment ensures no collisions until a resize is needed or a
 424      * worker is deregistered and replaced, and thereafter keeps
 425      * probability of collision low. We cannot use
 426      * ThreadLocalRandom.getProbe() for similar purposes here because
 427      * the thread has not started yet, but do so for creating
 428      * submission queues for existing external threads (see
 429      * externalPush).
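The odd/even layout of workQueues and seed-based worker placement can be sketched as follows. The class, the golden-ratio `SEED_INCREMENT` value, and the step-by-two collision rule are illustrative assumptions, not this file's code. Only `SQMASK` is taken from the constants declared below.

```java
// Hypothetical sketch of the workQueues index layout: workers at odd slots,
// shared submission queues at even slots (at most 64 of them).
final class QueueSlots {
    static final int SQMASK = 0x007e;              // value from the source: max 64 even slots
    static final int SEED_INCREMENT = 0x9e3779b9;  // assumption: golden-ratio step
    private int indexSeed;                         // a real pool would update this under a lock
    final Object[] slots;

    QueueSlots(int powerOfTwoLength) { slots = new Object[powerOfTwoLength]; }

    /** Slot of the shared queue for a submitter's probe value; always even. */
    int sharedIndex(int probe) { return (slots.length - 1) & probe & SQMASK; }

    /** Places a worker at an odd index derived from the seed, stepping to the
     *  next odd slot on collision; returns -1 if every odd slot is taken. */
    int registerWorker(Object worker) {
        int m = slots.length - 1;
        int i = (((indexSeed += SEED_INCREMENT) << 1) | 1) & m;
        for (int probes = slots.length >>> 1; probes > 0; probes--, i = (i + 2) & m) {
            if (slots[i] == null) {
                slots[i] = worker;
                return i;
            }
        }
        return -1;                                 // a real pool would resize here
    }
}
```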
 430      *
 431      * WorkQueue field "phase" is used by both workers and the pool to
 432      * manage and track whether a worker is UNSIGNALLED (possibly
 433      * blocked waiting for a signal).  When a worker is enqueued its
 434      * phase field is set. Note that phase field updates lag queue CAS
 435      * releases so usage requires care -- seeing a negative phase does
 436      * not guarantee that the worker is available. When queued, the
 437      * lower 16 bits of phase must hold its pool index. So we
 438      * place the index there upon initialization and otherwise keep it
 439      * there or restore it when necessary.
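The ctl stack-top subfield and `WorkQueue.stackPred` suggest how idle workers can be kept on a lock-free (Treiber) stack whose top is one versioned word. The sketch below is hypothetical. It keeps the top in an `AtomicInteger` rather than inside ctl, and only reuses the `SMASK`, `SS_SEQ`, and `UNSIGNALLED` values declared below. Bumping the version on each enqueue makes a stale compare-and-set fail when a worker was released and re-queued in between (the ABA problem). Keeping the index in the low 16 bits lets a releaser locate the worker from the stack-top word alone.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;

// Hypothetical sketch: a Treiber stack of idle worker indices whose top word is a
// versioned phase value (version in the upper bits, pool index in the low 16 bits).
final class IdleWorkerStack {
    static final int SMASK = 0xffff;              // low 16 bits: pool index
    static final int SS_SEQ = 1 << 16;            // version increment
    static final int UNSIGNALLED = 1 << 31;       // negative phase: queued, awaiting a signal

    final AtomicIntegerArray phase;               // per-worker phase
    final int[] stackPred;                        // per-worker link to the previous stack top
    final AtomicInteger sp = new AtomicInteger(); // 0 = empty; queued phases are negative, never 0

    IdleWorkerStack(int n) {
        phase = new AtomicIntegerArray(n);
        stackPred = new int[n];
        for (int i = 0; i < n; i++)
            phase.set(i, i);                      // index placed in the low bits at initialization
    }

    /** Worker `index` found no work: bump its version, mark it unsignalled, push it. */
    void enqueue(int index) {
        int np = (phase.get(index) + SS_SEQ) | UNSIGNALLED;
        phase.set(index, np);
        int c;
        do {
            c = sp.get();
            stackPred[index] = c;                 // published by the successful CAS below
        } while (!sp.compareAndSet(c, np));
    }

    /** Pops the most recently queued worker and clears its UNSIGNALLED bit; -1 if none. */
    int signal() {
        int c;
        while ((c = sp.get()) != 0) {
            int i = c & SMASK;
            if (sp.compareAndSet(c, stackPred[i])) {
                phase.set(i, c & ~UNSIGNALLED);   // keeps version and index
                return i;
            }
        }
        return -1;
    }
}
```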
 440      *
 441      * The ctl field also serves as the basis for memory
 442      * synchronization surrounding activation. This uses a more
 443      * efficient version of a Dekker-like rule in which task producers
 444      * and consumers sync with each other by both writing/CASing ctl
 445      * (even if to its current value). Done naively, this would be
 446      * extremely costly, so we relax it in several ways: (1) Producers
 447      * only signal when their queue is possibly empty at some point
 448      * during a push operation (which requires conservatively checking
 449      * size zero or one to cover races). (2) Other workers propagate this signal
 450      * when they find tasks in a queue with size greater than one. (3)
 451      * Workers only enqueue after scanning (see below) and not finding
 452      * any tasks.  (4) Rather than CASing ctl to its current value in
 453      * the common case where no action is required, we reduce write
 454      * contention by equivalently prefacing signalWork when called by
 455      * an external task producer using a memory access with
 456      * full-volatile semantics or a "fullFence".
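Relaxation (1) can be illustrated with a single-threaded sketch that counts how often a producer would signal. `SignallingQueue` and its `signals` counter are hypothetical stand-ins for a WorkQueue and calls to signalWork. The point is only that a producer pushing onto an already busy queue does not signal, leaving propagation to workers that find tasks there, as in relaxation (2).

```java
import java.util.ArrayDeque;

// Hypothetical sketch: signal only when the queue was possibly empty (size 0 or 1) before a push.
final class SignallingQueue {
    private final ArrayDeque<Runnable> tasks = new ArrayDeque<>();
    private int signals;                          // stands in for calls to signalWork()

    void push(Runnable task) {
        int sizeBefore = tasks.size();            // in a racy queue this is only an estimate
        tasks.addLast(task);
        if (sizeBefore <= 1)                      // "size zero or one" covers an in-flight poll
            signals++;
    }

    Runnable poll() { return tasks.pollFirst(); }
    int signals()   { return signals; }
}
```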
 457      *
 458      * Almost always, too many signals are issued, in part because a
 459      * task producer cannot tell if some existing worker is in the
 460      * midst of finishing one task (or already scanning) and ready to
 461      * take another without being signalled. So the producer might
 462      * instead activate a different worker that does not find any
 463      * work, and then inactivates. This scarcely matters in
 464      * steady-state computations involving all workers, but can create
 465      * contention and bookkeeping bottlenecks during ramp-up,
 466      * ramp-down, and small computations involving only a few workers.
 467      *
 468      * Scanning. Method scan (from runWorker) performs top-level
 469      * scanning for tasks. (Similar scans appear in helpQuiesce and
 470      * pollScan.)  Each scan traverses and tries to poll from each
 471      * queue starting at a random index. Scans are not performed in
 472      * ideal random permutation order, to reduce cacheline
 473      * contention. The pseudorandom generator need not have
 474      * high-quality statistical properties in the long term, but just
 475      * within computations. We use Marsaglia XorShifts (often via
 476      * ThreadLocalRandom.nextSecondarySeed), which are cheap and
 477      * suffice. Scanning also includes contention reduction: When
 478      * scanning workers fail to extract an apparently existing task,
 479      * they soon restart at a different pseudorandom index.  This form
 480      * of backoff improves throughput when many threads are trying to
 481      * take tasks from few queues, which can be common in some usages.
 482      * Scans do not otherwise explicitly take into account core
 483      * affinities, loads, cache localities, etc. However, they do
 484      * exploit temporal locality (which usually approximates these) by
 485      * preferring to re-poll from the same queue after a successful
 486      * poll before trying others (see method topLevelExec). However
 487      * this preference is bounded (see TOP_BOUND_SHIFT) as a safeguard
 488      * against infinitely unfair looping under unbounded user task
 489      * recursion, and also to reduce long-term contention when many
 490      * threads poll few queues holding many small tasks. The bound is
 491      * high enough to avoid much impact on locality and scheduling
 492      * overhead.
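The traversal and the bounded preference for re-polling one queue can be sketched as follows. `ScanSketch`, its methods, and the exact way the bound is derived from `TOP_BOUND_SHIFT` are assumptions for illustration. The real scan differs in stepping and backoff. The xorshift is the classic Marsaglia (13, 17, 5) triple for 32-bit ints. Each of its steps is invertible, so a nonzero seed never produces zero.

```java
import java.util.ArrayDeque;

// Hypothetical sketch of a randomized-start scan plus a bounded run of polls from one queue.
final class ScanSketch {
    static final int TOP_BOUND_SHIFT = 10;        // same value as the constant declared below

    /** One Marsaglia xorshift step; nonzero input gives nonzero output. */
    static int xorShift(int r) {
        r ^= r << 13;
        r ^= r >>> 17;
        r ^= r << 5;
        return r;
    }

    /** Visits every queue once, starting at a pseudorandom index; returns the first nonempty one, or -1. */
    static int scan(ArrayDeque<?>[] queues, int seed) {
        int n = queues.length, m = n - 1;         // n assumed to be a power of two
        for (int k = 0, i = seed & m; k < n; k++, i = (i + 1) & m) {
            ArrayDeque<?> q = queues[i];
            if (q != null && !q.isEmpty())
                return i;
        }
        return -1;
    }

    /** Runs between 1 and (n << TOP_BOUND_SHIFT) tasks from q (a randomized bound), then stops. */
    static int drainBounded(ArrayDeque<Runnable> q, int n, int seed) {
        int limit = 1 + (xorShift(seed) & ((n << TOP_BOUND_SHIFT) - 1)); // hypothetical bound
        int ran = 0;
        for (Runnable t; ran < limit && (t = q.pollFirst()) != null; ran++)
            t.run();
        return ran;                               // caller would then rescan other queues
    }
}
```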
 493      *
 494      * Trimming workers. To release resources after periods of lack of
 495      * use, a worker starting to wait when the pool is quiescent will
 496      * time out and terminate (see method runWorker) if the pool has
 497      * remained quiescent for a period given by field keepAlive.
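The keepAlive period corresponds to the `keepAliveTime` argument of the extended `ForkJoinPool` constructor available since Java 9. That constructor also takes core and maximum pool sizes, a minimum number of runnable threads, and a saturation predicate. A configuration sketch with arbitrary values; `KeepAliveConfig` is an illustrative name.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

// Illustrative configuration (Java 9+): idle workers may time out after 5 seconds of quiescence.
final class KeepAliveConfig {
    static ForkJoinPool newTrimmingPool(int parallelism) {
        return new ForkJoinPool(
                parallelism,
                ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                null,                   // no UncaughtExceptionHandler
                false,                  // asyncMode off: LIFO local processing
                0,                      // corePoolSize: per the javadoc, smaller values act like the default
                parallelism + 256,      // maximumPoolSize: the common pool's default, per the javadoc
                1,                      // minimumRunnable
                null,                   // saturate: null means reject if a blocked thread cannot be replaced
                5, TimeUnit.SECONDS);   // keepAliveTime: idle time before a worker may terminate
    }
}
```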
 498      *
 499      * Shutdown and Termination. A call to shutdownNow invokes
 500      * tryTerminate to atomically set a run state bit in field mode. The calling
 501      * thread, as well as every other worker thereafter terminating,
 502      * helps terminate others by cancelling their unprocessed tasks,
 503      * and waking them up, doing so repeatedly until stable. Calls to
 504      * non-abrupt shutdown() preface this by checking whether
 505      * termination should commence by sweeping through queues (until
 506      * stable) to ensure lack of in-flight submissions and workers
 507      * about to process them before triggering the "STOP" phase of
 508      * termination.
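From a caller's perspective, a non-abrupt `shutdown()` lets already-submitted tasks finish and rejects later submissions, as the class javadoc states. A minimal sketch of those observable effects; `ShutdownDemo` and its `Outcome` holder are illustrative names.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

// Illustrative only: what a caller observes around a non-abrupt shutdown().
final class ShutdownDemo {
    static final class Outcome {
        int resultOfEarlierTask;
        boolean laterSubmitRejected;
        boolean terminated;
    }

    static Outcome run() {
        Outcome o = new Outcome();
        ForkJoinPool pool = new ForkJoinPool(2);
        ForkJoinTask<Integer> earlier = pool.submit(() -> 6 * 7);
        pool.shutdown();                              // stop accepting; submitted tasks still run
        try {
            pool.submit(() -> 0);
        } catch (RejectedExecutionException expected) {
            o.laterSubmitRejected = true;             // rejection after shutdown
        }
        o.resultOfEarlierTask = earlier.join();
        try {
            o.terminated = pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return o;
    }
}
```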
 509      *
 510      * Joining Tasks
 511      * =============
 512      *
 513      * Any of several actions may be taken when one worker is waiting
 514      * to join a task stolen (or always held) by another.  Because we
 515      * are multiplexing many tasks on to a pool of workers, we can't
 516      * always just let them block (as in Thread.join).  We also cannot


 544      * actively joined task.  Thus, the joiner executes a task that
 545      * would be on its own local deque if the to-be-joined task had
 546      * not been stolen. This is a conservative variant of the approach
 547      * described in Wagner & Calder "Leapfrogging: a portable
 548      * technique for implementing efficient futures" SIGPLAN Notices,
 549      * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
 550      * mainly in that we only record queue ids, not full dependency
 551      * links.  This requires a linear scan of the workQueues array to
 552      * locate stealers, but isolates cost to when it is needed, rather
 553      * than adding to per-task overhead. Searches can fail to locate
 554      * stealers when GC stalls and the like delay recording sources.
 555      * Further, even when accurately identified, stealers might not
 556      * ever produce a task that the joiner can in turn help with. So,
 557      * compensation is tried upon failure to find tasks to run.
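On the user side, the joins this machinery serves typically follow the classic divide-and-conquer pattern: fork one half, compute the other directly, then join. While the forked half is unfinished, the joining worker may run other tasks as described above instead of simply blocking. A minimal sketch; `SumTask` and its threshold are arbitrary choices.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Illustrative divide-and-conquer sum; THRESHOLD is an arbitrary cutoff.
final class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000;
    private final long[] data;
    private final int lo, hi;

    SumTask(long[] data, int lo, int hi) { this.data = data; this.lo = lo; this.hi = hi; }

    @Override
    protected Long compute() {
        if (hi - lo <= THRESHOLD) {
            long sum = 0;
            for (int i = lo; i < hi; i++) sum += data[i];
            return sum;
        }
        int mid = (lo + hi) >>> 1;
        SumTask left = new SumTask(data, lo, mid);
        left.fork();                                   // pushed onto this worker's queue; may be stolen
        long right = new SumTask(data, mid, hi).compute();
        return right + left.join();                    // the joiner may help rather than block
    }

    static long sum(ForkJoinPool pool, long[] data) {
        return pool.invoke(new SumTask(data, 0, data.length));
    }
}
```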
 558      *
 559      * Compensation does not by default aim to keep exactly the target
 560      * parallelism number of unblocked threads running at any given
 561      * time. Some previous versions of this class employed immediate
 562      * compensations for any blocked join. However, in practice, the
 563      * vast majority of blockages are transient byproducts of GC and
 564      * other JVM or OS activities that are made worse by replacement
 565      * when they cause longer-term oversubscription.  Rather than
 566      * impose arbitrary policies, we allow users to override the
 567      * default of only adding threads upon apparent starvation.  The
 568      * compensation mechanism may also be bounded.  Bounds for the
 569      * commonPool (see COMMON_MAX_SPARES) better enable JVMs to cope
 570      * with programming errors and abuse before running out of
 571      * resources to do so.
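Users hook into compensation through `ForkJoinPool.ManagedBlocker` and `ForkJoinPool.managedBlock`. If the blocker reports `isReleasable()`, no blocking occurs. Otherwise the pool may activate or create a spare thread while `block()` runs. A minimal sketch wrapping a `CountDownLatch`; `LatchBlocker` is an illustrative name. Whether a spare is actually added depends on the pool's state and bounds.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;

// Illustrative ManagedBlocker: waits on a latch while letting the pool compensate.
final class LatchBlocker implements ForkJoinPool.ManagedBlocker {
    private final CountDownLatch latch;

    LatchBlocker(CountDownLatch latch) { this.latch = latch; }

    @Override
    public boolean block() throws InterruptedException {
        latch.await();
        return true;                       // no further blocking is necessary
    }

    @Override
    public boolean isReleasable() {
        return latch.getCount() == 0;      // an open latch never blocks
    }

    /** Waits for the latch via managedBlock; returns false if interrupted. */
    static boolean awaitManaged(CountDownLatch latch) {
        try {
            ForkJoinPool.managedBlock(new LatchBlocker(latch));
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

In the usage below, a single-worker pool runs a task that waits on the latch while a second task opens it. The second task can only run if the pool compensates for the blocked worker (or runs it first).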
 572      *
 573      * Common Pool
 574      * ===========
 575      *
 576      * The static common pool always exists after static
 577      * initialization.  Since it (or any other created pool) need
 578      * never be used, we minimize initial construction overhead and
 579      * footprint to the setup of about a dozen fields.
 580      *
 581      * When external threads submit to the common pool, they can
 582      * perform subtask processing (see externalHelpComplete and
 583      * related methods) upon joins.  This caller-helps policy makes it
 584      * sensible to set common pool parallelism level to one (or more)
 585      * less than the total number of available cores, or even zero for
 586      * pure caller-runs.  We do not need to record whether external
 587      * submissions are to the common pool -- if not, external help
 588      * methods return quickly. These submitters would otherwise be
 589      * blocked waiting for completion, so the extra effort (with
 590      * liberally sprinkled task status checks) in inapplicable cases
 591      * amounts to an odd form of limited spin-wait before blocking in
 592      * ForkJoinTask.join.
 593      *
 594      * As a more appropriate default in managed environments, unless
 595      * overridden by system properties, we use workers of subclass
 596      * InnocuousForkJoinWorkerThread when there is a SecurityManager
 597      * present. These workers have no permissions set, do not belong
 598      * to any user-defined ThreadGroup, and erase all ThreadLocals
 599      * after executing any top-level task (see
 600      * WorkQueue.afterTopLevelExec).  The associated mechanics (mainly
 601      * in ForkJoinWorkerThread) may be JVM-dependent and must access
 602      * particular Thread class fields to achieve this effect.
 603      *
 604      * Memory placement
 605      * ================
 606      *
 607      * Performance can be very sensitive to placement of instances of
 608      * ForkJoinPool and WorkQueues and their queue arrays. To reduce
 609      * false-sharing impact, the @Contended annotation isolates
 610      * adjacent WorkQueue instances, as well as the ForkJoinPool.ctl
 611      * field. WorkQueue arrays are allocated (by their threads) with
 612      * larger initial sizes than most ever need, mostly to reduce
 613      * false sharing with current garbage collectors that use cardmark
 614      * tables.
 615      *
 616      * Style notes
 617      * ===========
 618      *
 619      * Memory ordering relies mainly on VarHandles.  This can be
 620      * awkward and ugly, but also reflects the need to control
 621      * outcomes across the unusual cases that arise in very racy code
 622      * with very few invariants. All fields are read into locals
 623      * before use, and null-checked if they are references.  Array
 624      * accesses using masked indices include checks (that are always
 625      * true) that the array length is non-zero to avoid compilers
 626      * inserting more expensive traps.  This is usually done in a
 627      * "C"-like style of listing declarations at the heads of methods
 628      * or blocks, and using inline assignments on first encounter.
 629      * Nearly all explicit checks lead to bypass/return, not exception
 630      * throws, because they may legitimately arise due to
 631      * cancellation/revocation during shutdown.
 632      *
 633      * There is a lot of representation-level coupling among classes
 634      * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask.  The
 635      * fields of WorkQueue maintain data structures managed by
 636      * ForkJoinPool, so are directly accessed.  There is little point
 637      * trying to reduce this, since any associated future changes in
 638      * representations will need to be accompanied by algorithmic
 639      * changes anyway. Several methods intrinsically sprawl because
 640      * they must accumulate sets of consistent reads of fields held in
 641      * local variables. Some others are artificially broken up to
 642      * reduce producer/consumer imbalances due to dynamic compilation.
 643      * There are also other coding oddities (including several
 644      * unnecessary-looking hoisted null checks) that help some methods
 645      * perform reasonably even when interpreted (not compiled).
 646      *
 647      * The order of declarations in this file is (with a few exceptions):
 648      * (1) Static utility functions
 649      * (2) Nested (static) classes
 650      * (3) Static fields
 651      * (4) Fields, along with constants used when unpacking some of them
 652      * (5) Internal control methods
 653      * (6) Callbacks and other support for ForkJoinTask methods
 654      * (7) Exported methods
 655      * (8) Static block initializing statics in minimally dependent order
 656      */
 657 
 658     // Static utilities
 659 
 660     /**
 661      * If there is a security manager, makes sure caller has
 662      * permission to modify threads.
 663      */
 664     private static void checkPermission() {
 665         SecurityManager security = System.getSecurityManager();


 729     static final int SWIDTH       = 16;            // width of short
 730     static final int SMASK        = 0xffff;        // short bits == max index
 731     static final int MAX_CAP      = 0x7fff;        // max #workers - 1
 732     static final int SQMASK       = 0x007e;        // max 64 (even) slots
 733 
 734     // Masks and units for WorkQueue.phase and ctl sp subfield
 735     static final int UNSIGNALLED  = 1 << 31;       // must be negative
 736     static final int SS_SEQ       = 1 << 16;       // version count
 737     static final int QLOCK        = 1;             // must be 1
 738 
 739     // Mode bits and sentinels, some also used in WorkQueue id and source fields
 740     static final int OWNED        = 1;             // queue has owner thread
 741     static final int FIFO         = 1 << 16;       // fifo queue or access mode
 742     static final int SHUTDOWN     = 1 << 18;
 743     static final int TERMINATED   = 1 << 19;
 744     static final int STOP         = 1 << 31;       // must be negative
 745     static final int QUIET        = 1 << 30;       // not scanning or working
 746     static final int DORMANT      = QUIET | UNSIGNALLED;
 747 
 748     /**
 749      * Initial capacity of work-stealing queue array.
 750      * Must be a power of two, at least 2.
 751      */
 752     static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
 753 
 754     /**
 755      * Maximum capacity for queue arrays. Must be a power of two less
 756      * than or equal to 1 << (31 - width of array entry) to ensure
 757      * lack of wraparound of index calculations, but defined to a
 758      * value a bit less than this to help users trap runaway programs
 759      * before saturating systems.
 760      */
 761     static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
 762 
 763     /**
 764      * The maximum number of top-level polls per worker before
 765      * checking other queues, expressed as a bit shift to, in effect,
 766      * multiply by pool size, and then use as random value mask, so
 767      * average bound is about poolSize*(1<<TOP_BOUND_SHIFT).  See
 768      * above for rationale.
 769      */
 770     static final int TOP_BOUND_SHIFT = 10;
 771 
 772     /**
 773      * Queues supporting work-stealing as well as external task
 774      * submission. See above for descriptions and algorithms.
 775      */
 776     @jdk.internal.vm.annotation.Contended
 777     static final class WorkQueue {
 778         volatile int source;       // source queue id, or sentinel
 779         int id;                    // pool index, mode, tag
 780         int base;                  // index of next slot for poll
 781         int top;                   // index of next slot for push
 782         volatile int phase;        // versioned, negative: queued, 1: locked
 783         int stackPred;             // pool stack (ctl) predecessor link
 784         int nsteals;               // number of steals
 785         ForkJoinTask<?>[] array;   // the queued tasks; power of 2 size
 786         final ForkJoinPool pool;   // the containing pool (may be null)
 787         final ForkJoinWorkerThread owner; // owning thread or null if shared
 788 
 789         WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
 790             this.pool = pool;
 791             this.owner = owner;
 792             // Place indices in the center of array (that is not yet allocated)
 793             base = top = INITIAL_QUEUE_CAPACITY >>> 1;
 794         }
 795 
 796         /**
 797          * Tries to lock shared queue by CASing phase field.
 798          */
 799         final boolean tryLockPhase() {
 800             return PHASE.compareAndSet(this, 0, 1);
 801         }
 802 
 803         final void releasePhaseLock() {
 804             PHASE.setRelease(this, 0);
 805         }
 806 
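`tryLockPhase` and `releasePhaseLock` form a non-blocking try-lock on the `phase` field. A CAS from 0 to 1 acquires it, and a release-mode store of 0 frees it. The following minimal stand-alone sketch shows the same idiom, assuming a hypothetical class that owns only a `phase` field:

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Hypothetical stand-alone version of the phase-lock idiom used by
// WorkQueue.tryLockPhase()/releasePhaseLock(): 0 = unlocked, 1 = locked.
final class PhaseLockSketch {
    volatile int phase; // 0 when unlocked, 1 when held

    private static final VarHandle PHASE;
    static {
        try {
            PHASE = MethodHandles.lookup()
                .findVarHandle(PhaseLockSketch.class, "phase", int.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    // Succeeds only if no other thread holds the lock; never blocks.
    boolean tryLock() {
        return PHASE.compareAndSet(this, 0, 1);
    }

    // Release-mode store: writes made while holding the lock become
    // visible to the next thread whose CAS acquires it.
    void unlock() {
        PHASE.setRelease(this, 0);
    }
}
```

Callers that fail to acquire the lock give up rather than spin, as `tryLockedUnpush` and `helpCC` do above.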
 807         /**
 808          * Returns an exportable index (used by ForkJoinWorkerThread).
 809          */
 810         final int getPoolIndex() {
 811             return (id & 0xffff) >>> 1; // ignore odd/even tag bit
 812         }
 813 
 814         /**
 815          * Returns the approximate number of tasks in the queue.
 816          */
 817         final int queueSize() {
 818             int n = (int)BASE.getAcquire(this) - top;
 819             return (n >= 0) ? 0 : -n; // ignore transient negative
 820         }
 821 
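`queueSize` relies on `base` and `top` being free-running `int` indices whose difference, rather than their absolute values, gives the count. The sketch below (hypothetical class name) reproduces the arithmetic with the indices passed in explicitly. This makes it easy to check both the clamping of transient negative sizes and the behavior across `int` overflow.

```java
// Hypothetical helper mirroring WorkQueue.queueSize(): base is the index of
// the next slot to poll and top the next slot to push, so top - base tasks
// are queued. A racing reader may briefly see base > top; that is clamped to 0.
final class QueueSizeSketch {
    static int queueSize(int base, int top) {
        int n = base - top;           // only the difference matters, so the
        return (n >= 0) ? 0 : -n;     // result stays correct after int overflow
    }
}
```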
 822         /**
 823          * Provides a more accurate estimate of whether this queue has
 824          * any tasks than does queueSize, by checking whether a
 825          * near-empty queue has at least one unclaimed task.
 826          */
 827         final boolean isEmpty() {
 828             ForkJoinTask<?>[] a; int n, cap, b;
 829             VarHandle.acquireFence(); // needed by external callers
 830             return ((n = (b = base) - top) >= 0 || // possibly one task
 831                     (n == -1 && ((a = array) == null ||
 832                                  (cap = a.length) == 0 ||
 833                                  a[(cap - 1) & b] == null)));
 834         }
 835 
 836         /**
 837          * Pushes a task. Call only by owner in unshared queues.
 838          *
 839          * @param task the task. Caller must ensure non-null.
 840          * @throws RejectedExecutionException if array cannot be resized
 841          */
 842         final void push(ForkJoinTask<?> task) {
 843             ForkJoinTask<?>[] a;
 844             int s = top, d, cap, m;
 845             ForkJoinPool p = pool;
 846             if ((a = array) != null && (cap = a.length) > 0) {
 847                 QA.setRelease(a, (m = cap - 1) & s, task);
 848                 top = s + 1;
 849                 if (((d = s - (int)BASE.getAcquire(this)) & ~1) == 0 &&
 850                     p != null) {                 // size 0 or 1
 851                     VarHandle.fullFence();
 852                     p.signalWork();
 853                 }
 854                 else if (d == m)
 855                     growArray(false);
 856             }
 857         }
 858 
 859         /**
 860          * Version of push for shared queues. Call only with phase lock held.
 861          * @return true if should signal work
 862          */
 863         final boolean lockedPush(ForkJoinTask<?> task) {
 864             ForkJoinTask<?>[] a;
 865             boolean signal = false;
 866             int s = top, b = base, cap, d;
 867             if ((a = array) != null && (cap = a.length) > 0) {
 868                 a[(cap - 1) & s] = task;
 869                 top = s + 1;
 870                 if (b - s + cap - 1 == 0)
 871                     growArray(true);
 872                 else {
 873                     phase = 0; // full volatile unlock
 874                     if (((s - base) & ~1) == 0) // size 0 or 1
 875                         signal = true;
 876                 }
 877             }
 878             return signal;
 879         }
 880 
 881         /**
 882          * Doubles the capacity of array. Call either by owner or with
 883          * lock held -- it is OK for base, but not top, to move while
 884          * resizings are in progress.
 885          */
 886         final void growArray(boolean locked) {
 887             ForkJoinTask<?>[] newA = null;
 888             try {
 889                 ForkJoinTask<?>[] oldA; int oldSize, newSize;
 890                 if ((oldA = array) != null && (oldSize = oldA.length) > 0 &&
 891                     (newSize = oldSize << 1) <= MAXIMUM_QUEUE_CAPACITY &&
 892                     newSize > 0) {
 893                     try {
 894                         newA = new ForkJoinTask<?>[newSize];
 895                     } catch (OutOfMemoryError ex) {
 896                     }
 897                     if (newA != null) { // poll from old array, push to new
 898                         int oldMask = oldSize - 1, newMask = newSize - 1;
 899                         for (int s = top - 1, k = oldMask; k >= 0; --k) {
 900                             ForkJoinTask<?> x = (ForkJoinTask<?>)
 901                                 QA.getAndSet(oldA, s & oldMask, null);
 902                             if (x != null)
 903                                 newA[s-- & newMask] = x;
 904                             else
 905                                 break;
 906                         }
 907                         array = newA;
 908                         VarHandle.releaseFence();
 909                     }
 910                 }
 911             } finally {
 912                 if (locked)
 913                     phase = 0;
 914             }
 915             if (newA == null)
 916                 throw new RejectedExecutionException("Queue capacity exceeded");
 917         }
 918 
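The queue array is a power-of-two ring. `push` stores at `(cap - 1) & top`, `poll` takes from `(cap - 1) & base`, and `growArray` doubles the capacity, moving tasks from `top - 1` downward so that each keeps its logical index under the wider mask. The following single-threaded sketch shows only that indexing scheme. It deliberately omits the VarHandle fences, CAS operations, the `MAXIMUM_QUEUE_CAPACITY` check, and signalling. Its class and method names are hypothetical.

```java
// Simplified, single-threaded sketch of the ring indexing used by WorkQueue.
// No fences or CAS; growth copies each slot by logical index, top-1 down to base.
final class RingDequeSketch {
    Object[] array = new Object[2];     // capacity is always a power of two
    int base = 1, top = 1;              // start mid-array, as WorkQueue does

    void push(Object x) {               // owner end
        int s = top, m = array.length - 1;
        array[m & s] = x;
        top = s + 1;
        if (top - base == array.length) // full: double the capacity
            grow();
    }

    Object poll() {                     // FIFO end, as used by stealers
        if (top - base <= 0) return null;
        int k = (array.length - 1) & base;
        Object x = array[k];
        array[k] = null;
        base++;
        return x;
    }

    Object pop() {                      // LIFO end, as used by the owner
        if (top - base <= 0) return null;
        int k = (array.length - 1) & --top;
        Object x = array[k];
        array[k] = null;
        return x;
    }

    private void grow() {
        Object[] oldA = array;
        int oldMask = oldA.length - 1;
        Object[] newA = new Object[oldA.length << 1];
        int newMask = newA.length - 1;
        // Re-slot every queued element under the wider mask; logical
        // indices (base..top-1) are unchanged, only physical slots move.
        for (int s = top - 1; s - base >= 0; --s)
            newA[s & newMask] = oldA[s & oldMask];
        array = newA;
    }
}
```

Pushing 1 through 5 grows the array from 2 to 4 to 8 slots. `poll` then yields 1 (oldest) while `pop` yields 5 (newest).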
 919         /**
 920          * Takes next task, if one exists, in FIFO order.
 921          */
 922         final ForkJoinTask<?> poll() {
 923             int b, k, cap; ForkJoinTask<?>[] a;
 924             while ((a = array) != null && (cap = a.length) > 0 &&
 925                    top - (b = base) > 0) {
 926                 ForkJoinTask<?> t = (ForkJoinTask<?>)
 927                     QA.getAcquire(a, k = (cap - 1) & b);
 928                 if (base == b++) {
 929                     if (t == null)
 930                         Thread.yield(); // await index advance
 931                     else if (QA.compareAndSet(a, k, t, null)) {
 932                         BASE.setOpaque(this, b);
 933                         return t;
 934                     }
 935                 }
 936             }
 937             return null;
 938         }
 939 
 940         /**
 941          * Takes next task, if one exists, in order specified by mode.
 942          */
 943         final ForkJoinTask<?> nextLocalTask() {
 944             ForkJoinTask<?> t = null;
 945             int md = id, b, s, d, cap; ForkJoinTask<?>[] a;
 946             if ((a = array) != null && (cap = a.length) > 0 &&
 947                 (d = (s = top) - (b = base)) > 0) {
 948                 if ((md & FIFO) == 0 || d == 1) {
 949                     if ((t = (ForkJoinTask<?>)
 950                          QA.getAndSet(a, (cap - 1) & --s, null)) != null)
 951                         TOP.setOpaque(this, s);
 952                 }
 953                 else if ((t = (ForkJoinTask<?>)
 954                           QA.getAndSet(a, (cap - 1) & b++, null)) != null) {
 955                     BASE.setOpaque(this, b);
 956                 }
 957                 else // on contention in FIFO mode, use regular poll
 958                     t = poll();
 959             }
 960             return t;
 961         }
 962 
 963         /**
 964          * Returns next task, if one exists, in order specified by mode.
 965          */
 966         final ForkJoinTask<?> peek() {
 967             int cap; ForkJoinTask<?>[] a;
 968             return ((a = array) != null && (cap = a.length) > 0) ?
 969                 a[(cap - 1) & ((id & FIFO) != 0 ? base : top - 1)] : null;

 970         }
 971 
 972         /**
 973          * Pops the given task only if it is at the current top.
 974          */
 975         final boolean tryUnpush(ForkJoinTask<?> task) {
 976             boolean popped = false;
 977             int s, cap; ForkJoinTask<?>[] a;
 978             if ((a = array) != null && (cap = a.length) > 0 &&
 979                 (s = top) != base &&
 980                 (popped = QA.compareAndSet(a, (cap - 1) & --s, task, null)))
 981                 TOP.setOpaque(this, s);
 982             return popped;
 983         }
 984 
 985         /**
 986          * Shared version of tryUnpush.
 987          */
 988         final boolean tryLockedUnpush(ForkJoinTask<?> task) {
 989             boolean popped = false;
 990             int s = top - 1, k, cap; ForkJoinTask<?>[] a;
 991             if ((a = array) != null && (cap = a.length) > 0 &&
 992                 a[k = (cap - 1) & s] == task && tryLockPhase()) {
 993                 if (top == s + 1 && array == a &&
 994                     (popped = QA.compareAndSet(a, k, task, null)))
 995                     top = s;
 996                 releasePhaseLock();
 997             }
 998             return popped;
 999         }
1000 
1001         /**
1002          * Removes and cancels all known tasks, ignoring any exceptions.
1003          */
1004         final void cancelAll() {
1005             for (ForkJoinTask<?> t; (t = poll()) != null; )
1006                 ForkJoinTask.cancelIgnoringExceptions(t);
1007         }
1008 
1009         // Specialized execution methods
1010 
1011         /**
1012          * Runs the given (stolen) task if nonnull, as well as
1013          * remaining local tasks and others available from the given
1014          * queue, up to bound n (to avoid infinite unfairness).
1015          */
1016         final void topLevelExec(ForkJoinTask<?> t, WorkQueue q, int n) {
1017             if (t != null && q != null) { // hoist checks
1018                 int nstolen = 1;
1019                 for (;;) {
1020                     t.doExec();
1021                     if (n-- < 0)
1022                         break;
1023                     else if ((t = nextLocalTask()) == null) {
1024                         if ((t = q.poll()) == null)
1025                             break;
1026                         else
1027                             ++nstolen;
1028                     }
1029                 }
1030                 ForkJoinWorkerThread thread = owner;
1031                 nsteals += nstolen;
1032                 source = 0;
1033                 if (thread != null)
1034                     thread.afterTopLevelExec();
1035             }
1036         }
1037 
1038         /**
1039          * If present, removes task from queue and executes it.
1040          */
1041         final void tryRemoveAndExec(ForkJoinTask<?> task) {
1042             ForkJoinTask<?>[] a; int s, cap;
1043             if ((a = array) != null && (cap = a.length) > 0 &&
1044                 (s = top) - base > 0) { // traverse from top
1045                 for (int m = cap - 1, ns = s - 1, i = ns; ; --i) {
1046                     int index = i & m;
1047                     ForkJoinTask<?> t = (ForkJoinTask<?>)QA.get(a, index);
1048                     if (t == null)
1049                         break;
1050                     else if (t == task) {
1051                         if (QA.compareAndSet(a, index, t, null)) {
1052                             top = ns;   // safely shift down
1053                             for (int j = i; j != ns; ++j) {
1054                                 ForkJoinTask<?> f;
1055                                 int pindex = (j + 1) & m;
1056                                 f = (ForkJoinTask<?>)QA.get(a, pindex);
1057                                 QA.setVolatile(a, pindex, null);
1058                                 int jindex = j & m;
1059                                 QA.setRelease(a, jindex, f);
1060                             }
1061                             VarHandle.releaseFence();
1062                             t.doExec();
1063                         }
1064                         break;
1065                     }
1066                 }
1067             }
1068         }
1069 
1070         /**
1071          * Tries to pop and run tasks within the target's computation
1072          * until done, not found, or limit exceeded.
1073          *
1074          * @param task root of CountedCompleter computation
1075          * @param limit max runs, or zero for no limit
1076          * @param shared true if must lock to extract task
1077          * @return task status on exit
1078          */
1079         final int helpCC(CountedCompleter<?> task, int limit, boolean shared) {
1080             int status = 0;
1081             if (task != null && (status = task.status) >= 0) {
1082                 int s, k, cap; ForkJoinTask<?>[] a;
1083                 while ((a = array) != null && (cap = a.length) > 0 &&
1084                        (s = top) - base > 0) {
1085                     CountedCompleter<?> v = null;
1086                     ForkJoinTask<?> o = a[k = (cap - 1) & (s - 1)];
1087                     if (o instanceof CountedCompleter) {
1088                         CountedCompleter<?> t = (CountedCompleter<?>)o;
1089                         for (CountedCompleter<?> f = t;;) {
1090                             if (f != task) {
1091                                 if ((f = f.completer) == null)
1092                                     break;
1093                             }
1094                             else if (shared) {
1095                                 if (tryLockPhase()) {
1096                                     if (top == s && array == a &&
1097                                         QA.compareAndSet(a, k, t, null)) {
1098                                         top = s - 1;
1099                                         v = t;
1100                                     }
1101                                     releasePhaseLock();
1102                                 }
1103                                 break;
1104                             }
1105                             else {
1106                                 if (QA.compareAndSet(a, k, t, null)) {
1107                                     top = s - 1;
1108                                     v = t;
1109                                 }
1110                                 break;
1111                             }
1112                         }
1113                     }
1114                     if (v != null)
1115                         v.doExec();
1116                     if ((status = task.status) < 0 || v == null ||
1117                         (limit != 0 && --limit == 0))
1118                         break;
1119                 }
1120             }
1121             return status;
1122         }
1123 
1124         /**
1125          * Tries to poll and run AsynchronousCompletionTasks until
1126          * none found or blocker is released.
1127          *
1128          * @param blocker the blocker
1129          */
1130         final void helpAsyncBlocker(ManagedBlocker blocker) {
1131             if (blocker != null) {
1132                 int b, k, cap; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
1133                 while ((a = array) != null && (cap = a.length) > 0 &&
1134                        top - (b = base) > 0) {
1135                     t = (ForkJoinTask<?>)QA.getAcquire(a, k = (cap - 1) & b);
1136                     if (blocker.isReleasable())
1137                         break;
1138                     else if (base == b++ && t != null) {
1139                         if (!(t instanceof CompletableFuture.
1140                               AsynchronousCompletionTask))
1141                             break;
1142                         else if (QA.compareAndSet(a, k, t, null)) {
1143                             BASE.setOpaque(this, b);
1144                             t.doExec();
1145                         }
1146                     }
1147                 }
1148             }
1149         }
1150 
1151         /**
1152          * Returns true if owned and not known to be blocked.
1153          */
1154         final boolean isApparentlyUnblocked() {
1155             Thread wt; Thread.State s;
1156             return ((wt = owner) != null &&
1157                     (s = wt.getState()) != Thread.State.BLOCKED &&
1158                     s != Thread.State.WAITING &&
1159                     s != Thread.State.TIMED_WAITING);
1160         }
1161 
1162         // VarHandle mechanics.
1163         static final VarHandle PHASE;
1164         static final VarHandle BASE;
1165         static final VarHandle TOP;
1166         static {
1167             try {
1168                 MethodHandles.Lookup l = MethodHandles.lookup();
1169                 PHASE = l.findVarHandle(WorkQueue.class, "phase", int.class);
1170                 BASE = l.findVarHandle(WorkQueue.class, "base", int.class);
1171                 TOP = l.findVarHandle(WorkQueue.class, "top", int.class);
1172             } catch (ReflectiveOperationException e) {
1173                 throw new ExceptionInInitializerError(e);
1174             }
1175         }
1176     }
1177 
1178     // static fields (initialized in static initializer below)
1179 
1180     /**
1181      * Creates a new ForkJoinWorkerThread. This factory is used unless
1182      * overridden in ForkJoinPool constructors.
1183      */
1184     public static final ForkJoinWorkerThreadFactory
1185         defaultForkJoinWorkerThreadFactory;
1186 
1187     /**
1188      * Permission required for callers of methods that may start or
1189      * kill threads.
1190      */
1191     static final RuntimePermission modifyThreadPermission;
1192 
1193     /**


1350                        (TC_MASK & (c + TC_UNIT)));
1351             if (ctl == c && CTL.compareAndSet(this, c, nc)) {
1352                 createWorker();
1353                 break;
1354             }
1355         } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
1356     }
1357 
1358     /**
1359      * Callback from ForkJoinWorkerThread constructor to establish and
1360      * record its WorkQueue.
1361      *
1362      * @param wt the worker thread
1363      * @return the worker's queue
1364      */
1365     final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
1366         UncaughtExceptionHandler handler;
1367         wt.setDaemon(true);                             // configure thread
1368         if ((handler = ueh) != null)
1369             wt.setUncaughtExceptionHandler(handler);
1370         int tid = 0;                                    // for thread name
1371         int idbits = mode & FIFO;
1372         String prefix = workerNamePrefix;
1373         WorkQueue w = new WorkQueue(this, wt);
1374         if (prefix != null) {
1375             synchronized (prefix) {
1376                 WorkQueue[] ws = workQueues; int n;
1377                 int s = indexSeed += SEED_INCREMENT;
1378                 idbits |= (s & ~(SMASK | FIFO | DORMANT));
1379                 if (ws != null && (n = ws.length) > 1) {
1380                     int m = n - 1;
1381                     tid = m & ((s << 1) | 1);           // odd-numbered indices
1382                     for (int probes = n >>> 1;;) {      // find empty slot
1383                         WorkQueue q;
1384                         if ((q = ws[tid]) == null || q.phase == QUIET)
1385                             break;
1386                         else if (--probes == 0) {
1387                             tid = n | 1;                // resize below
1388                             break;
1389                         }
1390                         else
1391                             tid = (tid + 2) & m;
1392                     }
1393                     w.phase = w.id = tid | idbits;      // now publishable
1394 
1395                     if (tid < n)
1396                         ws[tid] = w;
1397                     else {                              // expand array
1398                         int an = n << 1;
1399                         WorkQueue[] as = new WorkQueue[an];
1400                         as[tid] = w;
1401                         int am = an - 1;
1402                         for (int j = 0; j < n; ++j) {
1403                             WorkQueue v;                // copy external queue
1404                             if ((v = ws[j]) != null)    // position may change
1405                                 as[v.id & am & SQMASK] = v;
1406                             if (++j >= n)
1407                                 break;
1408                             as[j] = ws[j];              // copy worker
1409                         }
1410                         workQueues = as;
1411                     }
1412                 }
1413             }
1414             wt.setName(prefix.concat(Integer.toString(tid)));
1415         }
1416         return w;
1417     }
1418 
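`registerWorker` places workers only in odd-numbered slots of `workQueues`; the array-expansion branch copies even slots as external queues and odd slots as workers. It derives an odd starting index from the seed, then steps by 2 modulo the array length for at most `n >>> 1` probes. The following sketch shows that search. It assumes a hypothetical `boolean[]` occupancy map in place of the `null`/`QUIET` test on real queues, and returns -1 where the real code sets `tid = n | 1` to trigger a resize.

```java
// Hypothetical helper reproducing the slot search in registerWorker().
final class OddSlotProbeSketch {
    // occupied.length must be a power of two greater than 1.
    // Returns the first free odd index, or -1 if all n/2 odd slots are taken.
    static int findSlot(boolean[] occupied, int s) {
        int n = occupied.length;
        int m = n - 1;
        int tid = m & ((s << 1) | 1);     // odd starting index
        for (int probes = n >>> 1; ; ) {
            if (!occupied[tid])
                return tid;
            else if (--probes == 0)
                return -1;                // registerWorker would expand here
            tid = (tid + 2) & m;          // next odd slot, wrapping around
        }
    }
}
```

Because the length is a power of two, stepping an odd index by 2 under the mask visits every odd slot exactly once within `n >>> 1` probes.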
1419     /**
1420      * Final callback from terminating worker, as well as upon failure
1421      * to construct or start a worker.  Removes record of worker from
1422      * array, and adjusts counts. If pool is shutting down, tries to
1423      * complete termination.
1424      *
1425      * @param wt the worker thread, or null if construction failed
1426      * @param ex the exception causing failure, or null if none
1427      */
1428     final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
1429         WorkQueue w = null;
1430         int phase = 0;
1431         if (wt != null && (w = wt.workQueue) != null) {
1432             Object lock = workerNamePrefix;
1433             int wid = w.id;
1434             long ns = (long)w.nsteals & 0xffffffffL;
1435             if (lock != null) {
1436                 synchronized (lock) {
1437                     WorkQueue[] ws; int n, i;         // remove index from array
1438                     if ((ws = workQueues) != null && (n = ws.length) > 0 &&
1439                         ws[i = wid & (n - 1)] == w)
1440                         ws[i] = null;
1441                     stealCount += ns;
1442                 }
1443             }
1444             phase = w.phase;
1445         }
1446         if (phase != QUIET) {                         // else pre-adjusted
1447             long c;                                   // decrement counts
1448             do {} while (!CTL.weakCompareAndSet
1449                          (this, c = ctl, ((RC_MASK & (c - RC_UNIT)) |
1450                                           (TC_MASK & (c - TC_UNIT)) |
1451                                           (SP_MASK & c))));
1452         }
1453         if (w != null)
1454             w.cancelAll();                            // cancel remaining tasks
1455 
1456         if (!tryTerminate(false, false) &&            // possibly replace worker
1457             w != null && w.array != null)             // avoid repeated failures
1458             signalWork();
1459 
1460         if (ex == null)                               // help clean on way out


1472             if ((c = ctl) >= 0L)                      // enough workers
1473                 break;
1474             else if ((sp = (int)c) == 0) {            // no idle workers
1475                 if ((c & ADD_WORKER) != 0L)           // too few workers
1476                     tryAddWorker(c);
1477                 break;
1478             }
1479             else if ((ws = workQueues) == null)
1480                 break;                                // unstarted/terminated
1481             else if (ws.length <= (i = sp & SMASK))
1482                 break;                                // terminated
1483             else if ((v = ws[i]) == null)
1484                 break;                                // terminating
1485             else {
1486                 int np = sp & ~UNSIGNALLED;
1487                 int vp = v.phase;
1488                 long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + RC_UNIT));
1489                 Thread vt = v.owner;
1490                 if (sp == vp && CTL.compareAndSet(this, c, nc)) {
1491                     v.phase = np;
1492                     if (vt != null && v.source < 0)
1493                         LockSupport.unpark(vt);
1494                     break;
1495                 }
1496             }
1497         }
1498     }
1499 
1500     /**
1501      * Tries to decrement counts (sometimes implicitly) and possibly
1502      * arrange for a compensating worker in preparation for blocking:
1503      * If not all core workers yet exist, creates one, else if any are
1504      * unreleased (possibly including caller) releases one, else if
1505      * fewer than the minimum allowed number of workers running,
1506      * checks to see that they are all active, and if so creates an
1507      * extra worker unless over maximum limit and policy is to
1508      * saturate.  Most of these steps can fail due to interference, in
1509      * which case 0 is returned so caller will retry. A negative
1510      * return value indicates that the caller doesn't need to
1511      * re-adjust counts when later unblocked.
1512      *
1513      * @return 1: block then adjust, -1: block without adjust, 0 : retry
1514      */
1515     private int tryCompensate(WorkQueue w) {
1516         int t, n, sp;
1517         long c = ctl;
1518         WorkQueue[] ws = workQueues;
1519         if ((t = (short)(c >>> TC_SHIFT)) >= 0) {
1520             if (ws == null || (n = ws.length) <= 0 || w == null)
1521                 return 0;                        // disabled
1522             else if ((sp = (int)c) != 0) {       // replace or release
1523                 WorkQueue v = ws[sp & (n - 1)];
1524                 int wp = w.phase;
1525                 long uc = UC_MASK & ((wp < 0) ? c + RC_UNIT : c);
1526                 int np = sp & ~UNSIGNALLED;
1527                 if (v != null) {
1528                     int vp = v.phase;
1529                     Thread vt = v.owner;
1530                     long nc = ((long)v.stackPred & SP_MASK) | uc;
1531                     if (vp == sp && CTL.compareAndSet(this, c, nc)) {
1532                         v.phase = np;
1533                         if (vt != null && v.source < 0)
1534                             LockSupport.unpark(vt);
1535                         return (wp < 0) ? -1 : 1;
1536                     }
1537                 }
1538                 return 0;
1539             }
1540             else if ((int)(c >> RC_SHIFT) -      // reduce parallelism
1541                      (short)(bounds & SMASK) > 0) {
1542                 long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c));
1543                 return CTL.compareAndSet(this, c, nc) ? 1 : 0;
1544             }
1545             else {                               // validate
1546                 int md = mode, pc = md & SMASK, tc = pc + t, bc = 0;
1547                 boolean unstable = false;
1548                 for (int i = 1; i < n; i += 2) {
1549                     WorkQueue q; Thread wt; Thread.State ts;
1550                     if ((q = ws[i]) != null) {
1551                         if (q.source == 0) {
1552                             unstable = true;
1553                             break;


1570                     else if (bc < pc) {          // lagging
1571                         Thread.yield();          // for retry spins
1572                         return 0;
1573                     }
1574                     else
1575                         throw new RejectedExecutionException(
1576                             "Thread limit exceeded replacing blocked worker");
1577                 }
1578             }
1579         }
1580 
1581         long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); // expand pool
1582         return CTL.compareAndSet(this, c, nc) && createWorker() ? 1 : 0;
1583     }
1584 
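The `ctl` word manipulated by `signalWork`, `tryCompensate`, and `deregisterWorker` packs three fields into one `long` so that they can be updated with a single CAS. The field positions below are inferred from how this code decodes `ctl`: `(int)(c >> RC_SHIFT)`, `(short)(c >>> TC_SHIFT)`, and `(int)c`. The concrete shift and mask values are assumptions, since their declarations are not shown on this page. `pack`, `rc`, `tc`, `sp`, and `dropWorker` are hypothetical helpers.

```java
// Sketch of the ctl layout as decoded in this file (values assumed):
// bits 48-63 release count (RC), bits 32-47 total count (TC),
// bits 0-31 the top of the idle-worker stack (SP).
final class CtlSketch {
    static final int  RC_SHIFT = 48, TC_SHIFT = 32;
    static final long RC_UNIT = 1L << RC_SHIFT, TC_UNIT = 1L << TC_SHIFT;
    static final long RC_MASK = 0xffffL << RC_SHIFT;
    static final long TC_MASK = 0xffffL << TC_SHIFT;
    static final long SP_MASK = 0xffffffffL;

    static long pack(int rc, int tc, int sp) {
        return (((long) rc << RC_SHIFT) & RC_MASK)
             | (((long) tc << TC_SHIFT) & TC_MASK)
             | ((long) sp & SP_MASK);
    }

    static int rc(long c) { return (int) (c >> RC_SHIFT); }    // signed 16 bits
    static int tc(long c) { return (short) (c >>> TC_SHIFT); } // signed 16 bits
    static int sp(long c) { return (int) c; }

    // Same expression deregisterWorker() uses to drop one worker: each field
    // comes from its own subtraction and is masked, so a borrow out of TC
    // cannot leak into RC, and SP is carried over unchanged.
    static long dropWorker(long c) {
        return (RC_MASK & (c - RC_UNIT))
             | (TC_MASK & (c - TC_UNIT))
             | (SP_MASK & c);
    }
}
```

Because RC occupies the top bits, a negative RC makes the whole word negative. This is why `signalWork` can test `ctl >= 0L` to detect "enough workers".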
1585     /**
1586      * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1587      * See above for explanation.
1588      */
1589     final void runWorker(WorkQueue w) {
1590         int r = (w.id ^ ThreadLocalRandom.nextSecondarySeed()) | FIFO; // rng
1591         w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY]; // initialize
1592         for (;;) {
1593             int phase;
1594             if (scan(w, r)) {                     // scan until apparently empty
1595                 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // move (xorshift)
1596             }
1597             else if ((phase = w.phase) >= 0) {    // enqueue, then rescan
1598                 long np = (w.phase = (phase + SS_SEQ) | UNSIGNALLED) & SP_MASK;
1599                 long c, nc;
1600                 do {
1601                     w.stackPred = (int)(c = ctl);
1602                     nc = ((c - RC_UNIT) & UC_MASK) | np;
1603                 } while (!CTL.weakCompareAndSet(this, c, nc));
1604             }
1605             else {                                // already queued
1606                 int pred = w.stackPred;
1607                 Thread.interrupted();             // clear before park
1608                 w.source = DORMANT;               // enable signal
1609                 long c = ctl;
1610                 int md = mode, rc = (md & SMASK) + (int)(c >> RC_SHIFT);
1611                 if (md < 0)                       // terminating
1612                     break;
1613                 else if (rc <= 0 && (md & SHUTDOWN) != 0 &&
1614                          tryTerminate(false, false))
1615                     break;                        // quiescent shutdown
1616                 else if (rc <= 0 && pred != 0 && phase == (int)c) {
1617                     long nc = (UC_MASK & (c - TC_UNIT)) | (SP_MASK & pred);
1618                     long d = keepAlive + System.currentTimeMillis();
1619                     LockSupport.parkUntil(this, d);
1620                     if (ctl == c &&               // drop on timeout if all idle
1621                         d - System.currentTimeMillis() <= TIMEOUT_SLOP &&
1622                         CTL.compareAndSet(this, c, nc)) {
1623                         w.phase = QUIET;
1624                         break;
1625                     }
1626                 }
1627                 else if (w.phase < 0)
1628                     LockSupport.park(this);       // OK if spuriously woken
1629                 w.source = 0;                     // disable signal
1630             }
1631         }
1632     }
1633 
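Between successful scans, `runWorker` advances its scan origin `r` with a 32-bit xorshift step using the shift triple 13, 17, 5, as in Marsaglia's xorshift generators. Zero is a fixed point of this map. However, each of the three steps is invertible, so a nonzero `r` never becomes zero. Seeding `r` with the `FIFO` bit OR-ed in keeps it nonzero from the start, assuming `FIFO` is a nonzero bit mask as its use in `(md & FIFO)` tests suggests. The following stand-alone sketch uses a hypothetical class name:

```java
// The xorshift step runWorker() applies to r after each productive scan.
final class XorShiftSketch {
    static int next(int r) {
        r ^= r << 13;
        r ^= r >>> 17;   // unsigned shift, as in runWorker()
        r ^= r << 5;
        return r;
    }
}
```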
1634     /**
1635      * Scans for and if found executes one or more top-level tasks from a queue.
1636      *
1637      * @return true if found an apparently non-empty queue, and
1638      * possibly ran task(s).
1639      */
1640     private boolean scan(WorkQueue w, int r) {
1641         WorkQueue[] ws; int n;
1642         if ((ws = workQueues) != null && (n = ws.length) > 0 && w != null) {
1643             for (int m = n - 1, j = r & m;;) {
1644                 WorkQueue q; int b;
1645                 if ((q = ws[j]) != null && q.top != (b = q.base)) {
1646                     int qid = q.id;
1647                     ForkJoinTask<?>[] a; int cap, k; ForkJoinTask<?> t;
1648                     if ((a = q.array) != null && (cap = a.length) > 0) {
1649                         t = (ForkJoinTask<?>)QA.getAcquire(a, k = (cap - 1) & b);
1650                         if (q.base == b++ && t != null &&
1651                             QA.compareAndSet(a, k, t, null)) {
1652                             q.base = b;
1653                             w.source = qid;
1654                             if (q.top - b > 0)
1655                                 signalWork();
1656                             w.topLevelExec(t, q,  // random fairness bound
1657                                            r & ((n << TOP_BOUND_SHIFT) - 1));
1658                         }
1659                     }
1660                     return true;
1661                 }
1662                 else if (--n > 0)
1663                     j = (j + 1) & m;
1664                 else
1665                     break;
1666             }
1667         }
1668         return false;
1669     }
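The scan loop above starts at slot `r & m`, wraps with `(j + 1) & m`, and steals by CAS-ing the slot at `base` to null before advancing `base`. What follows is a minimal sketch of that pattern, not the pool's code: the queue class `Q` is a hypothetical stand-in for `WorkQueue`, and `AtomicReferenceArray` replaces the `QA` VarHandle. It does no resizing and no signalling, and unlike the real `scan()` it returns the stolen task rather than a boolean.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

final class ScanSketch {
    // Hypothetical stand-in for WorkQueue: a circular buffer indexed by base/top.
    static final class Q {
        final AtomicReferenceArray<Runnable> array = new AtomicReferenceArray<>(16); // power of two
        volatile int base, top;

        void push(Runnable t) {                   // owner-only push; no resizing in this sketch
            array.set((array.length() - 1) & top, t);
            top = top + 1;                        // single writer, so a plain increment is safe
        }
    }

    /**
     * Probes slots r & m, (r + 1) & m, ... (qs.length must be a power of two) and
     * steals from the base of the first non-empty queue. Returns the stolen task,
     * or null if every queue looked empty or the CAS lost a race.
     */
    static Runnable scan(Q[] qs, int r) {
        int n = qs.length, m = n - 1;
        for (int j = r & m, k = n; k > 0; --k, j = (j + 1) & m) {
            Q q = qs[j];
            int b;
            if (q != null && q.top != (b = q.base)) {
                int i = (q.array.length() - 1) & b;
                Runnable t = q.array.get(i);
                if (q.base == b && t != null && q.array.compareAndSet(i, t, null)) {
                    q.base = b + 1;               // only the CAS winner advances base
                    return t;
                }
                return null;                      // contended: the real scan() still reports true
            }
        }
        return null;
    }
}
```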
1670 
1671     /**
1672      * Helps and/or blocks until the given task is done or the deadline
1673      * passes. First tries helping locally, then scans other queues for a
1674      * task produced by one of w's stealers, compensating and blocking if
1675      * none is found (rescanning if tryCompensate fails).
1676      *
1677      * @param w caller
1678      * @param task the task
1679      * @param deadline for timed waits, if nonzero
1680      * @return task status on exit
1681      */
1682     final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
1683         int s = 0;
1684         int seed = ThreadLocalRandom.nextSecondarySeed();
1685         if (w != null && task != null &&
1686             (!(task instanceof CountedCompleter) ||
1687              (s = w.helpCC((CountedCompleter<?>)task, 0, false)) >= 0)) {
1688             w.tryRemoveAndExec(task);
1689             int src = w.source, id = w.id;
1690             int r = (seed >>> 16) | 1, step = (seed & ~1) | 2;
1691             s = task.status;
1692             while (s >= 0) {
1693                 WorkQueue[] ws;
1694                 int n = (ws = workQueues) == null ? 0 : ws.length, m = n - 1;
1695                 while (n > 0) {
1696                     WorkQueue q; int b;
1697                     if ((q = ws[r & m]) != null && q.source == id &&
1698                         q.top != (b = q.base)) {
1699                         ForkJoinTask<?>[] a; int cap, k;
1700                         int qid = q.id;
1701                         if ((a = q.array) != null && (cap = a.length) > 0) {
1702                             ForkJoinTask<?> t = (ForkJoinTask<?>)
1703                                 QA.getAcquire(a, k = (cap - 1) & b);
1704                             if (q.source == id && q.base == b++ &&
1705                                 t != null && QA.compareAndSet(a, k, t, null)) {
1706                                 q.base = b;
1707                                 w.source = qid;
1708                                 t.doExec();
1709                                 w.source = src;
1710                             }
1711                         }
1712                         break;
1713                     }
1714                     else {
1715                         r += step;
1716                         --n;
1717                     }
1718                 }
1719                 if ((s = task.status) < 0)
1720                     break;
1721                 else if (n == 0) { // empty scan
1722                     long ms, ns; int block;
1723                     if (deadline == 0L)
1724                         ms = 0L;                       // untimed
1725                     else if ((ns = deadline - System.nanoTime()) <= 0L)
1726                         break;                         // timeout
1727                     else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
1728                         ms = 1L;                       // avoid 0 for timed wait
1729                     if ((block = tryCompensate(w)) != 0) {
1730                         task.internalWait(ms);
1731                         CTL.getAndAdd(this, (block > 0) ? RC_UNIT : 0L);
1732                     }
1733                     s = task.status;
1734                 }
1735             }
1736         }
1737         return s;
1738     }
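In `awaitJoin`, `r = (seed >>> 16) | 1` is odd and `step = (seed & ~1) | 2` is congruent to 2 mod 4. As a result, `r & m` stays on odd slots (the worker queues, where stealers live), and because `step / 2` is odd it cycles through all of them when the table size is a power of two. The sketch below checks that property. It also mirrors the deadline conversion, where a sub-millisecond remainder is rounded up to 1 ms because the code treats `ms == 0` as an untimed wait. The class and method names are hypothetical.

```java
import java.util.BitSet;
import java.util.concurrent.TimeUnit;

final class StrideSketch {
    /** Slots visited by n/2 steps of r += step over a table of n slots (power of two). */
    static BitSet visited(int n, int seed) {
        int m = n - 1;
        int r = (seed >>> 16) | 1;          // odd start
        int step = (seed & ~1) | 2;         // step % 4 == 2, so r & m stays odd
        BitSet seen = new BitSet(n);
        for (int k = 0; k < n / 2; k++, r += step)
            seen.set(r & m);
        return seen;
    }

    /** Deadline (nanoTime units) to wait millis: 0 = untimed, -1 = already expired. */
    static long waitMillis(long deadline, long now) {
        if (deadline == 0L)
            return 0L;
        long ns = deadline - now;
        if (ns <= 0L)
            return -1L;
        long ms = TimeUnit.NANOSECONDS.toMillis(ns);
        return (ms <= 0L) ? 1L : ms;        // never let a timed wait degrade to "forever"
    }
}
```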
1739 
1740     /**
1741      * Runs tasks until {@code isQuiescent()}. Rather than blocking
1742      * when tasks cannot be found, rescans until all others cannot
1743      * find tasks either.
1744      */
1745     final void helpQuiescePool(WorkQueue w) {
1746         int prevSrc = w.source;
1747         int seed = ThreadLocalRandom.nextSecondarySeed();
1748         int r = seed >>> 16, step = r | 1;
1749         for (int source = prevSrc, released = -1;;) { // -1 until known
1750             ForkJoinTask<?> localTask; WorkQueue[] ws;
1751             while ((localTask = w.nextLocalTask()) != null)
1752                 localTask.doExec();
1753             if (w.phase >= 0 && released == -1)
1754                 released = 1;
1755             boolean quiet = true, empty = true;
1756             int n = (ws = workQueues) == null ? 0 : ws.length;
1757             for (int m = n - 1; n > 0; r += step, --n) {
1758                 WorkQueue q; int b;
1759                 if ((q = ws[r & m]) != null) {
1760                     int qs = q.source;
1761                     if (q.top != (b = q.base)) {
1762                         quiet = empty = false;
1763                         ForkJoinTask<?>[] a; int cap, k;
1764                         int qid = q.id;
1765                         if ((a = q.array) != null && (cap = a.length) > 0) {
1766                             if (released == 0) {    // increment
1767                                 released = 1;
1768                                 CTL.getAndAdd(this, RC_UNIT);
1769                             }
1770                             ForkJoinTask<?> t = (ForkJoinTask<?>)
1771                                 QA.getAcquire(a, k = (cap - 1) & b);
1772                             if (q.base == b++ && t != null &&
1773                                 QA.compareAndSet(a, k, t, null)) {
1774                                 q.base = b;
1775                                 w.source = qid;
1776                                 t.doExec();
1777                                 w.source = source = prevSrc;
1778                             }
1779                         }
1780                         break;
1781                     }
1782                     else if ((qs & QUIET) == 0)
1783                         quiet = false;
1784                 }
1785             }
1786             if (quiet) {
1787                 if (released == 0)
1788                     CTL.getAndAdd(this, RC_UNIT);
1789                 w.source = prevSrc;
1790                 break;
1791             }
1792             else if (empty) {
1793                 if (source != QUIET)
1794                     w.source = source = QUIET;
1795                 if (released == 1) {                 // decrement
1796                     released = 0;
1797                     CTL.getAndAdd(this, RC_MASK & -RC_UNIT);
1798                 }
1799             }
1800         }
1801     }
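Application code reaches this loop through `ForkJoinTask.helpQuiesce()`. In this version, when that method is called from a worker thread it delegates to `helpQuiescePool`. The following is a minimal sketch using only public API: a root task forks `n` children that are never joined, then calls `helpQuiesce()` instead of joining them. The class name and the parallelism of 4 are arbitrary choices for this example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class QuiesceDemo {
    /** Forks n unjoined children from a root task and returns how many ran. */
    static int run(int n) {
        AtomicInteger done = new AtomicInteger();
        ForkJoinPool pool = new ForkJoinPool(4);
        pool.invoke(new RecursiveAction() {
            @Override protected void compute() {
                for (int i = 0; i < n; i++) {
                    new RecursiveAction() {            // forked, never joined
                        @Override protected void compute() { done.incrementAndGet(); }
                    }.fork();
                }
                ForkJoinTask.helpQuiesce();            // run or await tasks until quiescent
            }
        });
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS); // read the count only after workers stop
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done.get();
    }
}
```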
1802 
1803     /**
1804      * Scans for and returns a polled task, if available.
1805      * Used only for untracked polls.
1806      *
1807      * @param submissionsOnly if true, only scan submission queues
1808      */
1809     private ForkJoinTask<?> pollScan(boolean submissionsOnly) {
1810         WorkQueue[] ws; int n;
1811         rescan: while ((mode & STOP) == 0 && (ws = workQueues) != null &&
1812                       (n = ws.length) > 0) {
1813             int m = n - 1;
1814             int r = ThreadLocalRandom.nextSecondarySeed();
1815             int h = r >>> 16;
1816             int origin, step;
1817             if (submissionsOnly) {
1818                 origin = (r & ~1) & m;         // even indices and steps
1819                 step = (h & ~1) | 2;
1820             }
1821             else {
1822                 origin = r & m;
1823                 step = h | 1;
1824             }
1825             boolean nonempty = false;
1826             for (int i = origin, oldSum = 0, checkSum = 0;;) {
1827                 WorkQueue q;
1828                 if ((q = ws[i]) != null) {
1829                     int b; ForkJoinTask<?> t;
1830                     if (q.top - (b = q.base) > 0) {
1831                         nonempty = true;
1832                         if ((t = q.poll()) != null)
1833                             return t;
1834                     }
1835                     else
1836                         checkSum += b + q.id;
1837                 }
1838                 if ((i = (i + step) & m) == origin) {
1839                     if (!nonempty && oldSum == (oldSum = checkSum))
1840                         break rescan;
1841                     checkSum = 0;
1842                     nonempty = false;
1843                 }
1844             }
1845         }
1846         return null;
1847     }
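`pollScan` does not trust a single empty sweep. It returns `null` only when a full sweep saw no non-empty queue and the checksum of `base + id` over the empty queues matches the previous sweep, which suggests that no concurrent poll slipped in between. The single-threaded sketch below illustrates that termination rule. It assumes a hypothetical queue type `Q` that counts successful polls in `base`. The table length must be a power of two and `step` must be odd, so that each sweep visits every slot (as in the `submissionsOnly == false` case).

```java
import java.util.ArrayDeque;

final class PollScanSketch {
    // Hypothetical stand-in for WorkQueue: items plus a count of successful polls.
    static final class Q {
        final ArrayDeque<String> items = new ArrayDeque<>();
        final int id;
        int base;                                // bumped on every successful poll, like q.base
        Q(int id) { this.id = id; }
        String poll() {
            String s = items.pollFirst();
            if (s != null) base++;
            return s;
        }
    }

    /** Returns a polled item, or null once two consecutive sweeps agree that all is empty. */
    static String pollScan(Q[] qs, int origin, int step) {
        int m = qs.length - 1;                   // qs.length: power of two; step: odd
        origin &= m;
        boolean nonempty = false;
        for (int i = origin, oldSum = 0, checkSum = 0;;) {
            Q q = qs[i];
            if (q != null) {
                if (!q.items.isEmpty()) {
                    nonempty = true;
                    String t = q.poll();
                    if (t != null)
                        return t;
                }
                else
                    checkSum += q.base + q.id;
            }
            if ((i = (i + step) & m) == origin) { // completed one sweep
                if (!nonempty && oldSum == (oldSum = checkSum))
                    return null;                 // empty and stable across two sweeps
                checkSum = 0;
                nonempty = false;
            }
        }
    }
}
```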
1848 
1849     /**
1850      * Gets and removes a local or stolen task for the given worker.
1851      *
1852      * @return a task, if available
1853      */
1854     final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
1855         ForkJoinTask<?> t;
1856         if (w == null || (t = w.nextLocalTask()) == null)
1857             t = pollScan(false);
1858         return t;
1859     }
1860 
1861     // External operations
1862 
1863     /**
1864      * Adds the given task to the submission queue indexed by the submitter's
1865      * probe, creating that queue if null and rehashing the probe if contended.
1866      *
1867      * @param task the task. Caller must ensure non-null.
1868      */
1869     final void externalPush(ForkJoinTask<?> task) {
1870         int r;                                // initialize caller's probe
1871         if ((r = ThreadLocalRandom.getProbe()) == 0) {
1872             ThreadLocalRandom.localInit();
1873             r = ThreadLocalRandom.getProbe();
1874         }
1875         for (;;) {
1876             WorkQueue q;
1877             int md = mode, n;
1878             WorkQueue[] ws = workQueues;
1879             if ((md & SHUTDOWN) != 0 || ws == null || (n = ws.length) <= 0)
1880                 throw new RejectedExecutionException();
1881             else if ((q = ws[(n - 1) & r & SQMASK]) == null) { // add queue
1882                 int qid = (r | QUIET) & ~(FIFO | OWNED);
1883                 Object lock = workerNamePrefix;
1884                 ForkJoinTask<?>[] qa =
1885                     new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
1886                 q = new WorkQueue(this, null);
1887                 q.array = qa;
1888                 q.id = qid;
1889                 q.source = QUIET;
1890                 if (lock != null) {     // unless disabled, lock pool to install
1891                     synchronized (lock) {
1892                         WorkQueue[] vs; int i, vn;
1893                         if ((vs = workQueues) != null && (vn = vs.length) > 0 &&
1894                             vs[i = qid & (vn - 1) & SQMASK] == null)
1895                             vs[i] = q;  // else another thread already installed
1896                     }
1897                 }
1898             }
1899             else if (!q.tryLockPhase()) // move if busy
1900                 r = ThreadLocalRandom.advanceProbe(r);
1901             else {
1902                 if (q.lockedPush(task))
1903                     signalWork();
1904                 return;
1905             }
1906         }
1907     }
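`externalPush` checks the `SHUTDOWN` bit before anything else. This is how a shut-down pool rejects new submissions with `RejectedExecutionException`. Shutdown alone is not expected to stop work that was already submitted; this change's title (8200520) concerns tasks being interrupted after shutdown. The following is a minimal public-API sketch of the expected behavior. The 50 ms sleep and the 10 s timeout are arbitrary.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

final class ShutdownDemo {
    static final class Outcome {
        final int result; final boolean rejected, terminated;
        Outcome(int result, boolean rejected, boolean terminated) {
            this.result = result; this.rejected = rejected; this.terminated = terminated;
        }
    }

    static Outcome run() {
        ForkJoinPool pool = new ForkJoinPool(2);
        // Submitted before shutdown: expected to run to completion, uninterrupted.
        ForkJoinTask<Integer> f = pool.submit(() -> { Thread.sleep(50); return 42; });
        pool.shutdown();                          // stop accepting; queued work keeps running
        boolean rejected = false;
        try {
            pool.submit(() -> 0);                 // the pool is shut down, so this is rejected
        } catch (RejectedExecutionException expected) {
            rejected = true;
        }
        int result = f.join();
        boolean terminated = false;
        try {
            terminated = pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new Outcome(result, rejected, terminated);
    }
}
```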
1908 
1909     /**
1910      * Pushes a possibly-external submission.
1911      */
1912     private <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) {
1913         Thread t; ForkJoinWorkerThread w; WorkQueue q;
1914         if (task == null)
1915             throw new NullPointerException();
1916         if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
1917             (w = (ForkJoinWorkerThread)t).pool == this &&
1918             (q = w.workQueue) != null)
1919             q.push(task);
1920         else
1921             externalPush(task);
1922         return task;
1923     }
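`externalSubmit` pushes directly onto the caller's own `WorkQueue` when the caller is a worker of this pool, and otherwise goes through `externalPush`. The routing itself is internal, but its preconditions are observable through public API. In the sketch below, the root task arrives from the test thread (the external path), and a child submitted from inside that task comes from a worker of the same pool (the `q.push` path in the code shown here). The names are hypothetical.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

final class SubmitRouting {
    /** True if a task submitted from inside the pool ran in that same pool. */
    static boolean innerSubmitStaysInPool() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            // Called from a non-worker thread: the external submission path.
            return pool.invoke(new RecursiveTask<Boolean>() {
                @Override protected Boolean compute() {
                    // Now running on a worker of pool, so submit() sees an internal caller.
                    ForkJoinTask<ForkJoinPool> child = pool.submit(new RecursiveTask<ForkJoinPool>() {
                        @Override protected ForkJoinPool compute() { return ForkJoinTask.getPool(); }
                    });
                    return ForkJoinTask.getPool() == pool && child.join() == pool;
                }
            });
        } finally {
            pool.shutdown();
        }
    }
}
```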
1924 
1925     /**
1926      * Returns the common pool queue for an external thread.
1927      */
1928     static WorkQueue commonSubmitterQueue() {
1929         ForkJoinPool p = common;
1930         int r = ThreadLocalRandom.getProbe();
1931         WorkQueue[] ws; int n;
1932         return (p != null && (ws = p.workQueues) != null &&
1933                 (n = ws.length) > 0) ?
1934             ws[(n - 1) & r & SQMASK] : null;
1935     }
1936 
1937     /**
1938      * Performs tryUnpush for an external submitter.
1939      */
1940     final boolean tryExternalUnpush(ForkJoinTask<?> task) {
1941         int r = ThreadLocalRandom.getProbe();
1942         WorkQueue[] ws; WorkQueue w; int n;
1943         return ((ws = workQueues) != null &&
1944                 (n = ws.length) > 0 &&
1945                 (w = ws[(n - 1) & r & SQMASK]) != null &&
1946                 w.tryLockedUnpush(task));
1947     }
1948 
1949     /**
1950      * Performs helpComplete for an external submitter.
1951      */
1952     final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) {
1953         int r = ThreadLocalRandom.getProbe();
1954         WorkQueue[] ws; WorkQueue w; int n;
1955         return ((ws = workQueues) != null && (n = ws.length) > 0 &&
1956                 (w = ws[(n - 1) & r & SQMASK]) != null) ?
1957             w.helpCC(task, maxTasks, true) : 0;
1958     }
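`commonSubmitterQueue`, `tryExternalUnpush`, and `externalHelpComplete` all locate the caller's submission queue as `ws[(n - 1) & r & SQMASK]`. When that queue is busy, `externalPush` rehashes `r` with `ThreadLocalRandom.advanceProbe`. `SQMASK` is not defined in this excerpt. The sketch assumes the value `0x7e`, which keeps indices even; judging by `pollScan` and `getQueuedSubmissionCount`, submission queues occupy the even slots. The xorshift step is written out because `ThreadLocalRandom`'s probe methods are not public. It is meant to mirror the JDK code, not to reproduce it exactly.

```java
final class ProbeIndexSketch {
    static final int SQMASK = 0x7e;     // assumed value: even slots 0..126 only

    /** Submission-queue slot for a given probe in a table of n (power of two) slots. */
    static int slotFor(int probe, int n) {
        return (n - 1) & probe & SQMASK;
    }

    /** Marsaglia xorshift step: maps a nonzero probe to another nonzero probe. */
    static int advance(int probe) {
        probe ^= probe << 13;
        probe ^= probe >>> 17;
        probe ^= probe << 5;
        return probe;
    }
}
```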
1959 
1960     /**
1961      * Tries to steal and run tasks within the target's computation.
1962      * The maxTasks argument supports external usages; internal calls
1963      * use zero, allowing unbounded steps (external calls trap
1964      * non-positive values).
1965      *
1966      * @param w caller
1967      * @param maxTasks if non-zero, the maximum number of other tasks to run
1968      * @return task status on exit
1969      */
1970     final int helpComplete(WorkQueue w, CountedCompleter<?> task,
1971                            int maxTasks) {
1972         return (w == null) ? 0 : w.helpCC(task, maxTasks, false);
1973     }
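`helpComplete` (and `externalHelpComplete` for non-worker callers) lets a thread that is waiting on a `CountedCompleter` run other tasks from the same computation instead of only blocking. The following is a minimal array-sum sketch using only the public `CountedCompleter` API, in which the caller of `invoke()` may end up helping. `THRESHOLD` and the `AtomicLong` accumulator are choices of this sketch, not of the pool.

```java
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicLong;

final class CompleterSum extends CountedCompleter<Void> {
    static final int THRESHOLD = 64;           // arbitrary leaf size for this sketch
    final long[] array; final int lo, hi; final AtomicLong total;

    CompleterSum(CountedCompleter<?> parent, long[] array, int lo, int hi, AtomicLong total) {
        super(parent);
        this.array = array; this.lo = lo; this.hi = hi; this.total = total;
    }

    @Override public void compute() {
        int l = lo, h = hi;
        while (h - l > THRESHOLD) {            // fork right halves, keep the left half
            int mid = (l + h) >>> 1;
            addToPendingCount(1);
            new CompleterSum(this, array, mid, h, total).fork();
            h = mid;
        }
        long s = 0;
        for (int i = l; i < h; i++) s += array[i];
        total.addAndGet(s);
        tryComplete();                         // completes up the tree once pending hits zero
    }

    static long sum(long[] a) {
        AtomicLong total = new AtomicLong();
        ForkJoinPool.commonPool().invoke(new CompleterSum(null, a, 0, a.length, total));
        return total.get();                    // invoke() returns after the root completes
    }
}
```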
1974 
1975     /**
1976      * Returns a cheap heuristic guide for task partitioning when
1977      * programmers, frameworks, tools, or languages have little or no
1978      * idea about task granularity.  In essence, by offering this
1979      * method, we ask users only about tradeoffs in overhead vs
1980      * expected throughput and its variance, rather than how finely to
1981      * partition tasks.
1982      *
1983      * In a steady state strict (tree-structured) computation, each
1984      * thread makes available for stealing enough tasks for other
1985      * threads to remain active. Inductively, if all threads play by
1986      * the same rules, each thread should make available only a
1987      * constant number of tasks.
1988      *
1989      * The minimum useful constant is just 1. But using a value of 1
1990      * would require immediate replenishment upon each steal to
1991      * maintain enough tasks, which is infeasible.  Further,
1992      * partitionings/granularities of offered tasks should minimize

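Tasks can use the heuristic described above through `ForkJoinTask.getSurplusQueuedTaskCount()`. The sketch below shows the intended use: keep forking while the surplus is at most a small constant, and otherwise compute locally. The limit of 3 and the 16-element minimum leaf are assumptions chosen for illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

final class SurplusSum extends RecursiveTask<Long> {
    static final int SURPLUS_LIMIT = 3;        // assumed "small constant" surplus
    static final int MIN_LEAF = 16;            // assumed minimum leaf size
    final long[] a; final int lo, hi;

    SurplusSum(long[] a, int lo, int hi) { this.a = a; this.lo = lo; this.hi = hi; }

    @Override protected Long compute() {
        if (hi - lo > MIN_LEAF && ForkJoinTask.getSurplusQueuedTaskCount() <= SURPLUS_LIMIT) {
            int mid = (lo + hi) >>> 1;
            SurplusSum right = new SurplusSum(a, mid, hi);
            right.fork();                                      // offer one half to thieves
            long left = new SurplusSum(a, lo, mid).compute();  // keep the other half
            return left + right.join();
        }
        long s = 0;                            // enough queued surplus already: work locally
        for (int i = lo; i < hi; i++) s += a[i];
        return s;
    }

    static long sum(long[] a) {
        return ForkJoinPool.commonPool().invoke(new SurplusSum(a, 0, a.length));
    }
}
```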

2043      */
2044     private boolean tryTerminate(boolean now, boolean enable) {
2045         int md; // 3 phases: try to set SHUTDOWN, then STOP, then TERMINATED
2046 
2047         while (((md = mode) & SHUTDOWN) == 0) {
2048             if (!enable || this == common)        // cannot shutdown
2049                 return false;
2050             else
2051                 MODE.compareAndSet(this, md, md | SHUTDOWN);
2052         }
2053 
2054         while (((md = mode) & STOP) == 0) {       // try to initiate termination
2055             if (!now) {                           // check if quiescent & empty
2056                 for (long oldSum = 0L;;) {        // repeat until stable
2057                     boolean running = false;
2058                     long checkSum = ctl;
2059                     WorkQueue[] ws = workQueues;
2060                     if ((md & SMASK) + (int)(checkSum >> RC_SHIFT) > 0)
2061                         running = true;
2062                     else if (ws != null) {
2063                         WorkQueue w;
2064                         for (int i = 0; i < ws.length; ++i) {
2065                             if ((w = ws[i]) != null) {
2066                                 int s = w.source, p = w.phase;
2067                                 int d = w.id, b = w.base;
2068                                 if (b != w.top ||
2069                                     ((d & 1) == 1 && (s >= 0 || p >= 0))) {
2070                                     running = true;
2071                                     break;     // working, scanning, or have work
2072                                 }
2073                                 checkSum += (((long)s << 48) + ((long)p << 32) +
2074                                              ((long)b << 16) + (long)d);
2075                             }
2076                         }
2077                     }
2078                     if (((md = mode) & STOP) != 0)
2079                         break;                 // already triggered
2080                     else if (running)
2081                         return false;
2082                     else if (workQueues == ws && oldSum == (oldSum = checkSum))
2083                         break;
2084                 }
2085             }
2086             if ((md & STOP) == 0)
2087                 MODE.compareAndSet(this, md, md | STOP);
2088         }
2089 
2090         while (((md = mode) & TERMINATED) == 0) { // help terminate others
2091             for (long oldSum = 0L;;) {            // repeat until stable
2092                 WorkQueue[] ws; WorkQueue w;
2093                 long checkSum = ctl;
2094                 if ((ws = workQueues) != null) {
2095                     for (int i = 0; i < ws.length; ++i) {
2096                         if ((w = ws[i]) != null) {
2097                             ForkJoinWorkerThread wt = w.owner;
2098                             w.cancelAll();        // clear queues
2099                             if (wt != null) {
2100                                 try {             // unblock join or park
2101                                     wt.interrupt();
2102                                 } catch (Throwable ignore) {
2103                                 }
2104                             }
2105                             checkSum += ((long)w.phase << 32) + w.base;
2106                         }
2107                     }
2108                 }
2109                 if (((md = mode) & TERMINATED) != 0 ||
2110                     (workQueues == ws && oldSum == (oldSum = checkSum)))
2111                     break;
2112             }
2113             if ((md & TERMINATED) != 0)
2114                 break;
2115             else if ((md & SMASK) + (short)(ctl >>> TC_SHIFT) > 0)
2116                 break;
2117             else if (MODE.compareAndSet(this, md, md | TERMINATED)) {
2118                 synchronized (this) {
2119                     notifyAll();                  // for awaitTermination
2120                 }
2121                 break;
2122             }
2123         }
2124         return true;
2125     }
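`tryTerminate` moves `mode` through three phases by CAS-ing one bit at a time: `SHUTDOWN`, then `STOP`, then `TERMINATED`. Because bits are only ever added, each loop simply re-reads `mode` until it sees its bit set, whether by itself or by a racing thread. The following is a minimal sketch of that VarHandle pattern. It uses hypothetical bit values (the real constants are defined elsewhere in this file) and a `level()` method modeled on `toString()`.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

final class ModeBits {
    // Hypothetical bit values, not the pool's real constants.
    static final int SHUTDOWN = 1 << 0, STOP = 1 << 1, TERMINATED = 1 << 2;

    volatile int mode;

    private static final VarHandle MODE;
    static {
        try {
            MODE = MethodHandles.lookup().findVarHandle(ModeBits.class, "mode", int.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    /** Sets bit unless some thread already has; returns true if this call set it. */
    boolean setBit(int bit) {
        int md;
        while (((md = mode) & bit) == 0) {
            if (MODE.compareAndSet(this, md, md | bit))
                return true;                   // on failure, re-read mode and retry
        }
        return false;
    }

    String level() {
        int md = mode;
        return (md & TERMINATED) != 0 ? "Terminated"
             : (md & STOP) != 0       ? "Terminating"
             : (md & SHUTDOWN) != 0   ? "Shutting down"
             : "Running";
    }
}
```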


2578 
2579     /**
2580      * Returns {@code true} if this pool uses local first-in-first-out
2581      * scheduling mode for forked tasks that are never joined.
2582      *
2583      * @return {@code true} if this pool uses async mode
2584      */
2585     public boolean getAsyncMode() {
2586         return (mode & FIFO) != 0;
2587     }
2588 
2589     /**
2590      * Returns an estimate of the number of worker threads that are
2591      * not blocked waiting to join tasks or for other managed
2592      * synchronization. This method may overestimate the
2593      * number of running threads.
2594      *
2595      * @return the number of worker threads
2596      */
2597     public int getRunningThreadCount() {
2598         WorkQueue[] ws; WorkQueue w;
2599         VarHandle.acquireFence();
2600         int rc = 0;
2601         if ((ws = workQueues) != null) {
2602             for (int i = 1; i < ws.length; i += 2) {
2603                 if ((w = ws[i]) != null && w.isApparentlyUnblocked())
2604                     ++rc;
2605             }
2606         }
2607         return rc;
2608     }
2609 
2610     /**
2611      * Returns an estimate of the number of threads that are currently
2612      * stealing or executing tasks. This method may overestimate the
2613      * number of active threads.
2614      *
2615      * @return the number of active threads
2616      */
2617     public int getActiveThreadCount() {
2618         int r = (mode & SMASK) + (int)(ctl >> RC_SHIFT);
2619         return (r <= 0) ? 0 : r; // suppress momentarily negative values
2620     }
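`getActiveThreadCount` adds the parallelism (`mode & SMASK`) to the signed 16-bit field extracted by `(int)(ctl >> RC_SHIFT)`, which implies that the active count is stored offset by the parallelism. The sketch below assumes `RC_SHIFT = 48` and `TC_SHIFT = 32`; neither value is shown in this excerpt. The sketch also illustrates why adding `RC_UNIT`, or `RC_MASK & -RC_UNIT` as `helpQuiescePool` does, adjusts only the top field.

```java
final class CtlSketch {
    // Assumed layout: bits 48-63 hold the active count and bits 32-47 the total
    // count, each as a signed 16-bit value minus the parallelism.
    static final int RC_SHIFT = 48, TC_SHIFT = 32;
    static final long RC_UNIT = 1L << RC_SHIFT;
    static final long RC_MASK = 0xffffL << RC_SHIFT;
    static final long TC_MASK = 0xffffL << TC_SHIFT;

    static long pack(int parallelism, int active, int total) {
        long rc = ((long) (active - parallelism) << RC_SHIFT) & RC_MASK;
        long tc = ((long) (total - parallelism) << TC_SHIFT) & TC_MASK;
        return rc | tc;
    }

    static int active(long c, int parallelism) {
        int r = parallelism + (int) (c >> RC_SHIFT);    // arithmetic shift keeps the sign
        return (r <= 0) ? 0 : r;                        // suppress transient negatives
    }

    static int total(long c, int parallelism) {
        return parallelism + (short) (c >>> TC_SHIFT);  // (short) sign-extends bits 32-47
    }
}
```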


2628      * idleness of all threads, but will eventually become true if
2629      * threads remain inactive.
2630      *
2631      * @return {@code true} if all threads are currently idle
2632      */
2633     public boolean isQuiescent() {
2634         for (;;) {
2635             long c = ctl;
2636             int md = mode, pc = md & SMASK;
2637             int tc = pc + (short)(c >>> TC_SHIFT);
2638             int rc = pc + (int)(c >> RC_SHIFT);
2639             if ((md & (STOP | TERMINATED)) != 0)
2640                 return true;
2641             else if (rc > 0)
2642                 return false;
2643             else {
2644                 WorkQueue[] ws; WorkQueue v;
2645                 if ((ws = workQueues) != null) {
2646                     for (int i = 1; i < ws.length; i += 2) {
2647                         if ((v = ws[i]) != null) {
2648                             if (v.source > 0)
2649                                 return false;
2650                             --tc;
2651                         }
2652                     }
2653                 }
2654                 if (tc == 0 && ctl == c)
2655                     return true;
2656             }
2657         }
2658     }
2659 
2660     /**
2661      * Returns an estimate of the total number of tasks stolen from
2662      * one thread's work queue by another. The reported value
2663      * underestimates the actual total number of steals when the pool
2664      * is not quiescent. This value may be useful for monitoring and
2665      * tuning fork/join programs: in general, steal counts should be
2666      * high enough to keep threads busy, but low enough to avoid
2667      * overhead and contention across threads.
2668      *


2674         if ((ws = workQueues) != null) {
2675             for (int i = 1; i < ws.length; i += 2) {
2676                 if ((w = ws[i]) != null)
2677                     count += (long)w.nsteals & 0xffffffffL;
2678             }
2679         }
2680         return count;
2681     }
2682 
2683     /**
2684      * Returns an estimate of the total number of tasks currently held
2685      * in queues by worker threads (but not including tasks submitted
2686      * to the pool that have not begun executing). This value is only
2687      * an approximation, obtained by iterating across all threads in
2688      * the pool. This method may be useful for tuning task
2689      * granularities.
2690      *
2691      * @return the number of queued tasks
2692      */
2693     public long getQueuedTaskCount() {
2694         WorkQueue[] ws; WorkQueue w;
2695         VarHandle.acquireFence();
2696         int count = 0;
2697         if ((ws = workQueues) != null) {
2698             for (int i = 1; i < ws.length; i += 2) {
2699                 if ((w = ws[i]) != null)
2700                     count += w.queueSize();
2701             }
2702         }
2703         return count;
2704     }
2705 
2706     /**
2707      * Returns an estimate of the number of tasks submitted to this
2708      * pool that have not yet begun executing.  This method may take
2709      * time proportional to the number of submissions.
2710      *
2711      * @return the number of queued submissions
2712      */
2713     public int getQueuedSubmissionCount() {
2714         WorkQueue[] ws; WorkQueue w;
2715         VarHandle.acquireFence();
2716         int count = 0;
2717         if ((ws = workQueues) != null) {
2718             for (int i = 0; i < ws.length; i += 2) {
2719                 if ((w = ws[i]) != null)
2720                     count += w.queueSize();
2721             }
2722         }
2723         return count;
2724     }
2725 
2726     /**
2727      * Returns {@code true} if there are any tasks submitted to this
2728      * pool that have not yet begun executing.
2729      *
2730      * @return {@code true} if there are any queued submissions
2731      */
2732     public boolean hasQueuedSubmissions() {
2733         WorkQueue[] ws; WorkQueue w;
2734         VarHandle.acquireFence();
2735         if ((ws = workQueues) != null) {
2736             for (int i = 0; i < ws.length; i += 2) {
2737                 if ((w = ws[i]) != null && !w.isEmpty())
2738                     return true;
2739             }
2740         }
2741         return false;
2742     }
2743 
2744     /**
2745      * Removes and returns the next unexecuted submission if one is
2746      * available.  This method may be useful in extensions to this
2747      * class that re-assign work in systems with multiple pools.
2748      *
2749      * @return the next submission, or {@code null} if none
2750      */
2751     protected ForkJoinTask<?> pollSubmission() {
2752         return pollScan(true);
2753     }
2754 
2755     /**
2756      * Removes all available unexecuted submitted and forked tasks
2757      * from scheduling queues and adds them to the given collection,
2758      * without altering their execution status. These may include
2759      * artificially generated or wrapped tasks. This method is
2760      * designed to be invoked only when the pool is known to be
2761      * quiescent. Invocations at other times may not remove all
2762      * tasks. A failure encountered while attempting to add elements
2763      * to collection {@code c} may result in elements being in
2764      * neither, either or both collections when the associated
2765      * exception is thrown.  The behavior of this operation is
2766      * undefined if the specified collection is modified while the
2767      * operation is in progress.
2768      *
2769      * @param c the collection to transfer elements into
2770      * @return the number of elements transferred
2771      */
2772     protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
2773         WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t;
2774         VarHandle.acquireFence();
2775         int count = 0;
2776         if ((ws = workQueues) != null) {
2777             for (int i = 0; i < ws.length; ++i) {
2778                 if ((w = ws[i]) != null) {
2779                     while ((t = w.poll()) != null) {
2780                         c.add(t);
2781                         ++count;
2782                     }
2783                 }
2784             }
2785         }
2786         return count;
2787     }
2788 
2789     /**
2790      * Returns a string identifying this pool, as well as its state,
2791      * including indications of run state, parallelism level, and
2792      * worker and task counts.
2793      *
2794      * @return a string identifying this pool, as well as its state
2795      */
2796     public String toString() {
2797         // Use a single pass through workQueues to collect counts
2798         int md = mode; // read volatile fields first
2799         long c = ctl;
2800         long st = stealCount;
2801         long qt = 0L, qs = 0L; int rc = 0;
2802         WorkQueue[] ws; WorkQueue w;
2803         if ((ws = workQueues) != null) {
2804             for (int i = 0; i < ws.length; ++i) {
2805                 if ((w = ws[i]) != null) {
2806                     int size = w.queueSize();
2807                     if ((i & 1) == 0)
2808                         qs += size;
2809                     else {
2810                         qt += size;
2811                         st += (long)w.nsteals & 0xffffffffL;
2812                         if (w.isApparentlyUnblocked())
2813                             ++rc;
2814                     }
2815                 }
2816             }
2817         }
2818 
2819         int pc = (md & SMASK);
2820         int tc = pc + (short)(c >>> TC_SHIFT);
2821         int ac = pc + (int)(c >> RC_SHIFT);
2822         if (ac < 0) // ignore transient negative
2823             ac = 0;
2824         String level = ((md & TERMINATED) != 0 ? "Terminated" :
2825                         (md & STOP)       != 0 ? "Terminating" :
2826                         (md & SHUTDOWN)   != 0 ? "Shutting down" :
2827                         "Running");
2828         return super.toString() +
2829             "[" + level +
2830             ", parallelism = " + pc +
2831             ", size = " + tc +
2832             ", active = " + ac +
2833             ", running = " + rc +
2834             ", steals = " + st +
2835             ", tasks = " + qt +
2836             ", submissions = " + qs +
2837             "]";
2838     }
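The monitoring methods above rely on slot parity. Odd indices hold worker queues (used for queued tasks and steal counts), and even indices hold submission queues. `toString()` collects both in one pass. Steal counts are read with `(long)w.nsteals & 0xffffffffL`, which treats the `int` counter as unsigned. Below is a sketch of that single pass over hypothetical per-slot snapshots.

```java
final class PoolStatsSketch {
    /** Hypothetical per-slot snapshot; the slot index parity mirrors workQueues. */
    static final class Slot {
        final int queued; final int nsteals;
        Slot(int queued, int nsteals) { this.queued = queued; this.nsteals = nsteals; }
    }

    /** One pass over slots: returns {queued worker tasks, queued submissions, steals}. */
    static long[] tally(Slot[] slots) {
        long tasks = 0, submissions = 0, steals = 0;
        for (int i = 0; i < slots.length; ++i) {
            Slot s = slots[i];
            if (s == null)
                continue;
            if ((i & 1) == 0)
                submissions += s.queued;                   // even index: submission queue
            else {
                tasks += s.queued;                         // odd index: worker-owned queue
                steals += (long) s.nsteals & 0xffffffffL;  // int counter read as unsigned
            }
        }
        return new long[] { tasks, submissions, steals };
    }
}
```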
2839 


3085      * {@code blocker.block()} until either method returns {@code true}.
3086      * Every call to {@code blocker.block()} is preceded by a call to
3087      * {@code blocker.isReleasable()} that returned {@code false}.
3088      *
3089      * <p>If not running in a ForkJoinPool, this method is
3090      * behaviorally equivalent to
3091      * <pre> {@code
3092      * while (!blocker.isReleasable())
3093      *   if (blocker.block())
3094      *     break;}</pre>
3095      *
3096      * If running in a ForkJoinPool, the pool may first be expanded to
3097      * ensure that sufficient parallelism is available during the call to
3098      * {@code blocker.block()}.
3099      *
3100      * @param blocker the blocker task
3101      * @throws InterruptedException if {@code blocker.block()} did so
3102      */
3103     public static void managedBlock(ManagedBlocker blocker)
3104         throws InterruptedException {
3105         if (blocker == null) throw new NullPointerException();
3106         ForkJoinPool p;
3107         ForkJoinWorkerThread wt;
3108         WorkQueue w;
3109         Thread t = Thread.currentThread();
3110         if ((t instanceof ForkJoinWorkerThread) &&
3111             (p = (wt = (ForkJoinWorkerThread)t).pool) != null &&
3112             (w = wt.workQueue) != null) {
3113             int block;
3114             while (!blocker.isReleasable()) {
3115                 if ((block = p.tryCompensate(w)) != 0) {
3116                     try {
3117                         do {} while (!blocker.isReleasable() &&
3118                                      !blocker.block());
3119                     } finally {
3120                         CTL.getAndAdd(p, (block > 0) ? RC_UNIT : 0L);
3121                     }
3122                     break;
3123                 }
3124             }
3125         }
3126         else {
3127             do {} while (!blocker.isReleasable() &&
3128                          !blocker.block());
3129         }
3130     }
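A `ManagedBlocker` lets `managedBlock` call `tryCompensate` before a worker blocks, so the pool can try to keep its target parallelism. The sketch below is modeled on the `QueueTaker` example in the `ForkJoinPool.ManagedBlocker` documentation. Outside a pool, `managedBlock` reduces to the `isReleasable()`/`block()` loop shown in the javadoc above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ForkJoinPool;

final class QueueTaker<E> implements ForkJoinPool.ManagedBlocker {
    final BlockingQueue<E> queue;
    volatile E item = null;

    QueueTaker(BlockingQueue<E> queue) { this.queue = queue; }

    @Override public boolean block() throws InterruptedException {
        if (item == null)
            item = queue.take();               // may block; a pool may add a spare meanwhile
        return true;                           // no further blocking is needed
    }

    @Override public boolean isReleasable() {
        return item != null || (item = queue.poll()) != null;
    }

    E getItem() { return item; }

    /** Takes one element, blocking in a way a ForkJoinPool can compensate for. */
    static <E> E take(BlockingQueue<E> q) throws InterruptedException {
        QueueTaker<E> taker = new QueueTaker<>(q);
        ForkJoinPool.managedBlock(taker);
        return taker.getItem();
    }
}
```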
3131 
3132     /**
3133      * If the given executor is a ForkJoinPool, polls and executes
3134      * AsynchronousCompletionTasks from the worker's queue until none are
3135      * available or the blocker is released.
3136      */
3137     static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) {
3138         if (e instanceof ForkJoinPool) {
3139             WorkQueue w; ForkJoinWorkerThread wt; WorkQueue[] ws; int r, n;
3140             ForkJoinPool p = (ForkJoinPool)e;
3141             Thread thread = Thread.currentThread();
3142             if (thread instanceof ForkJoinWorkerThread &&
3143                 (wt = (ForkJoinWorkerThread)thread).pool == p)
3144                 w = wt.workQueue;
3145             else if ((r = ThreadLocalRandom.getProbe()) != 0 &&
3146                      (ws = p.workQueues) != null && (n = ws.length) > 0)
3147                 w = ws[(n - 1) & r & SQMASK];
3148             else
3149                 w = null;
3150             if (w != null)
3151                 w.helpAsyncBlocker(blocker);
3152         }
3153     }
3154 
3155     // AbstractExecutorService overrides.  These rely on the undocumented
3156     // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
3157     // implement RunnableFuture.
3158 
3159     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
3160         return new ForkJoinTask.AdaptedRunnable<T>(runnable, value);
3161     }
3162 
3163     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
3164         return new ForkJoinTask.AdaptedCallable<T>(callable);
3165     }
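The `newTaskFor` overrides depend on adapted tasks being both `ForkJoinTask`s and `RunnableFuture`s. Because that is undocumented, the sketch checks it at run time rather than assuming it. It adapts a `Callable` with `ForkJoinTask.adapt` and runs it through the `RunnableFuture` view in the calling thread, outside any pool.

```java
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RunnableFuture;

final class AdaptDemo {
    /** Adapts a Callable, runs it via RunnableFuture.run(), and returns its result. */
    static int runAdapted() {
        ForkJoinTask<Integer> task = ForkJoinTask.adapt(() -> 6 * 7);
        if (!(task instanceof RunnableFuture))   // undocumented property, so verify it
            throw new IllegalStateException("adapt() result is not a RunnableFuture here");
        @SuppressWarnings("unchecked")
        RunnableFuture<Integer> rf = (RunnableFuture<Integer>) task;
        rf.run();                                // executes the task in the calling thread
        return task.join();
    }
}
```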
3166 
3167     // VarHandle mechanics
3168     private static final VarHandle CTL;
3169     private static final VarHandle MODE;
3170     static final VarHandle QA;
3171 
3172     static {
3173         try {
3174             MethodHandles.Lookup l = MethodHandles.lookup();
3175             CTL = l.findVarHandle(ForkJoinPool.class, "ctl", long.class);
3176             MODE = l.findVarHandle(ForkJoinPool.class, "mode", int.class);
3177             QA = MethodHandles.arrayElementVarHandle(ForkJoinTask[].class);
3178         } catch (ReflectiveOperationException e) {
3179             throw new ExceptionInInitializerError(e);
3180         }
3181 
3182         // Reduce the risk of rare disastrous classloading in first call to
3183         // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
3184         Class<?> ensureLoaded = LockSupport.class;
3185 
3186         int commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
3187         try {
3188             String p = System.getProperty
3189                 ("java.util.concurrent.ForkJoinPool.common.maximumSpares");
3190             if (p != null)
3191                 commonMaxSpares = Integer.parseInt(p);
3192         } catch (Exception ignore) {}
3193         COMMON_MAX_SPARES = commonMaxSpares;
3194 
3195         defaultForkJoinWorkerThreadFactory =
3196             new DefaultForkJoinWorkerThreadFactory();
3197         modifyThreadPermission = new RuntimePermission("modifyThread");
3198 
3199         common = AccessController.doPrivileged(new PrivilegedAction<>() {