1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.lang.Thread.UncaughtExceptionHandler;
  39 import java.util.ArrayList;
  40 import java.util.Arrays;
  41 import java.util.Collection;
  42 import java.util.Collections;
  43 import java.util.List;
  44 import java.util.concurrent.AbstractExecutorService;
  45 import java.util.concurrent.Callable;
  46 import java.util.concurrent.ExecutorService;
  47 import java.util.concurrent.Future;
  48 import java.util.concurrent.RejectedExecutionException;
  49 import java.util.concurrent.RunnableFuture;
  50 import java.util.concurrent.TimeUnit;
  51 
  52 /**
  53  * An {@link ExecutorService} for running {@link ForkJoinTask}s.
  54  * A {@code ForkJoinPool} provides the entry point for submissions
  55  * from non-{@code ForkJoinTask} clients, as well as management and
  56  * monitoring operations.
  57  *
  58  * <p>A {@code ForkJoinPool} differs from other kinds of {@link
  59  * ExecutorService} mainly by virtue of employing
  60  * <em>work-stealing</em>: all threads in the pool attempt to find and
  61  * execute tasks submitted to the pool and/or created by other active
  62  * tasks (eventually blocking waiting for work if none exist). This
  63  * enables efficient processing when most tasks spawn other subtasks
  64  * (as do most {@code ForkJoinTask}s), as well as when many small
  65  * tasks are submitted to the pool from external clients.  Especially
  66  * when setting <em>asyncMode</em> to true in constructors, {@code
  67  * ForkJoinPool}s may also be appropriate for use with event-style
  68  * tasks that are never joined.
  69  *
  70  * <p>A static {@link #commonPool()} is available and appropriate for
  71  * most applications. The common pool is used by any ForkJoinTask that
  72  * is not explicitly submitted to a specified pool. Using the common
  73  * pool normally reduces resource usage (its threads are slowly
  74  * reclaimed during periods of non-use, and reinstated upon subsequent
  75  * use).
  76  *
  77  * <p>For applications that require separate or custom pools, a {@code
  78  * ForkJoinPool} may be constructed with a given target parallelism
  79  * level; by default, equal to the number of available processors. The
  80  * pool attempts to maintain enough active (or available) threads by
  81  * dynamically adding, suspending, or resuming internal worker
  82  * threads, even if some tasks are stalled waiting to join
  83  * others. However, no such adjustments are guaranteed in the face of
  84  * blocked I/O or other unmanaged synchronization. The nested {@link
  85  * ManagedBlocker} interface enables extension of the kinds of
  86  * synchronization accommodated.
  87  *
  88  * <p>In addition to execution and lifecycle control methods, this
  89  * class provides status check methods (for example
  90  * {@link #getStealCount}) that are intended to aid in developing,
  91  * tuning, and monitoring fork/join applications. Also, method
  92  * {@link #toString} returns indications of pool state in a
  93  * convenient form for informal monitoring.
  94  *
  95  * <p>As is the case with other ExecutorServices, there are three
  96  * main task execution methods summarized in the following table.
  97  * These are designed to be used primarily by clients not already
  98  * engaged in fork/join computations in the current pool.  The main
  99  * forms of these methods accept instances of {@code ForkJoinTask},
 100  * but overloaded forms also allow mixed execution of plain {@code
 101  * Runnable}- or {@code Callable}- based activities as well.  However,
 102  * tasks that are already executing in a pool should normally instead
 103  * use the within-computation forms listed in the table unless using
 104  * async event-style tasks that are not usually joined, in which case
 105  * there is little difference among choice of methods.
 106  *
 107  * <table BORDER CELLPADDING=3 CELLSPACING=1>
 108  * <caption>Summary of task execution methods</caption>
 109  *  <tr>
 110  *    <td></td>
 111  *    <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
 112  *    <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
 113  *  </tr>
 114  *  <tr>
 115  *    <td> <b>Arrange async execution</b></td>
 116  *    <td> {@link #execute(ForkJoinTask)}</td>
 117  *    <td> {@link ForkJoinTask#fork}</td>
 118  *  </tr>
 119  *  <tr>
 120  *    <td> <b>Await and obtain result</b></td>
 121  *    <td> {@link #invoke(ForkJoinTask)}</td>
 122  *    <td> {@link ForkJoinTask#invoke}</td>
 123  *  </tr>
 124  *  <tr>
 125  *    <td> <b>Arrange exec and obtain Future</b></td>
 126  *    <td> {@link #submit(ForkJoinTask)}</td>
 127  *    <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
 128  *  </tr>
 129  * </table>
 130  *
 131  * <p>The common pool is by default constructed with default
 132  * parameters, but these may be controlled by setting three
 133  * {@linkplain System#getProperty system properties} with prefix
 134  * {@code "java.util.concurrent.ForkJoinPool.common."}:
 135  * {@code parallelism} -- a non-negative integer,
 136  * {@code threadFactory} -- the class name of a
 137  * {@link ForkJoinWorkerThreadFactory}, and
 138  * {@code exceptionHandler} --
 139  * the class name of a {@link UncaughtExceptionHandler}.
 140  * Upon any error in establishing these settings, default parameters
 141  * are used. It is possible to disable or limit the use of threads in
 142  * the common pool by setting the parallelism property to zero, and/or
 143  * using a factory that may return {@code null}.
 144  *
 145  * <p><b>Implementation notes</b>: This implementation restricts the
 146  * maximum number of running threads to 32767. Attempts to create
 147  * pools with greater than the maximum number result in
 148  * {@code IllegalArgumentException}.
 149  *
 150  * <p>This implementation rejects submitted tasks (that is, by throwing
 151  * {@link RejectedExecutionException}) only when the pool is shut down
 152  * or internal resources have been exhausted.
 153  *
 154  * @since 1.7
 155  * @author Doug Lea
 156  */
 157 public class ForkJoinPool extends AbstractExecutorService {
 158 
 159     /*
 160      * Implementation Overview
 161      *
 162      * This class and its nested classes provide the main
 163      * functionality and control for a set of worker threads:
 164      * Submissions from non-FJ threads enter into submission queues.
 165      * Workers take these tasks and typically split them into subtasks
 166      * that may be stolen by other workers.  Preference rules give
 167      * first priority to processing tasks from their own queues (LIFO
 168      * or FIFO, depending on mode), then to randomized FIFO steals of
 169      * tasks in other queues.
 170      *
 171      * WorkQueues
 172      * ==========
 173      *
 174      * Most operations occur within work-stealing queues (in nested
 175      * class WorkQueue).  These are special forms of Deques that
 176      * support only three of the four possible end-operations -- push,
 177      * pop, and poll (aka steal), under the further constraints that
 178      * push and pop are called only from the owning thread (or, as
 179      * extended here, under a lock), while poll may be called from
 180      * other threads.  (If you are unfamiliar with them, you probably
 181      * want to read Herlihy and Shavit's book "The Art of
     * Multiprocessor Programming", chapter 16 describing these in
 183      * more detail before proceeding.)  The main work-stealing queue
 184      * design is roughly similar to those in the papers "Dynamic
 185      * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
 186      * (http://research.sun.com/scalable/pubs/index.html) and
 187      * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
 188      * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
 189      * The main differences ultimately stem from GC requirements that
 190      * we null out taken slots as soon as we can, to maintain as small
 191      * a footprint as possible even in programs generating huge
 192      * numbers of tasks. To accomplish this, we shift the CAS
 193      * arbitrating pop vs poll (steal) from being on the indices
 194      * ("base" and "top") to the slots themselves.  So, both a
 195      * successful pop and poll mainly entail a CAS of a slot from
 196      * non-null to null.  Because we rely on CASes of references, we
 197      * do not need tag bits on base or top.  They are simple ints as
 198      * used in any circular array-based queue (see for example
 199      * ArrayDeque).  Updates to the indices must still be ordered in a
 200      * way that guarantees that top == base means the queue is empty,
 201      * but otherwise may err on the side of possibly making the queue
     * appear nonempty when a push, pop, or poll has not fully
 203      * committed. Note that this means that the poll operation,
 204      * considered individually, is not wait-free. One thief cannot
 205      * successfully continue until another in-progress one (or, if
 206      * previously empty, a push) completes.  However, in the
 207      * aggregate, we ensure at least probabilistic non-blockingness.
 208      * If an attempted steal fails, a thief always chooses a different
 209      * random victim target to try next. So, in order for one thief to
 210      * progress, it suffices for any in-progress poll or new push on
 211      * any empty queue to complete. (This is why we normally use
 212      * method pollAt and its variants that try once at the apparent
 213      * base index, else consider alternative actions, rather than
 214      * method poll.)
 215      *
 216      * This approach also enables support of a user mode in which local
 217      * task processing is in FIFO, not LIFO order, simply by using
 218      * poll rather than pop.  This can be useful in message-passing
     * frameworks in which tasks are never joined.  However, neither
     * mode considers affinities, loads, cache localities, etc., so
     * they rarely provide the best possible performance on a given
     * machine, but portably provide good throughput by averaging over
 223      * these factors.  (Further, even if we did try to use such
 224      * information, we do not usually have a basis for exploiting it.
 225      * For example, some sets of tasks profit from cache affinities,
 226      * but others are harmed by cache pollution effects.)
 227      *
 228      * WorkQueues are also used in a similar way for tasks submitted
 229      * to the pool. We cannot mix these tasks in the same queues used
 230      * for work-stealing (this would contaminate lifo/fifo
 231      * processing). Instead, we randomly associate submission queues
 232      * with submitting threads, using a form of hashing.  The
 233      * ThreadLocalRandom probe value serves as a hash code for
 234      * choosing existing queues, and may be randomly repositioned upon
 235      * contention with other submitters.  In essence, submitters act
 236      * like workers except that they are restricted to executing local
 237      * tasks that they submitted (or in the case of CountedCompleters,
 238      * others with the same root task).  However, because most
 239      * shared/external queue operations are more expensive than
 240      * internal, and because, at steady state, external submitters
 241      * will compete for CPU with workers, ForkJoinTask.join and
 242      * related methods disable them from repeatedly helping to process
 243      * tasks if all workers are active.  Insertion of tasks in shared
 244      * mode requires a lock (mainly to protect in the case of
 245      * resizing) but we use only a simple spinlock (using bits in
 246      * field qlock), because submitters encountering a busy queue move
 247      * on to try or create other queues -- they block only when
 248      * creating and registering new queues.
 249      *
 250      * Management
 251      * ==========
 252      *
 253      * The main throughput advantages of work-stealing stem from
 254      * decentralized control -- workers mostly take tasks from
 255      * themselves or each other. We cannot negate this in the
 256      * implementation of other management responsibilities. The main
 257      * tactic for avoiding bottlenecks is packing nearly all
 258      * essentially atomic control state into two volatile variables
 259      * that are by far most often read (not written) as status and
 260      * consistency checks.
 261      *
 262      * Field "ctl" contains 64 bits holding all the information needed
 263      * to atomically decide to add, inactivate, enqueue (on an event
 264      * queue), dequeue, and/or re-activate workers.  To enable this
 265      * packing, we restrict maximum parallelism to (1<<15)-1 (which is
 266      * far in excess of normal operating range) to allow ids, counts,
 267      * and their negations (used for thresholding) to fit into 16bit
 268      * fields.
 269      *
 270      * Field "plock" is a form of sequence lock with a saturating
 271      * shutdown bit (similarly for per-queue "qlocks"), mainly
 272      * protecting updates to the workQueues array, as well as to
 273      * enable shutdown.  When used as a lock, it is normally only very
 274      * briefly held, so is nearly always available after at most a
 275      * brief spin, but we use a monitor-based backup strategy to
 276      * block when needed.
 277      *
 278      * Recording WorkQueues.  WorkQueues are recorded in the
 279      * "workQueues" array that is created upon first use and expanded
 280      * if necessary.  Updates to the array while recording new workers
 281      * and unrecording terminated ones are protected from each other
 282      * by a lock but the array is otherwise concurrently readable, and
 283      * accessed directly.  To simplify index-based operations, the
 284      * array size is always a power of two, and all readers must
 285      * tolerate null slots. Worker queues are at odd indices. Shared
 286      * (submission) queues are at even indices, up to a maximum of 64
 287      * slots, to limit growth even if array needs to expand to add
 288      * more workers. Grouping them together in this way simplifies and
 289      * speeds up task scanning.
 290      *
 291      * All worker thread creation is on-demand, triggered by task
 292      * submissions, replacement of terminated workers, and/or
 293      * compensation for blocked workers. However, all other support
 294      * code is set up to work with other policies.  To ensure that we
 295      * do not hold on to worker references that would prevent GC, ALL
 296      * accesses to workQueues are via indices into the workQueues
 297      * array (which is one source of some of the messy code
 298      * constructions here). In essence, the workQueues array serves as
 299      * a weak reference mechanism. Thus for example the wait queue
 300      * field of ctl stores indices, not references.  Access to the
 301      * workQueues in associated methods (for example signalWork) must
 302      * both index-check and null-check the IDs. All such accesses
 303      * ignore bad IDs by returning out early from what they are doing,
 304      * since this can only be associated with termination, in which
 305      * case it is OK to give up.  All uses of the workQueues array
 306      * also check that it is non-null (even if previously
 307      * non-null). This allows nulling during termination, which is
 308      * currently not necessary, but remains an option for
 309      * resource-revocation-based shutdown schemes. It also helps
 310      * reduce JIT issuance of uncommon-trap code, which tends to
 311      * unnecessarily complicate control flow in some methods.
 312      *
 313      * Event Queuing. Unlike HPC work-stealing frameworks, we cannot
 314      * let workers spin indefinitely scanning for tasks when none can
 315      * be found immediately, and we cannot start/resume workers unless
 316      * there appear to be tasks available.  On the other hand, we must
 317      * quickly prod them into action when new tasks are submitted or
 318      * generated. In many usages, ramp-up time to activate workers is
 319      * the main limiting factor in overall performance (this is
 320      * compounded at program start-up by JIT compilation and
 321      * allocation). So we try to streamline this as much as possible.
 322      * We park/unpark workers after placing in an event wait queue
 323      * when they cannot find work. This "queue" is actually a simple
 324      * Treiber stack, headed by the "id" field of ctl, plus a 15bit
 325      * counter value (that reflects the number of times a worker has
 326      * been inactivated) to avoid ABA effects (we need only as many
 327      * version numbers as worker threads). Successors are held in
 328      * field WorkQueue.nextWait.  Queuing deals with several intrinsic
 329      * races, mainly that a task-producing thread can miss seeing (and
 330      * signalling) another thread that gave up looking for work but
 331      * has not yet entered the wait queue. We solve this by requiring
 332      * a full sweep of all workers (via repeated calls to method
 333      * scan()) both before and after a newly waiting worker is added
 334      * to the wait queue. During a rescan, the worker might release
 335      * some other queued worker rather than itself, which has the same
 336      * net effect. Because enqueued workers may actually be rescanning
 337      * rather than waiting, we set and clear the "parker" field of
 338      * WorkQueues to reduce unnecessary calls to unpark.  (This
 339      * requires a secondary recheck to avoid missed signals.)  Note
 340      * the unusual conventions about Thread.interrupts surrounding
 341      * parking and other blocking: Because interrupts are used solely
 342      * to alert threads to check termination, which is checked anyway
 343      * upon blocking, we clear status (using Thread.interrupted)
 344      * before any call to park, so that park does not immediately
 345      * return due to status being set via some other unrelated call to
 346      * interrupt in user code.
 347      *
 348      * Signalling.  We create or wake up workers only when there
 349      * appears to be at least one task they might be able to find and
 350      * execute. However, many other threads may notice the same task
 351      * and each signal to wake up a thread that might take it. So in
 352      * general, pools will be over-signalled.  When a submission is
 353      * added or another worker adds a task to a queue that has fewer
 354      * than two tasks, they signal waiting workers (or trigger
 355      * creation of new ones if fewer than the given parallelism level
 356      * -- signalWork), and may leave a hint to the unparked worker to
     * help signal others upon wakeup.  These primary signals are
 358      * buttressed by others (see method helpSignal) whenever other
 359      * threads scan for work or do not have a task to process.  On
 360      * most platforms, signalling (unpark) overhead time is noticeably
 361      * long, and the time between signalling a thread and it actually
 362      * making progress can be very noticeably long, so it is worth
 363      * offloading these delays from critical paths as much as
 364      * possible.
 365      *
 366      * Trimming workers. To release resources after periods of lack of
 367      * use, a worker starting to wait when the pool is quiescent will
 368      * time out and terminate if the pool has remained quiescent for a
 369      * given period -- a short period if there are more threads than
 370      * parallelism, longer as the number of threads decreases. This
 371      * will slowly propagate, eventually terminating all workers after
 372      * periods of non-use.
 373      *
 374      * Shutdown and Termination. A call to shutdownNow atomically sets
 375      * a plock bit and then (non-atomically) sets each worker's
 376      * qlock status, cancels all unprocessed tasks, and wakes up
 377      * all waiting workers.  Detecting whether termination should
 378      * commence after a non-abrupt shutdown() call requires more work
 379      * and bookkeeping. We need consensus about quiescence (i.e., that
 380      * there is no more work). The active count provides a primary
 381      * indication but non-abrupt shutdown still requires a rechecking
 382      * scan for any workers that are inactive but not queued.
 383      *
 384      * Joining Tasks
 385      * =============
 386      *
 387      * Any of several actions may be taken when one worker is waiting
 388      * to join a task stolen (or always held) by another.  Because we
 389      * are multiplexing many tasks on to a pool of workers, we can't
 390      * just let them block (as in Thread.join).  We also cannot just
 391      * reassign the joiner's run-time stack with another and replace
 392      * it later, which would be a form of "continuation", that even if
 393      * possible is not necessarily a good idea since we sometimes need
 394      * both an unblocked task and its continuation to progress.
 395      * Instead we combine two tactics:
 396      *
 397      *   Helping: Arranging for the joiner to execute some task that it
 398      *      would be running if the steal had not occurred.
 399      *
 400      *   Compensating: Unless there are already enough live threads,
 401      *      method tryCompensate() may create or re-activate a spare
 402      *      thread to compensate for blocked joiners until they unblock.
 403      *
 404      * A third form (implemented in tryRemoveAndExec) amounts to
 405      * helping a hypothetical compensator: If we can readily tell that
 406      * a possible action of a compensator is to steal and execute the
 407      * task being joined, the joining thread can do so directly,
 408      * without the need for a compensation thread (although at the
 409      * expense of larger run-time stacks, but the tradeoff is
 410      * typically worthwhile).
 411      *
 412      * The ManagedBlocker extension API can't use helping so relies
 413      * only on compensation in method awaitBlocker.
 414      *
 415      * The algorithm in tryHelpStealer entails a form of "linear"
 416      * helping: Each worker records (in field currentSteal) the most
 417      * recent task it stole from some other worker. Plus, it records
 418      * (in field currentJoin) the task it is currently actively
 419      * joining. Method tryHelpStealer uses these markers to try to
 420      * find a worker to help (i.e., steal back a task from and execute
 421      * it) that could hasten completion of the actively joined task.
 422      * In essence, the joiner executes a task that would be on its own
 423      * local deque had the to-be-joined task not been stolen. This may
 424      * be seen as a conservative variant of the approach in Wagner &
 425      * Calder "Leapfrogging: a portable technique for implementing
 426      * efficient futures" SIGPLAN Notices, 1993
 427      * (http://portal.acm.org/citation.cfm?id=155354). It differs in
 428      * that: (1) We only maintain dependency links across workers upon
 429      * steals, rather than use per-task bookkeeping.  This sometimes
 430      * requires a linear scan of workQueues array to locate stealers,
 431      * but often doesn't because stealers leave hints (that may become
 432      * stale/wrong) of where to locate them.  It is only a hint
 433      * because a worker might have had multiple steals and the hint
 434      * records only one of them (usually the most current).  Hinting
 435      * isolates cost to when it is needed, rather than adding to
 436      * per-task overhead.  (2) It is "shallow", ignoring nesting and
 437      * potentially cyclic mutual steals.  (3) It is intentionally
 438      * racy: field currentJoin is updated only while actively joining,
 439      * which means that we miss links in the chain during long-lived
 440      * tasks, GC stalls etc (which is OK since blocking in such cases
 441      * is usually a good idea).  (4) We bound the number of attempts
 442      * to find work (see MAX_HELP) and fall back to suspending the
 443      * worker and if necessary replacing it with another.
 444      *
 445      * Helping actions for CountedCompleters are much simpler: Method
 446      * helpComplete can take and execute any task with the same root
 447      * as the task being waited on. However, this still entails some
 448      * traversal of completer chains, so is less efficient than using
 449      * CountedCompleters without explicit joins.
 450      *
 451      * It is impossible to keep exactly the target parallelism number
 452      * of threads running at any given time.  Determining the
 453      * existence of conservatively safe helping targets, the
 454      * availability of already-created spares, and the apparent need
 455      * to create new spares are all racy, so we rely on multiple
 456      * retries of each.  Compensation in the apparent absence of
 457      * helping opportunities is challenging to control on JVMs, where
 458      * GC and other activities can stall progress of tasks that in
 459      * turn stall out many other dependent tasks, without us being
 460      * able to determine whether they will ever require compensation.
 461      * Even though work-stealing otherwise encounters little
 462      * degradation in the presence of more threads than cores,
 463      * aggressively adding new threads in such cases entails risk of
 464      * unwanted positive feedback control loops in which more threads
 465      * cause more dependent stalls (as well as delayed progress of
 466      * unblocked threads to the point that we know they are available)
 467      * leading to more situations requiring more threads, and so
 468      * on. This aspect of control can be seen as an (analytically
 469      * intractable) game with an opponent that may choose the worst
 470      * (for us) active thread to stall at any time.  We take several
 471      * precautions to bound losses (and thus bound gains), mainly in
 472      * methods tryCompensate and awaitJoin.
 473      *
 474      * Common Pool
 475      * ===========
 476      *
     * The static common pool always exists after static
 478      * initialization.  Since it (or any other created pool) need
 479      * never be used, we minimize initial construction overhead and
 480      * footprint to the setup of about a dozen fields, with no nested
 481      * allocation. Most bootstrapping occurs within method
 482      * fullExternalPush during the first submission to the pool.
 483      *
 484      * When external threads submit to the common pool, they can
 485      * perform some subtask processing (see externalHelpJoin and
 486      * related methods).  We do not need to record whether these
 487      * submissions are to the common pool -- if not, externalHelpJoin
 488      * returns quickly (at the most helping to signal some common pool
 489      * workers). These submitters would otherwise be blocked waiting
 490      * for completion, so the extra effort (with liberally sprinkled
 491      * task status checks) in inapplicable cases amounts to an odd
 492      * form of limited spin-wait before blocking in ForkJoinTask.join.
 493      *
 494      * Style notes
 495      * ===========
 496      *
 497      * There is a lot of representation-level coupling among classes
 498      * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask.  The
 499      * fields of WorkQueue maintain data structures managed by
 500      * ForkJoinPool, so are directly accessed.  There is little point
 501      * trying to reduce this, since any associated future changes in
 502      * representations will need to be accompanied by algorithmic
 503      * changes anyway. Several methods intrinsically sprawl because
 504      * they must accumulate sets of consistent reads of volatiles held
 505      * in local variables.  Methods signalWork() and scan() are the
 506      * main bottlenecks, so are especially heavily
 507      * micro-optimized/mangled.  There are lots of inline assignments
 508      * (of form "while ((local = field) != 0)") which are usually the
 509      * simplest way to ensure the required read orderings (which are
 510      * sometimes critical). This leads to a "C"-like style of listing
 511      * declarations of these locals at the heads of methods or blocks.
 512      * There are several occurrences of the unusual "do {} while
 513      * (!cas...)"  which is the simplest way to force an update of a
 514      * CAS'ed variable. There are also other coding oddities (including
 515      * several unnecessary-looking hoisted null checks) that help
 516      * some methods perform reasonably even when interpreted (not
 517      * compiled).
 518      *
 519      * The order of declarations in this file is:
 520      * (1) Static utility functions
 521      * (2) Nested (static) classes
 522      * (3) Static fields
 523      * (4) Fields, along with constants used when unpacking some of them
 524      * (5) Internal control methods
 525      * (6) Callbacks and other support for ForkJoinTask methods
 526      * (7) Exported methods
 527      * (8) Static block initializing statics in minimally dependent order
 528      */
 529 
 530     // Static utilities
 531 
 532     /**
 533      * If there is a security manager, makes sure caller has
 534      * permission to modify threads.
 535      */
 536     private static void checkPermission() {
 537         SecurityManager security = System.getSecurityManager();
 538         if (security != null)
 539             security.checkPermission(modifyThreadPermission);
 540     }
 541 
 542     // Nested classes
 543 
 544     /**
 545      * Factory for creating new {@link ForkJoinWorkerThread}s.
 546      * A {@code ForkJoinWorkerThreadFactory} must be defined and used
 547      * for {@code ForkJoinWorkerThread} subclasses that extend base
 548      * functionality or initialize threads with different contexts.
 549      */
 550     public static interface ForkJoinWorkerThreadFactory {
 551         /**
 552          * Returns a new worker thread operating in the given pool.
 553          *
 554          * @param pool the pool this thread works in
 555          * @throws NullPointerException if the pool is null
 556          * @return the new worker thread
 557          */
 558         public ForkJoinWorkerThread newThread(ForkJoinPool pool);
 559     }
 560 
 561     /**
 562      * Default ForkJoinWorkerThreadFactory implementation; creates a
 563      * new ForkJoinWorkerThread.
 564      */
 565     static final class DefaultForkJoinWorkerThreadFactory
 566         implements ForkJoinWorkerThreadFactory {
 567         public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
 568             return new ForkJoinWorkerThread(pool);
 569         }
 570     }
 571 
 572     /**




















 573      * Class for artificial tasks that are used to replace the target
 574      * of local joins if they are removed from an interior queue slot
 575      * in WorkQueue.tryRemoveAndExec. We don't need the proxy to
 576      * actually do anything beyond having a unique identity.
 577      */
 578     static final class EmptyTask extends ForkJoinTask<Void> {
 579         private static final long serialVersionUID = -7721805057305804111L;
 580         EmptyTask() { status = ForkJoinTask.NORMAL; } // force done
 581         public final Void getRawResult() { return null; }
 582         public final void setRawResult(Void x) {}
 583         public final boolean exec() { return true; }
 584     }
 585 
    /**
     * Queues supporting work-stealing as well as external task
     * submission. See above for main rationale and algorithms.
     * Implementation relies heavily on "Unsafe" intrinsics
     * and selective use of "volatile":
     *
     * Field "base" is the index (mod array.length) of the least valid
     * queue slot, which is always the next position to steal (poll)
     * from if nonempty. Reads and writes require volatile orderings
     * but not CAS, because updates are only performed after slot
     * CASes.
     *
     * Field "top" is the index (mod array.length) of the next queue
     * slot to push to or pop from. It is written only by the owner
     * thread (push/pop), or under lock for external/shared push, and
     * accessed by other threads only after reading (volatile) base.
     * Both top and base are allowed to wrap around on overflow, but
     * (top - base) (or more commonly -(base - top) to force volatile
     * read of base before top) still estimates size. The lock
     * ("qlock") is forced to -1 on termination, causing all further
     * lock attempts to fail. (Note: we don't need CAS for termination
     * state because upon pool shutdown, all shared queues will stop
     * being used anyway.)  Nearly all lock bodies are set up so that
     * exceptions within lock bodies are "impossible" (modulo JVM
     * errors that would cause failure anyway).
     *
     * The array slots are read and written using the emulation of
     * volatiles/atomics provided by Unsafe. Insertions must in
     * general use putOrderedObject as a form of releasing store to
     * ensure that all writes to the task object are ordered before
     * its publication in the queue.  All removals entail a CAS to
     * null.  The array is always a power of two. To ensure safety of
     * Unsafe array operations, all accesses perform explicit null
     * checks and implicit bounds checks via power-of-two masking.
     *
     * In addition to basic queuing support, this class contains
     * fields described elsewhere to control execution. It turns out
     * to work better memory-layout-wise to include them in this class
     * rather than a separate class.
     *
     * Performance on most platforms is very sensitive to placement of
     * instances of both WorkQueues and their arrays -- we absolutely
     * do not want multiple WorkQueue instances or multiple queue
     * arrays sharing cache lines. (It would be best for queue objects
     * and their arrays to share, but there is nothing available to
     * help arrange that).  Unfortunately, because they are recorded
     * in a common array, WorkQueue instances are often moved to be
     * adjacent by garbage collectors. To reduce impact, we use field
     * padding that works OK on common platforms; this effectively
     * trades off slightly slower average field access for the sake of
     * avoiding really bad worst-case access. (Until better JVM
     * support is in place, this padding is dependent on transient
     * properties of JVM field layout rules.) We also take care in
     * allocating, sizing and resizing the array. Non-shared queue
     * arrays are initialized by workers before use. Others are
     * allocated on first use.
     */
    static final class WorkQueue {
        /**
         * Capacity of work-stealing queue array upon initialization.
         * Must be a power of two; at least 4, but should be larger to
         * reduce or eliminate cacheline sharing among queues.
         * Currently, it is much larger, as a partial workaround for
         * the fact that JVMs often place arrays in locations that
         * share GC bookkeeping (especially cardmarks) such that
         * per-write accesses encounter serious memory contention.
         */
        static final int INITIAL_QUEUE_CAPACITY = 1 << 13;

        /**
         * Maximum size for queue arrays. Must be a power of two less
         * than or equal to 1 << (31 - width of array entry) to ensure
         * lack of wraparound of index calculations, but defined to a
         * value a bit less than this to help users trap runaway
         * programs before saturating systems.
         */
        static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M

        // Heuristic padding to ameliorate unfortunate memory placements
        volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06;

        int seed;                  // for random scanning; initialize nonzero
        volatile int eventCount;   // encoded inactivation count; < 0 if inactive
        int nextWait;              // encoded record of next event waiter
        int hint;                  // steal or signal hint (index)
        int poolIndex;             // index of this queue in pool (or 0)
        final int mode;            // 0: lifo, > 0: fifo, < 0: shared
        int nsteals;               // number of steals
        volatile int qlock;        // 1: locked, -1: terminate; else 0
        volatile int base;         // index of next slot for poll
        int top;                   // index of next slot for push
        ForkJoinTask<?>[] array;   // the elements (initially unallocated)
        final ForkJoinPool pool;   // the containing pool (may be null)
        final ForkJoinWorkerThread owner; // owning thread or null if shared
        volatile Thread parker;    // == owner during call to park; else null
        volatile ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin
        ForkJoinTask<?> currentSteal; // current non-local task being executed

        // Trailing padding, for the same reason as pad00..pad06 above
        volatile Object pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17;
        volatile Object pad18, pad19, pad1a, pad1b, pad1c, pad1d;

        /**
         * Creates a queue whose array is not yet allocated; base and
         * top both start at the midpoint of the initial capacity.
         *
         * @param pool the containing pool (may be null)
         * @param owner the owning worker thread, or null if shared
         * @param mode 0 for LIFO, positive for FIFO, negative for shared
         * @param seed initial random-scan seed; should be nonzero
         *        (a zero seed stays zero under nextSeed)
         */
        WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner, int mode,
                  int seed) {
            this.pool = pool;
            this.owner = owner;
            this.mode = mode;
            this.seed = seed;
            // Place indices in the center of array (that is not yet allocated)
            base = top = INITIAL_QUEUE_CAPACITY >>> 1;
        }

        /**
         * Returns the approximate number of tasks in the queue.
         *
         * @return top - base, or 0 if that difference is not positive
         *         (e.g. a transiently inconsistent read)
         */
        final int queueSize() {
            int n = base - top;       // non-owner callers must read base first
            return (n >= 0) ? 0 : -n; // ignore transient negative
        }

        /**
         * Provides a more accurate estimate of whether this queue has
         * any tasks than does queueSize, by checking whether a
         * near-empty queue has at least one unclaimed task.
         *
         * @return true if base is at or past top, or if exactly one
         *         task appears present but the array is unallocated or
         *         the slot at top - 1 is already null
         */
        final boolean isEmpty() {
            ForkJoinTask<?>[] a; int m, s;
            int n = base - (s = top);
            return (n >= 0 ||
                    (n == -1 &&
                     ((a = array) == null ||
                      (m = a.length - 1) < 0 ||
                      U.getObject
                      (a, (long)((m & (s - 1)) << ASHIFT) + ABASE) == null)));
        }

        /**
         * Pushes a task. Call only by owner in unshared queues.  (The
         * shared-queue version is embedded in method externalPush.)
         * Does nothing if the array is not allocated. After the push,
         * calls pool.signalWork if the queue holds at most two tasks,
         * otherwise grows the array once it is full (size >= length - 1).
         *
         * @param task the task. Caller must ensure non-null.
         * @throws RejectedExecutionException if array cannot be resized
         */
        final void push(ForkJoinTask<?> task) {
            ForkJoinTask<?>[] a; ForkJoinPool p;
            int s = top, m, n;
            if ((a = array) != null) {    // ignore if queue removed
                int j = (((m = a.length - 1) & s) << ASHIFT) + ABASE;
                // releasing store: task fields are visible before the slot
                U.putOrderedObject(a, j, task);
                if ((n = (top = s + 1) - base) <= 2) {
                    if ((p = pool) != null)
                        p.signalWork(this);
                }
                else if (n >= m)
                    growArray();
            }
        }

        /**
         * Initializes or doubles the capacity of array. Call either
         * by owner or with lock held -- it is OK for base, but not
         * top, to move while resizings are in progress.
         *
         * @return the newly allocated array (already installed in field array)
         * @throws RejectedExecutionException if the new capacity would
         *         exceed MAXIMUM_QUEUE_CAPACITY
         */
        final ForkJoinTask<?>[] growArray() {
            ForkJoinTask<?>[] oldA = array;
            int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY;
            if (size > MAXIMUM_QUEUE_CAPACITY)
                throw new RejectedExecutionException("Queue capacity exceeded");
            int oldMask, t, b;
            ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
            if (oldA != null && (oldMask = oldA.length - 1) >= 0 &&
                (t = top) - (b = base) > 0) {
                int mask = size - 1;
                do {
                    // A task is moved only if this thread wins the CAS
                    // that nulls its old slot; a slot already claimed
                    // (nulled) by another thread is skipped.
                    ForkJoinTask<?> x;
                    int oldj = ((b & oldMask) << ASHIFT) + ABASE;
                    int j    = ((b &    mask) << ASHIFT) + ABASE;
                    x = (ForkJoinTask<?>)U.getObjectVolatile(oldA, oldj);
                    if (x != null &&
                        U.compareAndSwapObject(oldA, oldj, x, null))
                        U.putObjectVolatile(a, j, x);
                } while (++b != t);
            }
            return a;
        }

        /**
         * Takes next task, if one exists, in LIFO order.  Call only
         * by owner in unshared queues. Retries if the CAS on the top
         * slot fails.
         *
         * @return the task taken from slot top - 1, or null if the
         *         queue is empty, the array is unallocated, or that
         *         slot is already null
         */
        final ForkJoinTask<?> pop() {
            ForkJoinTask<?>[] a; ForkJoinTask<?> t; int m;
            if ((a = array) != null && (m = a.length - 1) >= 0) {
                for (int s; (s = top - 1) - base >= 0;) {
                    long j = ((m & s) << ASHIFT) + ABASE;
                    if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
                        break;
                    if (U.compareAndSwapObject(a, j, t, null)) {
                        top = s;
                        return t;
                    }
                }
            }
            return null;
        }

        /**
         * Takes a task in FIFO order if b is base of queue and a task
         * can be claimed without contention. Specialized versions
         * appear in ForkJoinPool methods scan and tryHelpStealer.
         *
         * @param b the caller's previously read value of base
         * @return the claimed task, or null if the slot was empty,
         *         base has moved, or the CAS failed
         */
        final ForkJoinTask<?> pollAt(int b) {
            ForkJoinTask<?> t; ForkJoinTask<?>[] a;
            if ((a = array) != null) {
                int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
                if ((t = (ForkJoinTask<?>)U.getObjectVolatile(a, j)) != null &&
                    base == b &&
                    U.compareAndSwapObject(a, j, t, null)) {
                    base = b + 1;
                    return t;
                }
            }
            return null;
        }

        /**
         * Takes next task, if one exists, in FIFO order. Retries on
         * contention; if the base slot is null while base is unchanged
         * and more than one task appears present, yields and retries.
         *
         * @return the task taken, or null if the queue appears empty
         */
        final ForkJoinTask<?> poll() {
            ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t;
            while ((b = base) - top < 0 && (a = array) != null) {
                int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
                t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
                if (t != null) {
                    if (base == b &&
                        U.compareAndSwapObject(a, j, t, null)) {
                        base = b + 1;
                        return t;
                    }
                }
                else if (base == b) {
                    if (b + 1 == top)
                        break;
                    Thread.yield(); // wait for lagging update (very rare)
                }
            }
            return null;
        }

        /**
         * Takes next task, if one exists, in order specified by mode:
         * {@link #pop} (LIFO) when mode is 0, otherwise {@link #poll}
         * (FIFO).
         */
        final ForkJoinTask<?> nextLocalTask() {
            return mode == 0 ? pop() : poll();
        }

        /**
         * Returns next task, if one exists, in order specified by mode,
         * without removing it: reads slot top - 1 when mode is 0,
         * otherwise slot base.
         *
         * @return the (possibly null) contents of that slot, or null
         *         if the array is unallocated
         */
        final ForkJoinTask<?> peek() {
            ForkJoinTask<?>[] a = array; int m;
            if (a == null || (m = a.length - 1) < 0)
                return null;
            int i = mode == 0 ? top - 1 : base;
            int j = ((i & m) << ASHIFT) + ABASE;
            return (ForkJoinTask<?>)U.getObjectVolatile(a, j);
        }

        /**
         * Pops the given task only if it is at the current top.
         * (A shared version is available only via FJP.tryExternalUnpush)
         *
         * @param t the task expected in slot top - 1
         * @return true if t was removed
         */
        final boolean tryUnpush(ForkJoinTask<?> t) {
            ForkJoinTask<?>[] a; int s;
            if ((a = array) != null && (s = top) != base &&
                U.compareAndSwapObject
                (a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
                top = s;
                return true;
            }
            return false;
        }

        /**
         * Removes and cancels all known tasks, ignoring any exceptions:
         * currentJoin, currentSteal, then every task obtained via poll.
         */
        final void cancelAll() {
            ForkJoinTask.cancelIgnoringExceptions(currentJoin);
            ForkJoinTask.cancelIgnoringExceptions(currentSteal);
            for (ForkJoinTask<?> t; (t = poll()) != null; )
                ForkJoinTask.cancelIgnoringExceptions(t);
        }

        /**
         * Computes next value for random probes.  Scans don't require
         * a very high quality generator, but also not a crummy one.
         * Marsaglia xor-shift is cheap and works well enough.  Note:
         * This is manually inlined in its usages in ForkJoinPool to
         * avoid writes inside busy scan loops.
         *
         * @return the new seed, which is also stored into field seed
         */
        final int nextSeed() {
            int r = seed;
            r ^= r << 13;
            r ^= r >>> 17;
            return seed = r ^= r << 5;
        }

        // Specialized execution methods

        /**
         * Pops and runs tasks in LIFO order until the queue is empty
         * or the top slot is found null.
         */
        private void popAndExecAll() {
            // A bit faster than repeated pop calls
            ForkJoinTask<?>[] a; int m, s; long j; ForkJoinTask<?> t;
            while ((a = array) != null && (m = a.length - 1) >= 0 &&
                   (s = top - 1) - base >= 0 &&
                   (t = ((ForkJoinTask<?>)
                         U.getObject(a, j = ((m & s) << ASHIFT) + ABASE)))
                   != null) {
                if (U.compareAndSwapObject(a, j, t, null)) {
                    top = s;
                    t.doExec();
                }
            }
        }

        /**
         * Polls and runs tasks in FIFO order until poll returns null.
         */
        private void pollAndExecAll() {
            for (ForkJoinTask<?> t; (t = poll()) != null;)
                t.doExec();
        }

        /**
         * If present, removes the given task from the queue and
         * executes it. A task at the top is popped; a task in an
         * interior slot is replaced by an EmptyTask proxy (only if
         * base has not moved). Additionally, a cancelled task found
         * at the top during the traversal is popped and discarded
         * (not executed). Returns (true) on any CAS or consistency
         * check failure so caller can retry.
         *
         * @param task the task to remove and run
         * @return false if no progress can be made (the task was not
         *         found, some other traversed task is not cancelled,
         *         and base is unchanged), else true
         */
        final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
            boolean stat = true, removed = false, empty = true;
            ForkJoinTask<?>[] a; int m, s, b, n;
            if ((a = array) != null && (m = a.length - 1) >= 0 &&
                (n = (s = top) - (b = base)) > 0) {
                for (ForkJoinTask<?> t;;) {           // traverse from s to b
                    int j = ((--s & m) << ASHIFT) + ABASE;
                    t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
                    if (t == null)                    // inconsistent length
                        break;
                    else if (t == task) {
                        if (s + 1 == top) {           // pop
                            if (!U.compareAndSwapObject(a, j, task, null))
                                break;
                            top = s;
                            removed = true;
                        }
                        else if (base == b)           // replace with proxy
                            removed = U.compareAndSwapObject(a, j, task,
                                                             new EmptyTask());
                        break;
                    }
                    else if (t.status >= 0)
                        empty = false;
                    else if (s + 1 == top) {          // pop and throw away
                        if (U.compareAndSwapObject(a, j, t, null))
                            top = s;
                        break;
                    }
                    if (--n == 0) {
                        if (!empty && base == b)
                            stat = false;
                        break;
                    }
                }
            }
            if (removed)
                task.doExec();
            return stat;
        }

        /**
         * Polls for and executes the given task or any other task in
         * its CountedCompleter computation. Only the task at base is
         * examined: it is taken if it is a CountedCompleter whose
         * completer chain (including itself) reaches root. A failed
         * CAS restarts the attempt.
         *
         * @param root the root task of the computation
         * @return true if a task was taken and executed, else false
         */
        final boolean pollAndExecCC(ForkJoinTask<?> root) {
            ForkJoinTask<?>[] a; int b; Object o;
            outer: while ((b = base) - top < 0 && (a = array) != null) {
                long j = (((a.length - 1) & b) << ASHIFT) + ABASE;
                if ((o = U.getObject(a, j)) == null ||
                    !(o instanceof CountedCompleter))
                    break;
                for (CountedCompleter<?> t = (CountedCompleter<?>)o, r = t;;) {
                    if (r == root) {
                        if (base == b &&
                            U.compareAndSwapObject(a, j, t, null)) {
                            base = b + 1;
                            t.doExec();
                            return true;
                        }
                        else
                            break; // restart
                    }
                    if ((r = r.completer) == null)
                        break outer; // not part of root computation
                }
            }
            return false;
        }

        /**
         * Executes a top-level task and any local tasks remaining
         * after execution. The task is recorded in currentSteal while
         * it runs and nsteals is incremented afterwards; remaining
         * local tasks are then drained LIFO (mode 0) or FIFO.
         *
         * @param t the task to run; ignored if null
         */
        final void runTask(ForkJoinTask<?> t) {
            if (t != null) {
                (currentSteal = t).doExec();
                currentSteal = null;
                ++nsteals;
                if (base - top < 0) {       // process remaining local tasks
                    if (mode == 0)
                        popAndExecAll();
                    else
                        pollAndExecAll();
                }
            }
        }

        /**
         * Executes a non-top-level (stolen) task, temporarily making it
         * currentSteal and restoring the previous value afterwards.
         *
         * @param t the task to run; ignored if null
         */
        final void runSubtask(ForkJoinTask<?> t) {
            if (t != null) {
                ForkJoinTask<?> ps = currentSteal;
                (currentSteal = t).doExec();
                currentSteal = ps;
            }
        }

        /**
         * Returns true if owned and not known to be blocked: eventCount
         * is non-negative, there is an owner thread, and its state is
         * not BLOCKED, WAITING or TIMED_WAITING.
         */
        final boolean isApparentlyUnblocked() {
            Thread wt; Thread.State s;
            return (eventCount >= 0 &&
                    (wt = owner) != null &&
                    (s = wt.getState()) != Thread.State.BLOCKED &&
                    s != Thread.State.WAITING &&
                    s != Thread.State.TIMED_WAITING);
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe U;
        private static final long QLOCK;   // field offset of qlock
        private static final int ABASE;    // offset of element 0 in a ForkJoinTask[]
        private static final int ASHIFT;   // log2 of ForkJoinTask[] element scale
        static {
            try {
                U = sun.misc.Unsafe.getUnsafe();
                Class<?> k = WorkQueue.class;
                Class<?> ak = ForkJoinTask[].class;
                QLOCK = U.objectFieldOffset
                    (k.getDeclaredField("qlock"));
                ABASE = U.arrayBaseOffset(ak);
                int scale = U.arrayIndexScale(ak);
                // slot offset is computed as (index << ASHIFT) + ABASE,
                // which requires a power-of-two scale
                if ((scale & (scale - 1)) != 0)
                    throw new Error("data type scale not a power of two");
                ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
1060 
1061     // static fields (initialized in static initializer below)
1062 
    /**
     * Creates a new ForkJoinWorkerThread. This factory is used unless
     * overridden in ForkJoinPool constructors.
     */
    public static final ForkJoinWorkerThreadFactory
        defaultForkJoinWorkerThreadFactory;

    /**
     * Permission required for callers of methods that may start or
     * kill threads (checked by checkPermission).
     */
    private static final RuntimePermission modifyThreadPermission;

    /**
     * Common (static) pool. Non-null for public use unless a static
     * construction exception occurs, but internal usages null-check
     * on use to paranoidly avoid potential initialization
     * circularities as well as to simplify generated code.
     */
    static final ForkJoinPool common;

    /**
     * Common pool parallelism. To allow simpler use and management
     * when common pool threads are disabled, we allow the underlying
     * common.config field to be zero, but in that case still report
     * parallelism as 1 to reflect resulting caller-runs mechanics.
     */
    static final int commonParallelism;

    /**
     * Sequence number for creating workerNamePrefix. Incremented by
     * the static synchronized method nextPoolId.
     */
    private static int poolNumberSequence;
1096 
1097     /**
1098      * Returns the next sequence number. We don't expect this to
1099      * ever contend, so use simple builtin sync.
1100      */
1101     private static final synchronized int nextPoolId() {
1102         return ++poolNumberSequence;
1103     }
1104 
    // static constants

    /**
     * Initial timeout value (in nanoseconds) for the thread
     * triggering quiescence to park waiting for new work. On timeout,
     * the thread will instead try to shrink the number of
     * workers. The value should be large enough to avoid overly
     * aggressive shrinkage during most transient stalls (long GCs
     * etc).
     */
    private static final long IDLE_TIMEOUT      = 2000L * 1000L * 1000L; // 2sec

    /**
     * Timeout value (same nanosecond scale as IDLE_TIMEOUT, i.e.
     * 200ms) used when there are more threads than the parallelism
     * level.
     */
    private static final long FAST_IDLE_TIMEOUT =  200L * 1000L * 1000L;

    /**
     * Tolerance for idle timeouts, to cope with timer undershoots.
     * Presumably nanoseconds like the timeouts above (i.e. 2ms).
     */
    private static final long TIMEOUT_SLOP = 2000000L;

    /**
     * The maximum stolen->joining link depth allowed in method
     * tryHelpStealer.  Must be a power of two.  Depths for legitimate
     * chains are unbounded, but we use a fixed constant to avoid
     * (otherwise unchecked) cycles and to bound staleness of
     * traversal parameters at the expense of sometimes blocking when
     * we could be helping.
     */
    private static final int MAX_HELP = 64;

    /**
     * Increment for seed generators. See class ThreadLocal for
     * explanation.
     */
    private static final int SEED_INCREMENT = 0x61c88647;

    /**
     * Bits and masks for control variables
     *
     * Field ctl is a long packed with (high to low bits):
     * AC: Number of active running workers minus target parallelism (16 bits)
     * TC: Number of total workers minus target parallelism (16 bits)
     * ST: true if pool is terminating (1 bit)
     * EC: the wait count of top waiting thread (15 bits)
     * ID: poolIndex of top of Treiber stack of waiters (16 bits)
     *
     * When convenient, we can extract the upper 32 bits of counts and
     * the lower 32 bits of queue state, u = (int)(ctl >>> 32) and e =
     * (int)ctl.  The ec field is never accessed alone, but always
     * together with id and st. The offsets of counts by the target
     * parallelism and the positionings of fields makes it possible to
     * perform the most common checks via sign tests of fields: When
     * ac is negative, there are not enough active workers, when tc is
     * negative, there are not enough total workers, and when e is
     * negative, the pool is terminating.  To deal with these possibly
     * negative fields, we use casts in and out of "short" and/or
     * signed shifts to maintain signedness.
     *
     * When a thread is queued (inactivated), its eventCount field is
     * set negative, which is the only way to tell if a worker is
     * prevented from executing tasks, even though it must continue to
     * scan for them to avoid queuing races. Note however that
     * eventCount updates lag releases so usage requires care.
     *
     * Field plock is an int packed with:
     * SHUTDOWN: true if shutdown is enabled (1 bit)
     * SEQ:  a sequence lock, with PL_LOCK bit set if locked (30 bits)
     * SIGNAL: set when threads may be waiting on the lock (1 bit)
     *
     * The sequence number enables simple consistency checks:
     * Staleness of read-only operations on the workQueues array can
     * be checked by comparing plock before vs after the reads.
     */

    // bit positions/shifts for fields
    private static final int  AC_SHIFT   = 48;
    private static final int  TC_SHIFT   = 32;
    private static final int  ST_SHIFT   = 31;
    private static final int  EC_SHIFT   = 16;

    // bounds
    private static final int  SMASK      = 0xffff;  // short bits
    private static final int  MAX_CAP    = 0x7fff;  // max #workers - 1
    private static final int  EVENMASK   = 0xfffe;  // even short bits
    private static final int  SQMASK     = 0x007e;  // max 64 (even) slots
    private static final int  SHORT_SIGN = 1 << 15;
    private static final int  INT_SIGN   = 1 << 31;

    // masks
    private static final long STOP_BIT   = 0x0001L << ST_SHIFT;
    private static final long AC_MASK    = ((long)SMASK) << AC_SHIFT;
    private static final long TC_MASK    = ((long)SMASK) << TC_SHIFT;

    // units for incrementing and decrementing
    private static final long TC_UNIT    = 1L << TC_SHIFT;
    private static final long AC_UNIT    = 1L << AC_SHIFT;

    // masks and units for dealing with u = (int)(ctl >>> 32)
    private static final int  UAC_SHIFT  = AC_SHIFT - 32;
    private static final int  UTC_SHIFT  = TC_SHIFT - 32;
    private static final int  UAC_MASK   = SMASK << UAC_SHIFT;
    private static final int  UTC_MASK   = SMASK << UTC_SHIFT;
    private static final int  UAC_UNIT   = 1 << UAC_SHIFT;
    private static final int  UTC_UNIT   = 1 << UTC_SHIFT;

    // masks and units for dealing with e = (int)ctl
    private static final int E_MASK      = 0x7fffffff; // no STOP_BIT
    private static final int E_SEQ       = 1 << EC_SHIFT;

    // plock bits
    private static final int SHUTDOWN    = 1 << 31;
    private static final int PL_LOCK     = 2;
    private static final int PL_SIGNAL   = 1;
    private static final int PL_SPINS    = 1 << 8;  // spins before blocking in acquirePlock

    // access mode for WorkQueue (stored in WorkQueue.mode)
    static final int LIFO_QUEUE          =  0;
    static final int FIFO_QUEUE          =  1;
    static final int SHARED_QUEUE        = -1;

    // bounds for #steps in scan loop -- must be power 2 minus 1
    private static final int MIN_SCAN    = 0x1ff;   // cover estimation slop
    private static final int MAX_SCAN    = 0x1ffff; // 4 * max workers
1230 
1231     // Instance fields
1232
1233     /*
1234      * Field layout of this class tends to matter more than one would
1235      * like. Runtime layout order is only loosely related to
1236      * declaration order and may differ across JVMs, but the following
1237      * empirically works OK on current JVMs.
1238      */
1239
1240     // Heuristic padding to ameliorate unfortunate memory placements
1241     volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06;
1242
1243     volatile long stealCount;                  // sum of nsteals collected from workers
1244     volatile long ctl;                         // main pool control: counts in the high half,
                                                    // eventCount of the latest queued idle worker low
1245     volatile int plock;                        // shutdown status and seqLock
1246     volatile int indexSeed;                    // worker/submitter index seed
1247     final int config;                          // mode (config >>> 16) and parallelism (config & SMASK)
1248     WorkQueue[] workQueues;                    // main registry; replaced/updated under plock
1249     final ForkJoinWorkerThreadFactory factory; // creates worker threads (see tryAddWorker)
1250     final UncaughtExceptionHandler ueh;        // per-worker UEH
1251     final String workerNamePrefix;             // to create worker name string
1252
         // trailing padding, same purpose as pad00..pad06
1253     volatile Object pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17;
1254     volatile Object pad18, pad19, pad1a, pad1b;
1255 
1256     /**
1257      * Acquires the plock lock to protect worker array and related
1258      * updates. This method is called only if an initial CAS on plock
1259      * fails. This acts as a spinlock for normal cases, but falls back
1260      * to builtin monitor to block when (rarely) needed. This would be
1261      * a terrible idea for a highly contended lock, but works fine as
1262      * a more conservative alternative to a pure spinlock.
          *
          * <p>The spin phase decrements its budget (initially PL_SPINS)
          * only on iterations where a random draw is non-negative, so its
          * length varies. After that, the caller sets PL_SIGNAL and blocks
          * in wait() on this pool's monitor; releasePlock performs the
          * matching notifyAll.
          *
          * @return the new plock value (the observed value plus PL_LOCK),
          *         now held by the caller; callers unlock by CASing plock
          *         from this value to the next sequence value, falling back
          *         to releasePlock if that CAS fails
1263      */
1264     private int acquirePlock() {
1265         int spins = PL_SPINS, ps, nps;
1266         for (;;) {
1267             if (((ps = plock) & PL_LOCK) == 0 &&
1268                 U.compareAndSwapInt(this, PLOCK, ps, nps = ps + PL_LOCK))
1269                 return nps;                    // acquired
1270             else if (spins >= 0) {
                     // spin phase: count down on a random subset of iterations
1271                 if (ThreadLocalRandom.nextSecondarySeed() >= 0)
1272                     --spins;
1273             }
1274             else if (U.compareAndSwapInt(this, PLOCK, ps, ps | PL_SIGNAL)) {
                     // blocking phase: PL_SIGNAL now announces a waiter
1275                 synchronized (this) {
1276                     if ((plock & PL_SIGNAL) != 0) {
1277                         try {
1278                             wait();
1279                         } catch (InterruptedException ie) {
                                 // NOTE(review): the interrupt is re-asserted, so any
                                 // later wait() in this loop throws immediately and the
                                 // method effectively spins until plock is free.
1280                             try {
1281                                 Thread.currentThread().interrupt();
1282                             } catch (SecurityException ignore) {
                                     // best effort; the status cannot be restored
1283                             }
1284                         }
1285                     }
1286                     else
1287                         notifyAll();   // signal already cleared; presumably passes the wakeup on
1288                 }
1289             }
1290         }
1291     }
1292 
1293     /**
1294      * Unlocks and signals any thread waiting for plock. Called only
1295      * when CAS of seq value for unlock fails.
          *
          * @param ps the new (unlocked) plock value to publish
1296      */
1297     private void releasePlock(int ps) {
1298         plock = ps;                              // volatile write publishes the unlock first
1299         synchronized (this) { notifyAll(); }     // then wake threads blocked in acquirePlock
1300     }
1301 
1302     /**
1303      * Tries to create and start one worker if fewer than target
1304      * parallelism level exist. Adjusts counts etc on failure.
          *
          * <p>The loop runs only while ctl shows too few active workers
          * ({@code u < 0}), too few total workers (sign bit of the low 16
          * bits of {@code u}), and no idle worker queued in the low half
          * of ctl ({@code (int)c == 0}). Both counts are incremented by CAS
          * before the thread is created; if the factory is null, returns
          * null, or anything throws, deregisterWorker undoes the counts
          * and rethrows any exception.
1305      */
1306     private void tryAddWorker() {
1307         long c; int u;
1308         while ((u = (int)((c = ctl) >>> 32)) < 0 &&
1309                (u & SHORT_SIGN) != 0 && (int)c == 0) {
                 // bump both the total and active counts; the low half stays 0
1310             long nc = (long)(((u + UTC_UNIT) & UTC_MASK) |
1311                              ((u + UAC_UNIT) & UAC_MASK)) << 32;
1312             if (U.compareAndSwapLong(this, CTL, c, nc)) {
1313                 ForkJoinWorkerThreadFactory fac;
1314                 Throwable ex = null;
1315                 ForkJoinWorkerThread wt = null;
1316                 try {
1317                     if ((fac = factory) != null &&
1318                         (wt = fac.newThread(this)) != null) {
1319                         wt.start();
1320                         break;                   // success
1321                     }
1322                 } catch (Throwable e) {
1323                     ex = e;
1324                 }
1325                 deregisterWorker(wt, ex);        // roll back counts; rethrows ex if non-null
1326                 break;
1327             }
1328         }
1329     }
1330 
1331     //  Registering and deregistering workers
1332 
1333     /**
1334      * Callback from ForkJoinWorkerThread to establish and record its
1335      * WorkQueue. To avoid scanning bias due to packing entries in
1336      * front of the workQueues array, we treat the array as a simple
1337      * power-of-two hash table using per-thread seed as hash,
1338      * expanding as needed.
          *
          * <p>Worker queues are placed only at odd indices; shared
          * submission queues use the even indices selected via SQMASK.
          * All updates to the array happen while holding plock.
1339      *
1340      * @param wt the worker thread
1341      * @return the worker's queue
1342      */
1343     final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
1344         UncaughtExceptionHandler handler; WorkQueue[] ws; int s, ps;
1345         wt.setDaemon(true);
1346         if ((handler = ueh) != null)
1347             wt.setUncaughtExceptionHandler(handler);
             // CAS-advance the shared seed until a non-zero value is claimed
1348         do {} while (!U.compareAndSwapInt(this, INDEXSEED, s = indexSeed,
1349                                           s += SEED_INCREMENT) ||
1350                      s == 0); // skip 0
1351         WorkQueue w = new WorkQueue(this, wt, config >>> 16, s); // mode from high bits of config
             // lock plock: fast-path CAS, else the spinning/blocking acquirePlock
1352         if (((ps = plock) & PL_LOCK) != 0 ||
1353             !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1354             ps = acquirePlock();
1355         int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN); // unlock value, SHUTDOWN kept
1356         try {
1357             if ((ws = workQueues) != null) {    // skip if shutting down
1358                 int n = ws.length, m = n - 1;
1359                 int r = (s << 1) | 1;           // use odd-numbered indices
1360                 if (ws[r &= m] != null) {       // collision
1361                     int probes = 0;             // step by approx half size
1362                     int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2; // even, so r stays odd
1363                     while (ws[r = (r + step) & m] != null) {
1364                         if (++probes >= n) {    // n failed probes: double the table
1365                             workQueues = ws = Arrays.copyOf(ws, n <<= 1);
1366                             m = n - 1;
1367                             probes = 0;
1368                         }
1369                     }
1370                 }
1371                 w.eventCount = w.poolIndex = r; // volatile write orders
1372                 ws[r] = w;
1373             }
1374         } finally {
1375             if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1376                 releasePlock(nps);
1377         }
             // NOTE(review): if workQueues was null, poolIndex is not assigned
             // above, so the name uses whatever value the constructor left.
1378         wt.setName(workerNamePrefix.concat(Integer.toString(w.poolIndex)));
1379         return w;
1380     }
1381 
1382     /**
1383      * Final callback from terminating worker, as well as upon failure
1384      * to construct or start a worker.  Removes record of worker from
1385      * array, and adjusts counts. If pool is shutting down, tries to
1386      * complete termination.
          *
          * <p>If tryTerminate(false, false) returns false and the worker had
          * a queue array, this also cancels the worker's remaining tasks and
          * then either wakes one queued idle worker or tries to add a
          * replacement. Finally, a non-null {@code ex} is rethrown via
          * ForkJoinTask.rethrow; otherwise stale exception records are
          * expunged.
1387      *
1388      * @param wt the worker thread, or null if construction failed
1389      * @param ex the exception causing failure, or null if none
1390      */
1391     final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
1392         WorkQueue w = null;
1393         if (wt != null && (w = wt.workQueue) != null) {
1394             int ps;
1395             w.qlock = -1;                // ensure set (runWorker loops only while qlock >= 0)
1396             long ns = w.nsteals, sc;     // collect steal count
1397             do {} while (!U.compareAndSwapLong(this, STEALCOUNT,
1398                                                sc = stealCount, sc + ns));
                 // lock plock to remove w from the registry
1399             if (((ps = plock) & PL_LOCK) != 0 ||
1400                 !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1401                 ps = acquirePlock();
1402             int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
1403             try {
1404                 int idx = w.poolIndex;
1405                 WorkQueue[] ws = workQueues;
1406                 if (ws != null && idx >= 0 && idx < ws.length && ws[idx] == w)
1407                     ws[idx] = null;
1408             } finally {
1409                 if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1410                     releasePlock(nps);
1411             }
1412         }
1413
1414         long c;                          // adjust ctl counts
             // decrement both the active and total counts, keeping all other bits
1415         do {} while (!U.compareAndSwapLong
1416                      (this, CTL, c = ctl, (((c - AC_UNIT) & AC_MASK) |
1417                                            ((c - TC_UNIT) & TC_MASK) |
1418                                            (c & ~(AC_MASK|TC_MASK)))));
1419
1420         if (!tryTerminate(false, false) && w != null && w.array != null) {
1421             w.cancelAll();               // cancel remaining tasks
1422             WorkQueue[] ws; WorkQueue v; Thread p; int u, i, e;
                 // loop while too few are active and the stop bit (sign of e) is clear
1423             while ((u = (int)((c = ctl) >>> 32)) < 0 && (e = (int)c) >= 0) {
1424                 if (e > 0) {             // activate or create replacement
1425                     if ((ws = workQueues) == null ||
1426                         (i = e & SMASK) >= ws.length ||
1427                         (v = ws[i]) == null)
1428                         break;
                         // pop v from the idle stack and count it as active again
1429                     long nc = (((long)(v.nextWait & E_MASK)) |
1430                                ((long)(u + UAC_UNIT) << 32));
1431                     if (v.eventCount != (e | INT_SIGN))
1432                         break;               // stale: v is no longer the queued top
1433                     if (U.compareAndSwapLong(this, CTL, c, nc)) {
1434                         v.eventCount = (e + E_SEQ) & E_MASK;
1435                         if ((p = v.parker) != null)
1436                             U.unpark(p);
1437                         break;
1438                     }
1439                 }
1440                 else {
1441                     if ((short)u < 0)        // too few total workers
1442                         tryAddWorker();
1443                     break;
1444                 }
1445             }
1446         }
1447         if (ex == null)                     // help clean refs on way out
1448             ForkJoinTask.helpExpungeStaleExceptions();
1449         else                                // rethrow
1450             ForkJoinTask.rethrow(ex);
1451     }
1452 
1453     // Submissions
1454 
1455     /**
1456      * Unless shutting down, adds the given task to a submission queue
1457      * at submitter's current queue index (modulo submission
1458      * range). Only the most common path is directly handled in this
1459      * method. All others are relayed to fullExternalPush.
          *
          * <p>The fast path requires all of the following:
          * <ul>
          * <li>the caller's probe is non-zero;
          * <li>plock is positive (initialized and not shut down);
          * <li>the shared queue at even slot {@code m & z & SQMASK} exists;
          * <li>its qlock is free;
          * <li>its array has room.
          * </ul>
          * A worker is signalled when the queue held at most one task
          * before this push.
1460      *
1461      * @param task the task. Caller must ensure non-null.
1462      */
1463     final void externalPush(ForkJoinTask<?> task) {
1464         WorkQueue[] ws; WorkQueue q; int z, m; ForkJoinTask<?>[] a;
1465         if ((z = ThreadLocalRandom.getProbe()) != 0 && plock > 0 &&
1466             (ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
1467             (q = ws[m & z & SQMASK]) != null &&
1468             U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock
1469             int b = q.base, s = q.top, n, an;
                 // n is the queue size after this push; it must stay below capacity
1470             if ((a = q.array) != null && (an = a.length) > (n = s + 1 - b)) {
1471                 int j = (((an - 1) & s) << ASHIFT) + ABASE;
1472                 U.putOrderedObject(a, j, task);
1473                 q.top = s + 1;                     // push on to deque
1474                 q.qlock = 0;                       // unlock
1475                 if (n <= 2)
1476                     signalWork(q);
1477                 return;
1478             }
1479             q.qlock = 0;                           // unlock; full or no array yet
1480         }
1481         fullExternalPush(task);
1482     }
1483 
1484     /**
1485      * Full version of externalPush. This method is called, among
1486      * other times, upon the first submission of the first task to the
1487      * pool, so must perform secondary initialization.  It also
1488      * detects first submission by an external thread by checking
1489      * its ThreadLocalRandom probe (zero until localInit is called),
          * and creates a new shared queue if the slot at its index is
1490      * empty; on contention it moves to another index. The plock lock body must be
1491      * exception-free (so no try/finally) so we optimistically
1492      * allocate new queues outside the lock and throw them away if
1493      * (very rarely) not needed.
1494      *
1495      * Secondary initialization occurs when plock is zero, to create
1496      * workQueue array and set plock to a valid value.  This lock body
1497      * must also be exception-free. Because the plock seq value can
1498      * eventually wrap around zero, this method harmlessly fails to
1499      * reinitialize if workQueues exists, while still advancing plock.
          *
          * @param task the task to submit
          * @throws RejectedExecutionException if plock is negative (SHUTDOWN)
1500      */
1501     private void fullExternalPush(ForkJoinTask<?> task) {
1502         int r;
1503         if ((r = ThreadLocalRandom.getProbe()) == 0) {
1504             ThreadLocalRandom.localInit();
1505             r = ThreadLocalRandom.getProbe();
1506         }
1507         for (;;) {
1508             WorkQueue[] ws; WorkQueue q; int ps, m, k;
1509             boolean move = false;
1510             if ((ps = plock) < 0)                   // SHUTDOWN bit set
1511                 throw new RejectedExecutionException();
1512             else if (ps == 0 || (ws = workQueues) == null ||
1513                      (m = ws.length - 1) < 0) { // initialize workQueues
                     // table size = 2 * (smallest power of two >= max(p, 2))
1514                 int p = config & SMASK;         // find power of two table size
1515                 int n = (p > 1) ? p - 1 : 1;    // ensure at least 2 slots
1516                 n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
1517                 n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
                     // allocate outside the lock so the locked region cannot throw
1518                 WorkQueue[] nws = ((ws = workQueues) == null || ws.length == 0 ?
1519                                    new WorkQueue[n] : null);
1520                 if (((ps = plock) & PL_LOCK) != 0 ||
1521                     !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1522                     ps = acquirePlock();
1523                 if (((ws = workQueues) == null || ws.length == 0) && nws != null)
1524                     workQueues = nws;
1525                 int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
1526                 if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1527                     releasePlock(nps);
1528             }
1529             else if ((q = ws[k = r & m & SQMASK]) != null) {
1530                 if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
1531                     ForkJoinTask<?>[] a = q.array;
1532                     int s = q.top;
1533                     boolean submitted = false;
1534                     try {                      // locked version of push
1535                         if ((a != null && a.length > s + 1 - q.base) ||
1536                             (a = q.growArray()) != null) {   // must presize
1537                             int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
1538                             U.putOrderedObject(a, j, task);
1539                             q.top = s + 1;
1540                             submitted = true;
1541                         }
1542                     } finally {
1543                         q.qlock = 0;  // unlock
1544                     }
1545                     if (submitted) {
1546                         signalWork(q);
1547                         return;
1548                     }
1549                 }
1550                 move = true; // move on failure
1551             }
1552             else if (((ps = plock) & PL_LOCK) == 0) { // create new queue
                     // NOTE(review): q.poolIndex is not assigned here, so a later
                     // signalWork(q) passes whatever the WorkQueue constructor left
                     // as its hint rather than k -- confirm this is intended.
1553                 q = new WorkQueue(this, null, SHARED_QUEUE, r);
1554                 if (((ps = plock) & PL_LOCK) != 0 ||
1555                     !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1556                     ps = acquirePlock();
                     // install only if the slot is still free; otherwise q is discarded
1557                 if ((ws = workQueues) != null && k < ws.length && ws[k] == null)
1558                     ws[k] = q;
1559                 int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
1560                 if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1561                     releasePlock(nps);
1562             }
1563             else
1564                 move = true; // move if busy
1565             if (move)
1566                 r = ThreadLocalRandom.advanceProbe(r);
1567         }
1568     }
1569 
1570     // Maintaining ctl counts
1571 
1572     /**
1573      * Increments active count; mainly called upon return from blocking.
          * Retries a CAS on ctl that adds AC_UNIT until it succeeds.
1574      */
1575     final void incrementActiveCount() {
1576         long c;
1577         do {} while (!U.compareAndSwapLong(this, CTL, c = ctl, c + AC_UNIT));
1578     }
1579 
1580     /**
1581      * Tries to create or activate a worker if too few are active.
          *
          * <p>While ctl shows too few active workers, this either pops and
          * wakes the top queued idle worker (passing q's poolIndex as its
          * hint), or, when no worker is queued and too few exist in total,
          * calls tryAddWorker. After a failed wake CAS, it stops once q
          * looks empty.
1582      *
1583      * @param q the (non-null) queue holding tasks to be signalled
1584      */
1585     final void signalWork(WorkQueue q) {
1586         int hint = q.poolIndex;
             // NOTE(review): local n is declared but never used
1587         long c; int e, u, i, n; WorkQueue[] ws; WorkQueue w; Thread p;
1588         while ((u = (int)((c = ctl) >>> 32)) < 0) {
1589             if ((e = (int)c) > 0) {                 // an idle worker is queued
1590                 if ((ws = workQueues) != null && ws.length > (i = e & SMASK) &&
1591                     (w = ws[i]) != null && w.eventCount == (e | INT_SIGN)) {
                         // pop w from the idle stack and count it as active again
1592                     long nc = (((long)(w.nextWait & E_MASK)) |
1593                                ((long)(u + UAC_UNIT) << 32));
1594                     if (U.compareAndSwapLong(this, CTL, c, nc)) {
1595                         w.hint = hint;
1596                         w.eventCount = (e + E_SEQ) & E_MASK;
1597                         if ((p = w.parker) != null)
1598                             U.unpark(p);
1599                         break;
1600                     }
1601                     if (q.top - q.base <= 0)        // lost the race and q drained
1602                         break;
1603                 }
1604                 else
1605                     break;                          // stale or inconsistent ctl
1606             }
1607             else {
1608                 if ((short)u < 0)                   // too few total workers
1609                     tryAddWorker();
1610                 break;
1611             }
1612         }
1613     }
1614 
1615     // Scanning for tasks
1616 
1617     /**
1618      * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
          * Allocates the queue array, then repeatedly passes the result of
          * scan (which may be null) to runTask until the queue's qlock
          * becomes negative.
          *
          * @param w the worker's queue
1619      */
1620     final void runWorker(WorkQueue w) {
1621         w.growArray(); // allocate queue
1622         do { w.runTask(scan(w)); } while (w.qlock >= 0);
1623     }
1624 
1625     /**
1626      * Scans for and, if found, returns one task, else possibly
1627      * inactivates the worker. This method operates on single reads of
1628      * volatile state and is designed to be re-invoked continuously,
1629      * in part because it returns upon detecting inconsistencies,
1630      * contention, or state changes that indicate possible success on
1631      * re-invocation.
1632      *
1633      * The scan searches for tasks across queues (starting at a random
1634      * index, and relying on registerWorker to irregularly scatter
1635      * them within array to avoid bias), checking each at least twice
          * as long as the table has at most (MAX_SCAN + 1) / 2 slots (the
          * number of probes is capped at MAX_SCAN + 1).
1636      * The scan terminates upon either finding a non-empty queue, or
1637      * completing the sweep. If the worker is not inactivated, it
1638      * takes and returns a task from this queue. Otherwise, if not
1639      * activated, it signals workers (that may include itself) and
1640      * returns so caller can retry. Also returns (null) if the
1641      * worker array may have changed during an empty scan.  On failure
1642      * to find a task, we take one of the following actions, after
1643      * which the caller will retry calling this method unless
1644      * terminated.
1645      *
1646      * * If pool is terminating, terminate the worker.
1647      *
1648      * * If not already enqueued, try to inactivate and enqueue the
1649      * worker on wait queue. Or, if inactivating has caused the pool
1650      * to be quiescent, relay to idleAwaitWork to possibly shrink
1651      * pool.
1652      *
1653      * * If already enqueued and none of the above apply, possibly
1654      * park awaiting signal, else lingering to help scan and signal.
1655      *
1656      * * If a non-empty queue discovered or left as a hint,
1657      * help wake up other workers before return.
1658      *
1659      * @param w the worker (via its WorkQueue)
1660      * @return a task or null if none found
1661      */
1662     private final ForkJoinTask<?> scan(WorkQueue w) {
1663         WorkQueue[] ws; int m;
1664         int ps = plock;                          // read plock before ws
1665         if (w != null && (ws = workQueues) != null && (m = ws.length - 1) >= 0) {
1666             int ec = w.eventCount;               // ec is negative if inactive
1667             int r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5;
1668             w.hint = -1;                         // update seed and clear hint
1669             int j = ((m + m + 1) | MIN_SCAN) & MAX_SCAN; // probes remaining - 1
1670             do {
1671                 WorkQueue q; ForkJoinTask<?>[] a; int b;
1672                 if ((q = ws[(r + j) & m]) != null && (b = q.base) - q.top < 0 &&
1673                     (a = q.array) != null) {     // probably nonempty
1674                     int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
1675                     ForkJoinTask<?> t = (ForkJoinTask<?>)
1676                         U.getObjectVolatile(a, i);
                         // only an active worker (ec >= 0) may take the base task
1677                     if (q.base == b && ec >= 0 && t != null &&
1678                         U.compareAndSwapObject(a, i, t, null)) {
1679                         if ((q.base = b + 1) - q.top < 0)
1680                             signalWork(q);
1681                         return t;                // taken
1682                     }
                         // inactive, or in the final m probes, with AC field <= 0
1683                     else if ((ec < 0 || j < m) && (int)(ctl >> AC_SHIFT) <= 0) {
1684                         w.hint = (r + j) & m;    // help signal below
1685                         break;                   // cannot take
1686                     }
1687                 }
1688             } while (--j >= 0);
1689
1690             int h, e, ns; long c, sc; WorkQueue q;
1691             if ((ns = w.nsteals) != 0) {
1692                 if (U.compareAndSwapLong(this, STEALCOUNT,
1693                                          sc = stealCount, sc + ns))
1694                     w.nsteals = 0;               // collect steals and rescan
1695             }
1696             else if (plock != ps)                // consistency check
1697                 ;                                // skip
1698             else if ((e = (int)(c = ctl)) < 0)
1699                 w.qlock = -1;                    // pool is terminating
1700             else {
1701                 if ((h = w.hint) < 0) {
1702                     if (ec >= 0) {               // try to enqueue/inactivate
                             // push w onto the idle stack (low half = ec) and
                             // decrement the active count; TC is unchanged
1703                         long nc = (((long)ec |
1704                                     ((c - AC_UNIT) & (AC_MASK|TC_MASK))));
1705                         w.nextWait = e;          // link and mark inactive
1706                         w.eventCount = ec | INT_SIGN;
1707                         if (ctl != c || !U.compareAndSwapLong(this, CTL, c, nc))
1708                             w.eventCount = ec;   // unmark on CAS failure
                             // presumably: AC field hit its minimum, i.e. this was the
                             // last active worker (pool may be quiescent) -- confirm
1709                         else if ((int)(c >> AC_SHIFT) == 1 - (config & SMASK))
1710                             idleAwaitWork(w, nc, c);
1711                     }
1712                     else if (w.eventCount < 0 && ctl == c) {
1713                         Thread wt = Thread.currentThread();
1714                         Thread.interrupted();    // clear status
1715                         U.putObject(wt, PARKBLOCKER, this);
1716                         w.parker = wt;           // emulate LockSupport.park
1717                         if (w.eventCount < 0)    // recheck
1718                             U.park(false, 0L);   // block
1719                         w.parker = null;
1720                         U.putObject(wt, PARKBLOCKER, null);
1721                     }
1722                 }
                     // NOTE(review): m below still comes from the earlier ws snapshot;
                     // indexing stays in bounds because the table only ever grows.
1723                 if ((h >= 0 || (h = w.hint) >= 0) &&
1724                     (ws = workQueues) != null && h < ws.length &&
1725                     (q = ws[h]) != null) {      // signal others before retry
1726                     WorkQueue v; Thread p; int u, i, s;
1727                     for (int n = (config & SMASK) - 1;;) {
                             // an active caller counts itself as one helper (assumed intent)
1728                         int idleCount = (w.eventCount < 0) ? 0 : -1;
1729                         if (((s = idleCount - q.base + q.top) <= n &&
1730                              (n = s) <= 0) ||
1731                             (u = (int)((c = ctl) >>> 32)) >= 0 ||
1732                             (e = (int)c) <= 0 || m < (i = e & SMASK) ||
1733                             (v = ws[i]) == null)
1734                             break;
                             // pop v from the idle stack and count it as active again
1735                         long nc = (((long)(v.nextWait & E_MASK)) |
1736                                    ((long)(u + UAC_UNIT) << 32));
1737                         if (v.eventCount != (e | INT_SIGN) ||
1738                             !U.compareAndSwapLong(this, CTL, c, nc))
1739                             break;
1740                         v.hint = h;
1741                         v.eventCount = (e + E_SEQ) & E_MASK;
1742                         if ((p = v.parker) != null)
1743                             U.unpark(p);
1744                         if (--n <= 0)
1745                             break;
1746                     }
1747                 }
1748             }
1749         }
1750         return null;
1751     }
1752 
1753     /**
1754      * If inactivating worker w has caused the pool to become
1755      * quiescent, checks for pool termination, and, so long as this is
1756      * not the only worker, waits for event for up to a given
1757      * duration.  On timeout, if ctl has not changed, terminates the
1758      * worker, which will in turn wake up another worker to possibly
1759      * repeat this process.
          *
          * <p>"Not the only worker" is tested as {@code (int)prevCtl != 0}:
          * another idle worker was already queued when w was enqueued.
          * Termination is done by CASing ctl back to {@code prevCtl}, which
          * removes w from the idle stack and restores the active count,
          * and then setting {@code w.qlock = -1}.
1760      *
1761      * @param w the calling worker
1762      * @param currentCtl the ctl value triggering possible quiescence
1763      * @param prevCtl the ctl value to restore if thread is terminated
1764      */
1765     private void idleAwaitWork(WorkQueue w, long currentCtl, long prevCtl) {
1766         if (w != null && w.eventCount < 0 &&
1767             !tryTerminate(false, false) && (int)prevCtl != 0 &&
1768             ctl == currentCtl) {
                 // dc is the negated TC field; presumably dc < 0 means more
                 // workers exist than the target parallelism -- confirm
1769             int dc = -(short)(currentCtl >>> TC_SHIFT);
1770             long parkTime = dc < 0 ? FAST_IDLE_TIMEOUT: (dc + 1) * IDLE_TIMEOUT;
1771             long deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
1772             Thread wt = Thread.currentThread();
1773             while (ctl == currentCtl) {
1774                 Thread.interrupted();  // timed variant of version in scan()
1775                 U.putObject(wt, PARKBLOCKER, this);
1776                 w.parker = wt;
1777                 if (ctl == currentCtl)
1778                     U.park(false, parkTime);
1779                 w.parker = null;
1780                 U.putObject(wt, PARKBLOCKER, null);
1781                 if (ctl != currentCtl)
1782                     break;                     // some other thread changed ctl
1783                 if (deadline - System.nanoTime() <= 0L &&
1784                     U.compareAndSwapLong(this, CTL, currentCtl, prevCtl)) {
                         // NOTE(review): setting every E_MASK bit presumably makes
                         // this eventCount unmatchable by any (e | INT_SIGN) check.
1785                     w.eventCount = (w.eventCount + E_SEQ) | E_MASK;
1786                     w.hint = -1;
1787                     w.qlock = -1;   // shrink
1788                     break;
1789                 }
1790             }
1791         }
1792     }
1793 
1794     /**
1795      * Scans through queues looking for work while joining a task; if
1796      * any present, signals. May return early if more signalling is
1797      * detectably unneeded.
          *
          * <p>Visits each slot once starting at {@code origin}. For each
          * non-empty queue it wakes queued idle workers, at most as many as
          * the queue's apparent size and never more than m per queue.
          * Stops entirely when the task completes, when ctl no longer shows
          * too few active workers, or when no valid queued worker remains.
1798      *
1799      * @param task the task being joined; scanning stops once its status
          *        becomes negative (done)
1800      * @param origin an index to start scan
1801      */
1802     private void helpSignal(ForkJoinTask<?> task, int origin) {
1803         WorkQueue[] ws; WorkQueue w; Thread p; long c; int m, u, e, i, s;
             // NOTE(review): (u >> UAC_SHIFT) < 0 is implied by u < 0 (arithmetic shift)
1804         if (task != null && task.status >= 0 &&
1805             (u = (int)(ctl >>> 32)) < 0 && (u >> UAC_SHIFT) < 0 &&
1806             (ws = workQueues) != null && (m = ws.length - 1) >= 0) {
1807             outer: for (int k = origin, j = m; j >= 0; --j) {
1808                 WorkQueue q = ws[k++ & m];
1809                 for (int n = m;;) { // limit to at most m signals
1810                     if (task.status < 0)
1811                         break outer;
                         // stop on this queue once it looks empty; n tracks its size
1812                     if (q == null ||
1813                         ((s = -q.base + q.top) <= n && (n = s) <= 0))
1814                         break;
1815                     if ((u = (int)((c = ctl) >>> 32)) >= 0 ||
1816                         (e = (int)c) <= 0 || m < (i = e & SMASK) ||
1817                         (w = ws[i]) == null)
1818                         break outer;
                         // pop w from the idle stack and count it as active again
1819                     long nc = (((long)(w.nextWait & E_MASK)) |
1820                                ((long)(u + UAC_UNIT) << 32));
1821                     if (w.eventCount != (e | INT_SIGN))
1822                         break outer;
1823                     if (U.compareAndSwapLong(this, CTL, c, nc)) {
1824                         w.eventCount = (e + E_SEQ) & E_MASK;
1825                         if ((p = w.parker) != null)
1826                             U.unpark(p);
1827                         if (--n <= 0)
1828                             break;
1829                     }
1830                 }
1831             }
1832         }
1833     }
1834 
1835     /**
1836      * Tries to locate and execute tasks for a stealer of the given
1837      * task, or in turn one of its stealers, Traces currentSteal ->
1838      * currentJoin links looking for a thread working on a descendant
1839      * of the given task and with a non-empty queue to steal back and
1840      * execute tasks from. The first call to this method upon a
1841      * waiting join will often entail scanning/search, (which is OK
1842      * because the joiner has nothing better to do), but this method
1843      * leaves hints in workers to speed up subsequent calls. The
1844      * implementation is very branchy to cope with potential
1845      * inconsistencies or loops encountering chains that are stale,
1846      * unknown, or so long that they are likely cyclic.
1847      *
1848      * @param joiner the joining worker
1849      * @param task the task to join
1850      * @return 0 if no progress can be made, negative if task
1851      * known complete, else positive
1852      */
    private int tryHelpStealer(WorkQueue joiner, ForkJoinTask<?> task) {
        // stat: 0 = no progress, 1 = apparent progress (helped or saw
        // work to help), negative = the task's completion status.
        // steps is never reset across restarts, so MAX_HELP bounds the
        // total number of stalled probes plus descents per call.
        int stat = 0, steps = 0;                    // bound to avoid cycles
        if (joiner != null && task != null) {       // hoist null checks
            // Every restart begins again from the joiner and the original task.
            restart: for (;;) {
                ForkJoinTask<?> subtask = task;     // current target
                // j is the queue joining subtask; each descent moves one link
                // down the chain joiner -> stealer -> stealer's stealer ...
                for (WorkQueue j = joiner, v;;) {   // v is stealer of subtask
                    WorkQueue[] ws; int m, s, h;
                    if ((s = task.status) < 0) {
                        stat = s;
                        break restart;
                    }
                    if ((ws = workQueues) == null || (m = ws.length - 1) <= 0)
                        break restart;              // shutting down
                    // Try the odd slot remembered in j.hint first; if it is not
                    // the stealer, scan the other odd slots (step 2) until we
                    // find it or wrap back to where we started.
                    if ((v = ws[h = (j.hint | 1) & m]) == null ||
                        v.currentSteal != subtask) {
                        for (int origin = h;;) {    // find stealer
                            if (((h = (h + 2) & m) & 15) == 1 &&
                                (subtask.status < 0 || j.currentJoin != subtask))
                                continue restart;   // occasional staleness check
                            if ((v = ws[h]) != null &&
                                v.currentSteal == subtask) {
                                j.hint = h;        // save hint
                                break;
                            }
                            if (h == origin)
                                break restart;      // cannot find stealer
                        }
                    }
                    for (;;) { // help stealer or descend to its stealer
                        ForkJoinTask[] a;  int b;
                        if (subtask.status < 0)     // surround probes with
                            continue restart;       //   consistency checks
                        if ((b = v.base) - v.top < 0 && (a = v.array) != null) {
                            // v has queued tasks: try to take the one at its base.
                            int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
                            ForkJoinTask<?> t =
                                (ForkJoinTask<?>)U.getObjectVolatile(a, i);
                            if (subtask.status < 0 || j.currentJoin != subtask ||
                                v.currentSteal != subtask)
                                continue restart;   // stale
                            stat = 1;               // apparent progress
                            if (t != null && v.base == b &&
                                U.compareAndSwapObject(a, i, t, null)) {
                                v.base = b + 1;     // help stealer
                                joiner.runSubtask(t);
                            }
                            // Slot was null or the CAS lost. Only count this
                            // toward MAX_HELP if v.base has not moved meanwhile.
                            else if (v.base == b && ++steps == MAX_HELP)
                                break restart;      // v apparently stalled
                        }
                        else {                      // empty -- try to descend
                            // v has nothing queued. If v is itself joining some
                            // task, repeat the search for that task's stealer.
                            ForkJoinTask<?> next = v.currentJoin;
                            if (subtask.status < 0 || j.currentJoin != subtask ||
                                v.currentSteal != subtask)
                                continue restart;   // stale
                            else if (next == null || ++steps == MAX_HELP)
                                break restart;      // dead-end or maybe cyclic
                            else {
                                subtask = next;
                                j = v;
                                break;
                            }
                        }
                    }
                }
            }
        }
        return stat;
    }
1920 
1921     /**
1922      * Analog of tryHelpStealer for CountedCompleters. Tries to steal
1923      * and run tasks within the target's computation.
1924      *
1925      * @param task the task to join
1926      * @param mode if shared, exit upon completing any task
1927      * if all workers are active

1928      */
1929     private int helpComplete(ForkJoinTask<?> task, int mode) {
1930         WorkQueue[] ws; WorkQueue q; int m, n, s, u;
1931         if (task != null && (ws = workQueues) != null &&
1932             (m = ws.length - 1) >= 0) {
1933             for (int j = 1, origin = j;;) {
1934                 if ((s = task.status) < 0)
1935                     return s;
1936                 if ((q = ws[j & m]) != null && q.pollAndExecCC(task)) {
1937                     origin = j;
1938                     if (mode == SHARED_QUEUE &&
1939                         ((u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0))
1940                         break;
1941                 }
1942                 else if ((j = (j + 2) & m) == origin)
1943                     break;
1944             }
1945         }
1946         return 0;
1947     }
1948 
    /**
     * Tries to decrement active count (sometimes implicitly) and
     * possibly release or create a compensating worker in preparation
     * for blocking. Fails on contention or termination. Otherwise,
     * adds a new thread if no idle workers are available and pool
     * may become starved.
     *
     * At most one of three alternatives is attempted per call:
     * (1) if ctl names a waiting worker whose eventCount matches, pop
     * it off the wait chain and unpark it. The AC/TC fields are left
     * unchanged, so the caller's decrement is implicit.
     * (2) else, if the TC field is non-negative and more than one
     * worker is currently active, just decrement AC.
     * (3) else, if tc + parallelism is below MAX_CAP, increment TC and
     * start a new worker from the factory; on failure the worker is
     * passed to deregisterWorker.
     *
     * @return true if the caller may proceed to block; false if a CAS
     * lost a race, the low word of ctl is negative (presumably the
     * STOP/terminating state -- confirm), workQueues is null, or no
     * compensating worker could be started
     */
    final boolean tryCompensate() {
        int pc = config & SMASK, e, i, tc; long c;
        WorkQueue[] ws; WorkQueue w; Thread p;
        if ((ws = workQueues) != null && (e = (int)(c = ctl)) >= 0) {
            // (1) release the idle worker at the head of ctl's wait chain
            if (e != 0 && (i = e & SMASK) < ws.length &&
                (w = ws[i]) != null && w.eventCount == (e | INT_SIGN)) {
                long nc = ((long)(w.nextWait & E_MASK) |
                           (c & (AC_MASK|TC_MASK)));
                if (U.compareAndSwapLong(this, CTL, c, nc)) {
                    w.eventCount = (e + E_SEQ) & E_MASK;
                    if ((p = w.parker) != null)
                        U.unpark(p);
                    return true;   // replace with idle worker
                }
            }
            // (2) enough others remain active: only decrement AC
            else if ((tc = (short)(c >>> TC_SHIFT)) >= 0 &&
                     (int)(c >> AC_SHIFT) + pc > 1) {
                long nc = ((c - AC_UNIT) & AC_MASK) | (c & ~AC_MASK);
                if (U.compareAndSwapLong(this, CTL, c, nc))
                    return true;   // no compensation
            }
            // (3) create a compensating worker (TC + 1, AC unchanged)
            else if (tc + pc < MAX_CAP) {
                long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
                if (U.compareAndSwapLong(this, CTL, c, nc)) {
                    ForkJoinWorkerThreadFactory fac;
                    Throwable ex = null;
                    ForkJoinWorkerThread wt = null;
                    try {
                        if ((fac = factory) != null &&
                            (wt = fac.newThread(this)) != null) {
                            wt.start();
                            return true;
                        }
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                    deregisterWorker(wt, ex); // clean up and return false
                }
            }
        }
        return false;
    }
1998 
    /**
     * Helps and/or blocks until the given task is done.
     *
     * Stages, in order:
     * (1) while the joiner's queue is non-empty, run local tasks via
     * {@code joiner.tryRemoveAndExec(task)};
     * (2) call helpSignal, and for CountedCompleters also helpComplete;
     * (3) loop until done. If the joiner's queue is empty, try
     * tryHelpStealer. If that makes no progress, or the queue was
     * non-empty, call helpSignal and then tryCompensate. After a
     * successful compensation, wait on the task's monitor, then
     * re-increment the active count.
     *
     * @param joiner the joining worker
     * @param task the task
     * @return task status on exit (negative once the task is done;
     * 0 if joiner or task is null)
     */
    final int awaitJoin(WorkQueue joiner, ForkJoinTask<?> task) {
        int s = 0;
        if (joiner != null && task != null && (s = task.status) >= 0) {
            // Publish what we are joining (read by tryHelpStealer via
            // currentJoin); the previous value is restored on exit.
            ForkJoinTask<?> prevJoin = joiner.currentJoin;
            joiner.currentJoin = task;
            do {} while ((s = task.status) >= 0 && !joiner.isEmpty() &&
                         joiner.tryRemoveAndExec(task)); // process local tasks
            if (s >= 0 && (s = task.status) >= 0) {
                helpSignal(task, joiner.poolIndex);
                if ((s = task.status) >= 0 &&
                    (task instanceof CountedCompleter))
                    s = helpComplete(task, LIFO_QUEUE);
            }
            while (s >= 0 && (s = task.status) >= 0) {
                if ((!joiner.isEmpty() ||           // try helping
                     (s = tryHelpStealer(joiner, task)) == 0) &&
                    (s = task.status) >= 0) {
                    // No helping progress: signal, then try to compensate
                    // for this thread blocking.
                    helpSignal(task, joiner.poolIndex);
                    if ((s = task.status) >= 0 && tryCompensate()) {
                        if (task.trySetSignal() && (s = task.status) >= 0) {
                            synchronized (task) {
                                if (task.status >= 0) {
                                    // Interrupts are deliberately ignored;
                                    // the outer loop rechecks status.
                                    try {                // see ForkJoinTask
                                        task.wait();     //  for explanation
                                    } catch (InterruptedException ie) {
                                    }
                                }
                                else
                                    task.notifyAll();
                            }
                        }
                        // Undo the (possibly implicit) decrement made by
                        // tryCompensate.
                        long c;                          // re-activate
                        do {} while (!U.compareAndSwapLong
                                     (this, CTL, c = ctl, c + AC_UNIT));
                    }
                }
            }
            joiner.currentJoin = prevJoin;
        }
        return s;
    }
2047 
    /**
     * Stripped-down variant of awaitJoin used by timed joins. Tries
     * to help join only while there is continuous progress. (Caller
     * will then enter a timed wait.) Unlike awaitJoin, this method
     * never calls tryCompensate and never blocks.
     *
     * @param joiner the joining worker
     * @param task the task
     */
    final void helpJoinOnce(WorkQueue joiner, ForkJoinTask<?> task) {
        int s;
        if (joiner != null && task != null && (s = task.status) >= 0) {
            // Publish the join target for tryHelpStealer; restored below.
            ForkJoinTask<?> prevJoin = joiner.currentJoin;
            joiner.currentJoin = task;
            do {} while ((s = task.status) >= 0 && !joiner.isEmpty() &&
                         joiner.tryRemoveAndExec(task));
            if (s >= 0 && (s = task.status) >= 0) {
                helpSignal(task, joiner.poolIndex);
                if ((s = task.status) >= 0 &&
                    (task instanceof CountedCompleter))
                    s = helpComplete(task, LIFO_QUEUE);
            }
            // Help stealers only while each attempt reports progress (> 0).
            if (s >= 0 && joiner.isEmpty()) {
                do {} while (task.status >= 0 &&
                             tryHelpStealer(joiner, task) > 0);
            }
            joiner.currentJoin = prevJoin;
        }
    }
2076 
2077     /**
2078      * Returns a (probably) non-empty steal queue, if one is found
2079      * during a scan, else null.  This method must be retried by
2080      * caller if, by the time it tries to use the queue, it is empty.

2081      * @param r a (random) seed for scanning
2082      */
2083     private WorkQueue findNonEmptyStealQueue(int r) {
2084         for (;;) {
2085             int ps = plock, m; WorkQueue[] ws; WorkQueue q;
2086             if ((ws = workQueues) != null && (m = ws.length - 1) >= 0) {
2087                 for (int j = (m + 1) << 2; j >= 0; --j) {
2088                     if ((q = ws[(((r + j) << 1) | 1) & m]) != null &&
2089                         q.base - q.top < 0)



2090                         return q;
2091                 }
2092             }
2093             if (plock == ps)
2094                 return null;

2095         }
2096     }


2097 
    /**
     * Runs tasks until {@code isQuiescent()}. We piggyback on
     * active count ctl maintenance, but rather than blocking
     * when tasks cannot be found, we rescan until all others cannot
     * find tasks either.
     *
     * Each round drains w's local tasks, then tries to steal one task.
     * When nothing can be found, the caller first drops itself from the
     * active count, and re-adds itself if it later finds work. It
     * returns once the AC field shows no other active workers; the
     * count is left as it was on entry.
     *
     * @param w the calling worker's queue
     */
    final void helpQuiescePool(WorkQueue w) {
        for (boolean active = true;;) {
            long c; WorkQueue q; ForkJoinTask<?> t; int b;
            while ((t = w.nextLocalTask()) != null) {
                if (w.base - w.top < 0)
                    signalWork(w);
                t.doExec();
            }
            if ((q = findNonEmptyStealQueue(w.nextSeed())) != null) {
                if (!active) {      // re-establish active count
                    active = true;
                    do {} while (!U.compareAndSwapLong
                                 (this, CTL, c = ctl, c + AC_UNIT));
                }
                if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) {
                    if (q.base - q.top < 0)
                        signalWork(q);
                    w.runSubtask(t);
                }
            }
            else if (active) {       // decrement active count without queuing
                long nc = (c = ctl) - AC_UNIT;
                // If we are the last active worker, the decrement would
                // reach zero: return with the count untouched instead.
                if ((int)(nc >> AC_SHIFT) + (config & SMASK) == 0)
                    return;          // bypass decrement-then-increment
                if (U.compareAndSwapLong(this, CTL, c, nc))
                    active = false;
            }
            // Inactive and no workers are active: restore our unit and quit.
            else if ((int)((c = ctl) >> AC_SHIFT) + (config & SMASK) == 0 &&
                     U.compareAndSwapLong(this, CTL, c, c + AC_UNIT))
                return;
        }
    }


2136 
2137     /**
2138      * Gets and removes a local or stolen task for the given worker.
2139      *
2140      * @return a task, if available
2141      */
2142     final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
2143         for (ForkJoinTask<?> t;;) {
2144             WorkQueue q; int b;
2145             if ((t = w.nextLocalTask()) != null)
2146                 return t;
2147             if ((q = findNonEmptyStealQueue(w.nextSeed())) == null)
2148                 return null;
2149             if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) {
2150                 if (q.base - q.top < 0)
2151                     signalWork(q);
2152                 return t;
2153             }
2154         }
2155     }
2156 
2157     /**
2158      * Returns a cheap heuristic guide for task partitioning when
2159      * programmers, frameworks, tools, or languages have little or no
2160      * idea about task granularity.  In essence by offering this
2161      * method, we ask users only about tradeoffs in overhead vs
2162      * expected throughput and its variance, rather than how finely to
2163      * partition tasks.
2164      *
2165      * In a steady state strict (tree-structured) computation, each
2166      * thread makes available for stealing enough tasks for other
2167      * threads to remain active. Inductively, if all threads play by
2168      * the same rules, each thread should make available only a
2169      * constant number of tasks.
2170      *
2171      * The minimum useful constant is just 1. But using a value of 1
2172      * would require immediate replenishment upon each steal to
2173      * maintain enough tasks, which is infeasible.  Further,
2174      * partitionings/granularities of offered tasks should minimize
2175      * steal rates, which in general means that threads nearer the top
2176      * of computation tree should generate more than those nearer the
2177      * bottom. In perfect steady state, each thread is at
2178      * approximately the same level of computation tree. However,
2179      * producing extra tasks amortizes the uncertainty of progress and
2180      * diffusion assumptions.
2181      *
2182      * So, users will want to use values larger (but not much larger)
2183      * than 1 to both smooth over transient shortages and hedge
2184      * against uneven progress; as traded off against the cost of
2185      * extra task overhead. We leave the user to pick a threshold
2186      * value to compare with the results of this call to guide
2187      * decisions, but recommend values such as 3.
2188      *
2189      * When all threads are active, it is on average OK to estimate
2190      * surplus strictly locally. In steady-state, if one thread is
2191      * maintaining say 2 surplus tasks, then so are others. So we can
2192      * just use estimated queue length.  However, this strategy alone
2193      * leads to serious mis-estimates in some non-steady-state
2194      * conditions (ramp-up, ramp-down, other stalls). We can detect
2195      * many of these by further considering the number of "idle"
2196      * threads, that are known to have zero queued tasks, so
2197      * compensate by a factor of (#idle/#active) threads.
2198      *
2199      * Note: The approximation of #busy workers as #active workers is
2200      * not very good under current signalling scheme, and should be
2201      * improved.
2202      */
2203     static int getSurplusQueuedTaskCount() {
2204         Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q;
2205         if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)) {
2206             int p = (pool = (wt = (ForkJoinWorkerThread)t).pool).config & SMASK;
2207             int n = (q = wt.workQueue).top - q.base;
2208             int a = (int)(pool.ctl >> AC_SHIFT) + p;
2209             return n - (a > (p >>>= 1) ? 0 :
2210                         a > (p >>>= 1) ? 1 :
2211                         a > (p >>>= 1) ? 2 :
2212                         a > (p >>>= 1) ? 4 :
2213                         8);
2214         }
2215         return 0;
2216     }
2217 
2218     //  Termination
2219 
    /**
     * Possibly initiates and/or completes termination.  The caller
     * triggering termination runs three passes through workQueues:
     * (0) Setting termination status, followed by wakeups of queued
     * workers; (1) cancelling all tasks; (2) interrupting lagging
     * threads (likely in external tasks, but possibly also blocked in
     * joins).  Each pass repeats previous steps because of potential
     * lagging thread creation.
     *
     * The common pool never terminates (returns false immediately).
     * If plock is non-negative, shutdown is not yet enabled: with
     * {@code enable} set, plock is locked and SHUTDOWN is set in it.
     * Termination itself is started by CASing STOP_BIT into ctl.
     *
     * @param now if true, unconditionally terminate, else only
     * if no work and no active workers
     * @param enable if true, enable shutdown when next possible
     * @return true if now terminating or terminated
     */
    private boolean tryTerminate(boolean now, boolean enable) {
        int ps;
        if (this == common)                    // cannot shut down
            return false;
        if ((ps = plock) >= 0) {                   // enable by setting plock
            if (!enable)
                return false;
            if ((ps & PL_LOCK) != 0 ||
                !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
                ps = acquirePlock();
            int nps = ((ps + PL_LOCK) & ~SHUTDOWN) | SHUTDOWN;
            if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
                releasePlock(nps);
        }
        for (long c;;) {
            if (((c = ctl) & STOP_BIT) != 0) {     // already terminating
                // TC field == -parallelism means the total worker count is 0.
                if ((short)(c >>> TC_SHIFT) == -(config & SMASK)) {
                    synchronized (this) {
                        notifyAll();               // signal when 0 workers
                    }
                }
                return true;
            }
            if (!now) {                            // check if idle & no tasks
                WorkQueue[] ws; WorkQueue w;
                // AC field == -parallelism means no workers are active.
                if ((int)(c >> AC_SHIFT) != -(config & SMASK))
                    return false;
                if ((ws = workQueues) != null) {
                    for (int i = 0; i < ws.length; ++i) {
                        if ((w = ws[i]) != null) {
                            if (!w.isEmpty()) {    // signal unprocessed tasks
                                signalWork(w);
                                return false;
                            }
                            if ((i & 1) != 0 && w.eventCount >= 0)
                                return false;      // unqueued inactive worker
                        }
                    }
                }
            }
            // On success, run the passes; the next outer iteration then
            // sees STOP_BIT and returns true. On CAS failure, retry.
            if (U.compareAndSwapLong(this, CTL, c, c | STOP_BIT)) {
                for (int pass = 0; pass < 3; ++pass) {
                    WorkQueue[] ws; WorkQueue w; Thread wt;
                    if ((ws = workQueues) != null) {
                        int n = ws.length;
                        for (int i = 0; i < n; ++i) {
                            if ((w = ws[i]) != null) {
                                w.qlock = -1;
                                if (pass > 0) {
                                    w.cancelAll();
                                    if (pass > 1 && (wt = w.owner) != null) {
                                        if (!wt.isInterrupted()) {
                                            // best-effort; failures ignored
                                            try {
                                                wt.interrupt();
                                            } catch (Throwable ignore) {
                                            }
                                        }
                                        U.unpark(wt);
                                    }
                                }
                            }
                        }
                        // Wake up workers parked on event queue
                        // (pop each waiter from ctl's wait chain, adding
                        // one active unit per release, keeping STOP_BIT).
                        int i, e; long cc; Thread p;
                        while ((e = (int)(cc = ctl) & E_MASK) != 0 &&
                               (i = e & SMASK) < n && i >= 0 &&
                               (w = ws[i]) != null) {
                            long nc = ((long)(w.nextWait & E_MASK) |
                                       ((cc + AC_UNIT) & AC_MASK) |
                                       (cc & (TC_MASK|STOP_BIT)));
                            if (w.eventCount == (e | INT_SIGN) &&
                                U.compareAndSwapLong(this, CTL, cc, nc)) {
                                w.eventCount = (e + E_SEQ) & E_MASK;
                                w.qlock = -1;
                                if ((p = w.parker) != null)
                                    U.unpark(p);
                            }
                        }
                    }
                }
            }
        }
    }
2317 
2318     // external operations on common pool
2319 
2320     /**
2321      * Returns common pool queue for a thread that has submitted at
2322      * least one task.
2323      */
2324     static WorkQueue commonSubmitterQueue() {
2325         ForkJoinPool p; WorkQueue[] ws; int m, z;
2326         return ((z = ThreadLocalRandom.getProbe()) != 0 &&
2327                 (p = common) != null &&
2328                 (ws = p.workQueues) != null &&
2329                 (m = ws.length - 1) >= 0) ?
2330             ws[m & z & SQMASK] : null;
2331     }
2332 
2333     /**
2334      * Tries to pop the given task from submitter's queue in common pool.
2335      */
2336     static boolean tryExternalUnpush(ForkJoinTask<?> t) {
2337         ForkJoinPool p; WorkQueue[] ws; WorkQueue q;
2338         ForkJoinTask<?>[] a;  int m, s, z;
2339         if (t != null &&
2340             (z = ThreadLocalRandom.getProbe()) != 0 &&
2341             (p = common) != null &&
2342             (ws = p.workQueues) != null &&
2343             (m = ws.length - 1) >= 0 &&
2344             (q = ws[m & z & SQMASK]) != null &&
2345             (s = q.top) != q.base &&
2346             (a = q.array) != null) {
2347             long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
2348             if (U.getObject(a, j) == t &&
2349                 U.compareAndSwapInt(q, QLOCK, 0, 1)) {
2350                 if (q.array == a && q.top == s && // recheck
2351                     U.compareAndSwapObject(a, j, t, null)) {
2352                     q.top = s - 1;
2353                     q.qlock = 0;
2354                     return true;
2355                 }
2356                 q.qlock = 0;
2357             }
2358         }
2359         return false;
2360     }
2361 
    /**
     * Tries to pop and run local tasks within the same computation
     * as the given root. On failure, tries to help complete from
     * other queues via helpComplete.
     *
     * A task is popped only if it sits at the top of q, is a
     * CountedCompleter, and has root somewhere on its completer chain.
     * The loop stops when root completes, or (when config != 0) when
     * ctl's upper word is non-negative, which presumably means all
     * workers are active. If nothing could be popped, the method calls
     * helpSignal and helpComplete(root, SHARED_QUEUE) once, then stops.
     *
     * @param q the submitter's queue to pop from
     * @param root the task being joined
     */
    private void externalHelpComplete(WorkQueue q, ForkJoinTask<?> root) {
        ForkJoinTask<?>[] a; int m;
        // Note: a is captured once. If q.array is later replaced, the
        // under-lock recheck fails and we fall through to helpComplete.
        if (q != null && (a = q.array) != null && (m = (a.length - 1)) >= 0 &&
            root != null && root.status >= 0) {
            for (;;) {
                int s, u; Object o; CountedCompleter<?> task = null;
                if ((s = q.top) - q.base > 0) {
                    long j = ((m & (s - 1)) << ASHIFT) + ABASE;
                    if ((o = U.getObject(a, j)) != null &&
                        (o instanceof CountedCompleter)) {
                        CountedCompleter<?> t = (CountedCompleter<?>)o, r = t;
                        // Walk t's completer chain looking for root.
                        do {
                            if (r == root) {
                                if (U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                                    if (q.array == a && q.top == s &&
                                        U.compareAndSwapObject(a, j, t, null)) {
                                        q.top = s - 1;
                                        task = t;
                                    }
                                    q.qlock = 0;
                                }
                                break;
                            }
                        } while ((r = r.completer) != null);
                    }
                }
                if (task != null)
                    task.doExec();
                if (root.status < 0 ||
                    (config != 0 &&
                     ((u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0)))
                    break;
                if (task == null) {
                    helpSignal(root, q.poolIndex);
                    if (root.status >= 0)
                        helpComplete(root, SHARED_QUEUE);
                    break;
                }
            }
        }
    }
2408 
2409     /**
2410      * Tries to help execute or signal availability of the given task
2411      * from submitter's queue in common pool.
2412      */
2413     static void externalHelpJoin(ForkJoinTask<?> t) {
2414         // Some hard-to-avoid overlap with tryExternalUnpush
2415         ForkJoinPool p; WorkQueue[] ws; WorkQueue q, w;
2416         ForkJoinTask<?>[] a;  int m, s, n, z;
2417         if (t != null &&
2418             (z = ThreadLocalRandom.getProbe()) != 0 &&
2419             (p = common) != null &&
2420             (ws = p.workQueues) != null &&
2421             (m = ws.length - 1) >= 0 &&
2422             (q = ws[m & z & SQMASK]) != null &&
2423             (a = q.array) != null) {
2424             int am = a.length - 1;
2425             if ((s = q.top) != q.base) {
2426                 long j = ((am & (s - 1)) << ASHIFT) + ABASE;
2427                 if (U.getObject(a, j) == t &&
2428                     U.compareAndSwapInt(q, QLOCK, 0, 1)) {
2429                     if (q.array == a && q.top == s &&
2430                         U.compareAndSwapObject(a, j, t, null)) {
2431                         q.top = s - 1;
2432                         q.qlock = 0;
2433                         t.doExec();
2434                     }
2435                     else
2436                         q.qlock = 0;
2437                 }
2438             }
2439             if (t.status >= 0) {
2440                 if (t instanceof CountedCompleter)
2441                     p.externalHelpComplete(q, t);
2442                 else
2443                     p.helpSignal(t, q.poolIndex);
2444             }
2445         }
2446     }
2447 












2448     // Exported methods
2449 
2450     // Constructors
2451 
2452     /**
2453      * Creates a {@code ForkJoinPool} with parallelism equal to {@link
2454      * java.lang.Runtime#availableProcessors}, using the {@linkplain
2455      * #defaultForkJoinWorkerThreadFactory default thread factory},
2456      * no UncaughtExceptionHandler, and non-async LIFO processing mode.
2457      *
2458      * @throws SecurityException if a security manager exists and
2459      *         the caller is not permitted to modify threads
2460      *         because it does not hold {@link
2461      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2462      */
2463     public ForkJoinPool() {
2464         this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
2465              defaultForkJoinWorkerThreadFactory, null, false);
2466     }
2467 
2468     /**
2469      * Creates a {@code ForkJoinPool} with the indicated parallelism
2470      * level, the {@linkplain
2471      * #defaultForkJoinWorkerThreadFactory default thread factory},
2472      * no UncaughtExceptionHandler, and non-async LIFO processing mode.
2473      *
2474      * @param parallelism the parallelism level
2475      * @throws IllegalArgumentException if parallelism less than or
2476      *         equal to zero, or greater than implementation limit
2477      * @throws SecurityException if a security manager exists and
2478      *         the caller is not permitted to modify threads
2479      *         because it does not hold {@link
2480      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2481      */
2482     public ForkJoinPool(int parallelism) {
2483         this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
2484     }
2485 
2486     /**
2487      * Creates a {@code ForkJoinPool} with the given parameters.
2488      *
2489      * @param parallelism the parallelism level. For default value,
2490      * use {@link java.lang.Runtime#availableProcessors}.
2491      * @param factory the factory for creating new threads. For default value,
2492      * use {@link #defaultForkJoinWorkerThreadFactory}.
2493      * @param handler the handler for internal worker threads that
2494      * terminate due to unrecoverable errors encountered while executing
2495      * tasks. For default value, use {@code null}.
2496      * @param asyncMode if true,
2497      * establishes local first-in-first-out scheduling mode for forked
2498      * tasks that are never joined. This mode may be more appropriate
2499      * than default locally stack-based mode in applications in which
2500      * worker threads only process event-style asynchronous tasks.
2501      * For default value, use {@code false}.
2502      * @throws IllegalArgumentException if parallelism less than or
2503      *         equal to zero, or greater than implementation limit
2504      * @throws NullPointerException if the factory is null
2505      * @throws SecurityException if a security manager exists and
2506      *         the caller is not permitted to modify threads
2507      *         because it does not hold {@link
2508      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2509      */
2510     public ForkJoinPool(int parallelism,
2511                         ForkJoinWorkerThreadFactory factory,
2512                         UncaughtExceptionHandler handler,
2513                         boolean asyncMode) {
2514         this(checkParallelism(parallelism),
2515              checkFactory(factory),
2516              handler,
2517              asyncMode,
2518              "ForkJoinPool-" + nextPoolId() + "-worker-");
2519         checkPermission();
2520     }
2521 
2522     private static int checkParallelism(int parallelism) {
2523         if (parallelism <= 0 || parallelism > MAX_CAP)
2524             throw new IllegalArgumentException();
2525         return parallelism;









2526     }
2527 
2528     private static ForkJoinWorkerThreadFactory checkFactory
2529         (ForkJoinWorkerThreadFactory factory) {
2530         if (factory == null)
2531             throw new NullPointerException();
2532         return factory;
2533     }
2534 
2535     /**
2536      * Creates a {@code ForkJoinPool} with the given parameters, without
2537      * any security checks or parameter validation.  Invoked directly by
2538      * makeCommonPool.
2539      */
2540     private ForkJoinPool(int parallelism,
2541                          ForkJoinWorkerThreadFactory factory,
2542                          UncaughtExceptionHandler handler,
2543                          boolean asyncMode,
2544                          String workerNamePrefix) {
2545         this.workerNamePrefix = workerNamePrefix;
2546         this.factory = factory;
2547         this.ueh = handler;
2548         this.config = parallelism | (asyncMode ? (FIFO_QUEUE << 16) : 0);
2549         long np = (long)(-parallelism); // offset ctl counts
2550         this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
2551     }
2552 
2553     /**
2554      * Returns the common pool instance. This pool is statically
2555      * constructed; its run state is unaffected by attempts to {@link
2556      * #shutdown} or {@link #shutdownNow}. However this pool and any
2557      * ongoing processing are automatically terminated upon program
2558      * {@link System#exit}.  Any program that relies on asynchronous
2559      * task processing to complete before program termination should
2560      * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
2561      * before exit.
2562      *
2563      * @return the common pool instance
2564      * @since 1.8
2565      */
2566     public static ForkJoinPool commonPool() {
2567         // assert common != null : "static init error";
2568         return common;
2569     }
2570 
2571     // Execution methods
2572 
2573     /**
2574      * Performs the given task, returning its result upon completion.
2575      * If the computation encounters an unchecked Exception or Error,
2576      * it is rethrown as the outcome of this invocation.  Rethrown
2577      * exceptions behave in the same way as regular exceptions, but,
2578      * when possible, contain stack traces (as displayed for example
2579      * using {@code ex.printStackTrace()}) of both the current thread
2580      * as well as the thread actually encountering the exception;
2581      * minimally only the latter.
2582      *
2583      * @param task the task
2584      * @return the task's result
2585      * @throws NullPointerException if the task is null
2586      * @throws RejectedExecutionException if the task cannot be
2587      *         scheduled for execution
2588      */
2589     public <T> T invoke(ForkJoinTask<T> task) {
2590         if (task == null)
2591             throw new NullPointerException();
2592         externalPush(task);
2593         return task.join();
2594     }
2595 
2596     /**
2597      * Arranges for (asynchronous) execution of the given task.
2598      *
2599      * @param task the task
2600      * @throws NullPointerException if the task is null
2601      * @throws RejectedExecutionException if the task cannot be
2602      *         scheduled for execution
2603      */
2604     public void execute(ForkJoinTask<?> task) {
2605         if (task == null)
2606             throw new NullPointerException();
2607         externalPush(task);
2608     }
2609 
2610     // AbstractExecutorService methods
2611 
2612     /**
2613      * @throws NullPointerException if the task is null
2614      * @throws RejectedExecutionException if the task cannot be
2615      *         scheduled for execution
2616      */
2617     public void execute(Runnable task) {
2618         if (task == null)
2619             throw new NullPointerException();
2620         ForkJoinTask<?> job;
2621         if (task instanceof ForkJoinTask<?>) // avoid re-wrap
2622             job = (ForkJoinTask<?>) task;
2623         else
2624             job = new ForkJoinTask.RunnableExecuteAction(task);
2625         externalPush(job);
2626     }
2627 
2628     /**
2629      * Submits a ForkJoinTask for execution.
2630      *
2631      * @param task the task to submit
2632      * @return the task
2633      * @throws NullPointerException if the task is null
2634      * @throws RejectedExecutionException if the task cannot be
2635      *         scheduled for execution
2636      */
2637     public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
2638         if (task == null)
2639             throw new NullPointerException();
2640         externalPush(task);
2641         return task;
2642     }
2643 
2644     /**
2645      * @throws NullPointerException if the task is null
2646      * @throws RejectedExecutionException if the task cannot be
2647      *         scheduled for execution
2648      */
2649     public <T> ForkJoinTask<T> submit(Callable<T> task) {
2650         ForkJoinTask<T> job = new ForkJoinTask.AdaptedCallable<T>(task);
2651         externalPush(job);
2652         return job;
2653     }
2654 
2655     /**
2656      * @throws NullPointerException if the task is null
2657      * @throws RejectedExecutionException if the task cannot be
2658      *         scheduled for execution
2659      */
2660     public <T> ForkJoinTask<T> submit(Runnable task, T result) {
2661         ForkJoinTask<T> job = new ForkJoinTask.AdaptedRunnable<T>(task, result);
2662         externalPush(job);
2663         return job;
2664     }
2665 
2666     /**
2667      * @throws NullPointerException if the task is null
2668      * @throws RejectedExecutionException if the task cannot be
2669      *         scheduled for execution
2670      */
2671     public ForkJoinTask<?> submit(Runnable task) {
2672         if (task == null)
2673             throw new NullPointerException();
2674         ForkJoinTask<?> job;
2675         if (task instanceof ForkJoinTask<?>) // avoid re-wrap
2676             job = (ForkJoinTask<?>) task;
2677         else
2678             job = new ForkJoinTask.AdaptedRunnableAction(task);
2679         externalPush(job);
2680         return job;
2681     }
2682 
2683     /**
2684      * @throws NullPointerException       {@inheritDoc}
2685      * @throws RejectedExecutionException {@inheritDoc}
2686      */
2687     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
2688         // In previous versions of this class, this method constructed
2689         // a task to run ForkJoinTask.invokeAll, but now external
2690         // invocation of multiple tasks is at least as efficient.
2691         ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());




2692 
2693         boolean done = false;
2694         try {
2695             for (Callable<T> t : tasks) {
2696                 ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t);
2697                 futures.add(f);
2698                 externalPush(f);

2699             }
2700             for (int i = 0, size = futures.size(); i < size; i++)
2701                 ((ForkJoinTask<?>)futures.get(i)).quietlyJoin();
2702             done = true;
2703             return futures;
2704         } finally {
2705             if (!done)
2706                 for (int i = 0, size = futures.size(); i < size; i++)
2707                     futures.get(i).cancel(false);
2708         }
2709     }
2710 
2711     /**
2712      * Returns the factory used for constructing new workers.
2713      *
2714      * @return the factory used for constructing new workers
2715      */
2716     public ForkJoinWorkerThreadFactory getFactory() {
2717         return factory;
2718     }
2719 
2720     /**
2721      * Returns the handler for internal worker threads that terminate
2722      * due to unrecoverable errors encountered while executing tasks.
2723      *
2724      * @return the handler, or {@code null} if none
2725      */
2726     public UncaughtExceptionHandler getUncaughtExceptionHandler() {
2727         return ueh;
2728     }
2729 
2730     /**
2731      * Returns the targeted parallelism level of this pool.
2732      *
2733      * @return the targeted parallelism level of this pool
2734      */
2735     public int getParallelism() {
2736         int par = (config & SMASK);
2737         return (par > 0) ? par : 1;
2738     }
2739 
2740     /**
2741      * Returns the targeted parallelism level of the common pool.
2742      *
2743      * @return the targeted parallelism level of the common pool
2744      * @since 1.8
2745      */
2746     public static int getCommonPoolParallelism() {
2747         return commonParallelism;
2748     }
2749 
2750     /**
2751      * Returns the number of worker threads that have started but not
2752      * yet terminated.  The result returned by this method may differ
2753      * from {@link #getParallelism} when threads are created to
2754      * maintain parallelism when others are cooperatively blocked.
2755      *
2756      * @return the number of worker threads
2757      */
2758     public int getPoolSize() {
2759         return (config & SMASK) + (short)(ctl >>> TC_SHIFT);
2760     }
2761 
2762     /**
2763      * Returns {@code true} if this pool uses local first-in-first-out
2764      * scheduling mode for forked tasks that are never joined.
2765      *
2766      * @return {@code true} if this pool uses async mode
2767      */
2768     public boolean getAsyncMode() {
2769         return (config >>> 16) == FIFO_QUEUE;
2770     }
2771 
2772     /**
2773      * Returns an estimate of the number of worker threads that are
2774      * not blocked waiting to join tasks or for other managed
2775      * synchronization. This method may overestimate the
2776      * number of running threads.
2777      *
2778      * @return the number of worker threads
2779      */
2780     public int getRunningThreadCount() {
2781         int rc = 0;
2782         WorkQueue[] ws; WorkQueue w;
2783         if ((ws = workQueues) != null) {
2784             for (int i = 1; i < ws.length; i += 2) {
2785                 if ((w = ws[i]) != null && w.isApparentlyUnblocked())
2786                     ++rc;
2787             }
2788         }
2789         return rc;
2790     }
2791 
2792     /**
2793      * Returns an estimate of the number of threads that are currently
2794      * stealing or executing tasks. This method may overestimate the
2795      * number of active threads.
2796      *
2797      * @return the number of active threads
2798      */
2799     public int getActiveThreadCount() {
2800         int r = (config & SMASK) + (int)(ctl >> AC_SHIFT);
2801         return (r <= 0) ? 0 : r; // suppress momentarily negative values
2802     }
2803 
2804     /**
2805      * Returns {@code true} if all worker threads are currently idle.
2806      * An idle worker is one that cannot obtain a task to execute
2807      * because none are available to steal from other threads, and
2808      * there are no pending submissions to the pool. This method is
2809      * conservative; it might not return {@code true} immediately upon
2810      * idleness of all threads, but will eventually become true if
2811      * threads remain inactive.
2812      *
2813      * @return {@code true} if all threads are currently idle
2814      */
2815     public boolean isQuiescent() {
2816         return (int)(ctl >> AC_SHIFT) + (config & SMASK) == 0;
2817     }
2818 
2819     /**
2820      * Returns an estimate of the total number of tasks stolen from
2821      * one thread's work queue by another. The reported value
2822      * underestimates the actual total number of steals when the pool
2823      * is not quiescent. This value may be useful for monitoring and
2824      * tuning fork/join programs: in general, steal counts should be
2825      * high enough to keep threads busy, but low enough to avoid
2826      * overhead and contention across threads.
2827      *
2828      * @return the number of steals
2829      */
2830     public long getStealCount() {
2831         long count = stealCount;
2832         WorkQueue[] ws; WorkQueue w;
2833         if ((ws = workQueues) != null) {
2834             for (int i = 1; i < ws.length; i += 2) {
2835                 if ((w = ws[i]) != null)
2836                     count += w.nsteals;
2837             }
2838         }
2839         return count;
2840     }
2841 
2842     /**
2843      * Returns an estimate of the total number of tasks currently held
2844      * in queues by worker threads (but not including tasks submitted
2845      * to the pool that have not begun executing). This value is only
2846      * an approximation, obtained by iterating across all threads in
2847      * the pool. This method may be useful for tuning task
2848      * granularities.
2849      *
2850      * @return the number of queued tasks
2851      */
2852     public long getQueuedTaskCount() {
2853         long count = 0;
2854         WorkQueue[] ws; WorkQueue w;
2855         if ((ws = workQueues) != null) {
2856             for (int i = 1; i < ws.length; i += 2) {
2857                 if ((w = ws[i]) != null)
2858                     count += w.queueSize();
2859             }
2860         }
2861         return count;
2862     }
2863 
2864     /**
2865      * Returns an estimate of the number of tasks submitted to this
2866      * pool that have not yet begun executing.  This method may take
2867      * time proportional to the number of submissions.
2868      *
2869      * @return the number of queued submissions
2870      */
2871     public int getQueuedSubmissionCount() {
2872         int count = 0;
2873         WorkQueue[] ws; WorkQueue w;
2874         if ((ws = workQueues) != null) {
2875             for (int i = 0; i < ws.length; i += 2) {
2876                 if ((w = ws[i]) != null)
2877                     count += w.queueSize();
2878             }
2879         }
2880         return count;
2881     }
2882 
2883     /**
2884      * Returns {@code true} if there are any tasks submitted to this
2885      * pool that have not yet begun executing.
2886      *
2887      * @return {@code true} if there are any queued submissions
2888      */
2889     public boolean hasQueuedSubmissions() {
2890         WorkQueue[] ws; WorkQueue w;
2891         if ((ws = workQueues) != null) {
2892             for (int i = 0; i < ws.length; i += 2) {
2893                 if ((w = ws[i]) != null && !w.isEmpty())
2894                     return true;
2895             }
2896         }
2897         return false;
2898     }
2899 
2900     /**
2901      * Removes and returns the next unexecuted submission if one is
2902      * available.  This method may be useful in extensions to this
2903      * class that re-assign work in systems with multiple pools.
2904      *
2905      * @return the next submission, or {@code null} if none
2906      */
2907     protected ForkJoinTask<?> pollSubmission() {
2908         WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t;
2909         if ((ws = workQueues) != null) {
2910             for (int i = 0; i < ws.length; i += 2) {
2911                 if ((w = ws[i]) != null && (t = w.poll()) != null)
2912                     return t;
2913             }
2914         }
2915         return null;
2916     }
2917 
2918     /**
2919      * Removes all available unexecuted submitted and forked tasks
2920      * from scheduling queues and adds them to the given collection,
2921      * without altering their execution status. These may include
2922      * artificially generated or wrapped tasks. This method is
2923      * designed to be invoked only when the pool is known to be
2924      * quiescent. Invocations at other times may not remove all
2925      * tasks. A failure encountered while attempting to add elements
2926      * to collection {@code c} may result in elements being in
2927      * neither, either or both collections when the associated
2928      * exception is thrown.  The behavior of this operation is
2929      * undefined if the specified collection is modified while the
2930      * operation is in progress.
2931      *
2932      * @param c the collection to transfer elements into
2933      * @return the number of elements transferred
2934      */
2935     protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
2936         int count = 0;
2937         WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t;
2938         if ((ws = workQueues) != null) {
2939             for (int i = 0; i < ws.length; ++i) {
2940                 if ((w = ws[i]) != null) {
2941                     while ((t = w.poll()) != null) {
2942                         c.add(t);
2943                         ++count;
2944                     }
2945                 }
2946             }
2947         }
2948         return count;
2949     }
2950 
2951     /**
2952      * Returns a string identifying this pool, as well as its state,
2953      * including indications of run state, parallelism level, and
2954      * worker and task counts.
2955      *
2956      * @return a string identifying this pool, as well as its state
2957      */
2958     public String toString() {
2959         // Use a single pass through workQueues to collect counts
2960         long qt = 0L, qs = 0L; int rc = 0;
2961         long st = stealCount;
2962         long c = ctl;
2963         WorkQueue[] ws; WorkQueue w;
2964         if ((ws = workQueues) != null) {
2965             for (int i = 0; i < ws.length; ++i) {
2966                 if ((w = ws[i]) != null) {
2967                     int size = w.queueSize();
2968                     if ((i & 1) == 0)
2969                         qs += size;
2970                     else {
2971                         qt += size;
2972                         st += w.nsteals;
2973                         if (w.isApparentlyUnblocked())
2974                             ++rc;
2975                     }
2976                 }
2977             }
2978         }
2979         int pc = (config & SMASK);
2980         int tc = pc + (short)(c >>> TC_SHIFT);
2981         int ac = pc + (int)(c >> AC_SHIFT);
2982         if (ac < 0) // ignore transient negative
2983             ac = 0;
2984         String level;
2985         if ((c & STOP_BIT) != 0)
2986             level = (tc == 0) ? "Terminated" : "Terminating";
2987         else
2988             level = plock < 0 ? "Shutting down" : "Running";
2989         return super.toString() +
2990             "[" + level +
2991             ", parallelism = " + pc +
2992             ", size = " + tc +
2993             ", active = " + ac +
2994             ", running = " + rc +
2995             ", steals = " + st +
2996             ", tasks = " + qt +
2997             ", submissions = " + qs +
2998             "]";
2999     }
3000 
3001     /**
3002      * Possibly initiates an orderly shutdown in which previously
3003      * submitted tasks are executed, but no new tasks will be
3004      * accepted. Invocation has no effect on execution state if this
3005      * is the {@link #commonPool()}, and no additional effect if
3006      * already shut down.  Tasks that are in the process of being
3007      * submitted concurrently during the course of this method may or
3008      * may not be rejected.
3009      *
3010      * @throws SecurityException if a security manager exists and
3011      *         the caller is not permitted to modify threads
3012      *         because it does not hold {@link
3013      *         java.lang.RuntimePermission}{@code ("modifyThread")}
3014      */
3015     public void shutdown() {
3016         checkPermission();
3017         tryTerminate(false, true);
3018     }
3019 
3020     /**
3021      * Possibly attempts to cancel and/or stop all tasks, and reject
3022      * all subsequently submitted tasks.  Invocation has no effect on
3023      * execution state if this is the {@link #commonPool()}, and no
3024      * additional effect if already shut down. Otherwise, tasks that
3025      * are in the process of being submitted or executed concurrently
3026      * during the course of this method may or may not be
3027      * rejected. This method cancels both existing and unexecuted
3028      * tasks, in order to permit termination in the presence of task
3029      * dependencies. So the method always returns an empty list
3030      * (unlike the case for some other Executors).
3031      *
3032      * @return an empty list
3033      * @throws SecurityException if a security manager exists and
3034      *         the caller is not permitted to modify threads
3035      *         because it does not hold {@link
3036      *         java.lang.RuntimePermission}{@code ("modifyThread")}
3037      */
3038     public List<Runnable> shutdownNow() {
3039         checkPermission();
3040         tryTerminate(true, true);
3041         return Collections.emptyList();
3042     }
3043 
3044     /**
3045      * Returns {@code true} if all tasks have completed following shut down.
3046      *
3047      * @return {@code true} if all tasks have completed following shut down
3048      */
3049     public boolean isTerminated() {
3050         long c = ctl;
3051         return ((c & STOP_BIT) != 0L &&
3052                 (short)(c >>> TC_SHIFT) == -(config & SMASK));
3053     }
3054 
3055     /**
3056      * Returns {@code true} if the process of termination has
3057      * commenced but not yet completed.  This method may be useful for
3058      * debugging. A return of {@code true} reported a sufficient
3059      * period after shutdown may indicate that submitted tasks have
3060      * ignored or suppressed interruption, or are waiting for I/O,
3061      * causing this executor not to properly terminate. (See the
3062      * advisory notes for class {@link ForkJoinTask} stating that
3063      * tasks should not normally entail blocking operations.  But if
3064      * they do, they must abort them on interrupt.)
3065      *
3066      * @return {@code true} if terminating but not yet terminated
3067      */
3068     public boolean isTerminating() {
3069         long c = ctl;
3070         return ((c & STOP_BIT) != 0L &&
3071                 (short)(c >>> TC_SHIFT) != -(config & SMASK));
3072     }
3073 
3074     /**
3075      * Returns {@code true} if this pool has been shut down.
3076      *
3077      * @return {@code true} if this pool has been shut down
3078      */
3079     public boolean isShutdown() {
3080         return plock < 0;
3081     }
3082 
3083     /**
3084      * Blocks until all tasks have completed execution after a
3085      * shutdown request, or the timeout occurs, or the current thread
3086      * is interrupted, whichever happens first. Because the {@link
3087      * #commonPool()} never terminates until program shutdown, when
3088      * applied to the common pool, this method is equivalent to {@link
3089      * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}.
3090      *
3091      * @param timeout the maximum time to wait
3092      * @param unit the time unit of the timeout argument
3093      * @return {@code true} if this executor terminated and
3094      *         {@code false} if the timeout elapsed before termination
3095      * @throws InterruptedException if interrupted while waiting
3096      */
3097     public boolean awaitTermination(long timeout, TimeUnit unit)
3098         throws InterruptedException {
3099         if (Thread.interrupted())
3100             throw new InterruptedException();
3101         if (this == common) {
3102             awaitQuiescence(timeout, unit);
3103             return false;
3104         }
3105         long nanos = unit.toNanos(timeout);
3106         if (isTerminated())
3107             return true;
3108         long startTime = System.nanoTime();
3109         boolean terminated = false;
3110         synchronized (this) {
3111             for (long waitTime = nanos, millis = 0L;;) {
3112                 if (terminated = isTerminated() ||
3113                     waitTime <= 0L ||
3114                     (millis = unit.toMillis(waitTime)) <= 0L)
3115                     break;
3116                 wait(millis);
3117                 waitTime = nanos - (System.nanoTime() - startTime);
3118             }
3119         }
3120         return terminated;
3121     }
3122 
3123     /**
3124      * If called by a ForkJoinTask operating in this pool, equivalent
3125      * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise,
3126      * waits and/or attempts to assist performing tasks until this
3127      * pool {@link #isQuiescent} or the indicated timeout elapses.
3128      *
3129      * @param timeout the maximum time to wait
3130      * @param unit the time unit of the timeout argument
3131      * @return {@code true} if quiescent; {@code false} if the
3132      * timeout elapsed.
3133      */
3134     public boolean awaitQuiescence(long timeout, TimeUnit unit) {
3135         long nanos = unit.toNanos(timeout);
3136         ForkJoinWorkerThread wt;
3137         Thread thread = Thread.currentThread();
3138         if ((thread instanceof ForkJoinWorkerThread) &&
3139             (wt = (ForkJoinWorkerThread)thread).pool == this) {
3140             helpQuiescePool(wt.workQueue);
3141             return true;
3142         }
3143         long startTime = System.nanoTime();
3144         WorkQueue[] ws;
3145         int r = 0, m;
3146         boolean found = true;
3147         while (!isQuiescent() && (ws = workQueues) != null &&
3148                (m = ws.length - 1) >= 0) {
3149             if (!found) {
3150                 if ((System.nanoTime() - startTime) > nanos)
3151                     return false;
3152                 Thread.yield(); // cannot block
3153             }
3154             found = false;
3155             for (int j = (m + 1) << 2; j >= 0; --j) {
3156                 ForkJoinTask<?> t; WorkQueue q; int b;
3157                 if ((q = ws[r++ & m]) != null && (b = q.base) - q.top < 0) {
3158                     found = true;
3159                     if ((t = q.pollAt(b)) != null) {
3160                         if (q.base - q.top < 0)
3161                             signalWork(q);
3162                         t.doExec();
3163                     }
3164                     break;
3165                 }
3166             }
3167         }
3168         return true;
3169     }
3170 
3171     /**
3172      * Waits and/or attempts to assist performing tasks indefinitely
3173      * until the {@link #commonPool()} {@link #isQuiescent}.
3174      */
3175     static void quiesceCommonPool() {
3176         common.awaitQuiescence(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
3177     }
3178 
3179     /**
3180      * Interface for extending managed parallelism for tasks running
3181      * in {@link ForkJoinPool}s.
3182      *
3183      * <p>A {@code ManagedBlocker} provides two methods.  Method
3184      * {@code isReleasable} must return {@code true} if blocking is
3185      * not necessary. Method {@code block} blocks the current thread
3186      * if necessary (perhaps internally invoking {@code isReleasable}
3187      * before actually blocking). These actions are performed by any
3188      * thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}.
3189      * The unusual methods in this API accommodate synchronizers that
3190      * may, but don't usually, block for long periods. Similarly, they
3191      * allow more efficient internal handling of cases in which
3192      * additional workers may be, but usually are not, needed to
3193      * ensure sufficient parallelism.  Toward this end,
3194      * implementations of method {@code isReleasable} must be amenable
3195      * to repeated invocation.
3196      *
3197      * <p>For example, here is a ManagedBlocker based on a
3198      * ReentrantLock:
3199      *  <pre> {@code
3200      * class ManagedLocker implements ManagedBlocker {
3201      *   final ReentrantLock lock;
3202      *   boolean hasLock = false;
3203      *   ManagedLocker(ReentrantLock lock) { this.lock = lock; }
3204      *   public boolean block() {
3205      *     if (!hasLock)
3206      *       lock.lock();
3207      *     return true;
3208      *   }
3209      *   public boolean isReleasable() {
3210      *     return hasLock || (hasLock = lock.tryLock());
3211      *   }
3212      * }}</pre>
3213      *
3214      * <p>Here is a class that possibly blocks waiting for an
3215      * item on a given queue:
3216      *  <pre> {@code
3217      * class QueueTaker<E> implements ManagedBlocker {
3218      *   final BlockingQueue<E> queue;
3219      *   volatile E item = null;
3220      *   QueueTaker(BlockingQueue<E> q) { this.queue = q; }
3221      *   public boolean block() throws InterruptedException {
3222      *     if (item == null)
3223      *       item = queue.take();
3224      *     return true;
3225      *   }
3226      *   public boolean isReleasable() {
3227      *     return item != null || (item = queue.poll()) != null;
3228      *   }
3229      *   public E getItem() { // call after pool.managedBlock completes
3230      *     return item;
3231      *   }
3232      * }}</pre>
3233      */
3234     public static interface ManagedBlocker {
3235         /**
3236          * Possibly blocks the current thread, for example waiting for
3237          * a lock or condition.
3238          *
3239          * @return {@code true} if no additional blocking is necessary
3240          * (i.e., if isReleasable would return true)
3241          * @throws InterruptedException if interrupted while waiting
3242          * (the method is not required to do so, but is allowed to)
3243          */
3244         boolean block() throws InterruptedException;
3245 
3246         /**
3247          * Returns {@code true} if blocking is unnecessary.
3248          * @return {@code true} if blocking is unnecessary
3249          */
3250         boolean isReleasable();
3251     }
3252 
3253     /**
3254      * Blocks in accord with the given blocker.  If the current thread
3255      * is a {@link ForkJoinWorkerThread}, this method possibly
3256      * arranges for a spare thread to be activated if necessary to
3257      * ensure sufficient parallelism while the current thread is blocked.
3258      *
3259      * <p>If the caller is not a {@link ForkJoinTask}, this method is
3260      * behaviorally equivalent to
3261      *  <pre> {@code
3262      * while (!blocker.isReleasable())
3263      *   if (blocker.block())
3264      *     return;
3265      * }</pre>
3266      *
3267      * If the caller is a {@code ForkJoinTask}, then the pool may
3268      * first be expanded to ensure parallelism, and later adjusted.
3269      *
3270      * @param blocker the blocker
3271      * @throws InterruptedException if blocker.block did so
3272      */
3273     public static void managedBlock(ManagedBlocker blocker)
3274         throws InterruptedException {
3275         Thread t = Thread.currentThread();
3276         if (t instanceof ForkJoinWorkerThread) {
3277             ForkJoinPool p = ((ForkJoinWorkerThread)t).pool;
3278             while (!blocker.isReleasable()) { // variant of helpSignal
3279                 WorkQueue[] ws; WorkQueue q; int m, u;
3280                 if ((ws = p.workQueues) != null && (m = ws.length - 1) >= 0) {
3281                     for (int i = 0; i <= m; ++i) {
3282                         if (blocker.isReleasable())
3283                             return;
3284                         if ((q = ws[i]) != null && q.base - q.top < 0) {
3285                             p.signalWork(q);
3286                             if ((u = (int)(p.ctl >>> 32)) >= 0 ||
3287                                 (u >> UAC_SHIFT) >= 0)
3288                                 break;
3289                         }
3290                     }
3291                 }
3292                 if (p.tryCompensate()) {
3293                     try {
3294                         do {} while (!blocker.isReleasable() &&
3295                                      !blocker.block());
3296                     } finally {
3297                         p.incrementActiveCount();
3298                     }
3299                     break;
3300                 }
3301             }
3302         }
3303         else {
3304             do {} while (!blocker.isReleasable() &&
3305                          !blocker.block());
3306         }
3307     }
3308 
    // AbstractExecutorService overrides.  These rely on the undocumented
    // fact that ForkJoinTask's adapter tasks (constructed directly below,
    // and of the kind returned by ForkJoinTask.adapt) also implement
    // RunnableFuture.
3312 
3313     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
3314         return new ForkJoinTask.AdaptedRunnable<T>(runnable, value);
3315     }
3316 
3317     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
3318         return new ForkJoinTask.AdaptedCallable<T>(callable);
3319     }
3320 
3321     // Unsafe mechanics
3322     private static final sun.misc.Unsafe U;
3323     private static final long CTL;
3324     private static final long PARKBLOCKER;
3325     private static final int ABASE;
3326     private static final int ASHIFT;
3327     private static final long STEALCOUNT;
3328     private static final long PLOCK;
3329     private static final long INDEXSEED;
3330     private static final long QLOCK;
3331 
    static {
        // initialize field offsets for CAS etc: Unsafe offsets of this
        // class's ctl, stealCount, plock and indexSeed fields, of
        // Thread.parkBlocker and WorkQueue.qlock, and the base offset and
        // element shift of ForkJoinTask[] arrays.
        try {
            U = sun.misc.Unsafe.getUnsafe();
            Class<?> k = ForkJoinPool.class;
            CTL = U.objectFieldOffset
                (k.getDeclaredField("ctl"));
            STEALCOUNT = U.objectFieldOffset
                (k.getDeclaredField("stealCount"));
            PLOCK = U.objectFieldOffset
                (k.getDeclaredField("plock"));
            INDEXSEED = U.objectFieldOffset
                (k.getDeclaredField("indexSeed"));
            Class<?> tk = Thread.class;
            PARKBLOCKER = U.objectFieldOffset
                (tk.getDeclaredField("parkBlocker"));
            Class<?> wk = WorkQueue.class;
            QLOCK = U.objectFieldOffset
                (wk.getDeclaredField("qlock"));
            Class<?> ak = ForkJoinTask[].class;
            ABASE = U.arrayBaseOffset(ak);
            int scale = U.arrayIndexScale(ak);
            // ASHIFT below is floor(log2(scale)), which is only an exact
            // index-to-offset shift when scale is a power of two.
            if ((scale & (scale - 1)) != 0)
                throw new Error("data type scale not a power of two");
            ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
        } catch (Exception e) {
            // Any reflective lookup failure (e.g. a missing field) is fatal.
            throw new Error(e);
        }

        // Must be assigned before makeCommonPool() runs below, because
        // makeCommonPool() reads defaultForkJoinWorkerThreadFactory.
        defaultForkJoinWorkerThreadFactory =
            new DefaultForkJoinWorkerThreadFactory();
        modifyThreadPermission = new RuntimePermission("modifyThread");

        // Build the common pool inside doPrivileged.  NOTE(review):
        // presumably so that makeCommonPool's system-property reads and
        // class loading are not restricted by the permissions of whatever
        // code triggered this class's initialization.
        common = java.security.AccessController.doPrivileged
            (new java.security.PrivilegedAction<ForkJoinPool>() {
                public ForkJoinPool run() { return makeCommonPool(); }});
        int par = common.config; // report 1 even if threads disabled
        commonParallelism = par > 0 ? par : 1;
    }
3371 
3372     /**
3373      * Creates and returns the common pool, respecting user settings
3374      * specified via system properties.
3375      */
3376     private static ForkJoinPool makeCommonPool() {
3377         int parallelism = -1;
3378         ForkJoinWorkerThreadFactory factory
3379             = defaultForkJoinWorkerThreadFactory;
3380         UncaughtExceptionHandler handler = null;
3381         try {  // ignore exceptions in accesing/parsing properties
3382             String pp = System.getProperty
3383                 ("java.util.concurrent.ForkJoinPool.common.parallelism");


3384             String fp = System.getProperty
3385                 ("java.util.concurrent.ForkJoinPool.common.threadFactory");
3386             String hp = System.getProperty
3387                 ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
3388             if (pp != null)
3389                 parallelism = Integer.parseInt(pp);
3390             if (fp != null)
3391                 factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
3392                            getSystemClassLoader().loadClass(fp).newInstance());
3393             if (hp != null)
3394                 handler = ((UncaughtExceptionHandler)ClassLoader.
3395                            getSystemClassLoader().loadClass(hp).newInstance());


3396         } catch (Exception ignore) {
3397         }
3398 
3399         if (parallelism < 0)
3400             parallelism = Runtime.getRuntime().availableProcessors();
3401         if (parallelism > MAX_CAP)
3402             parallelism = MAX_CAP;
3403         return new ForkJoinPool(parallelism, factory, handler, false,
3404                                 "ForkJoinPool.commonPool-worker-");



3405     }
3406 
3407 }
--- EOF ---