1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.util.ArrayList;
  39 import java.util.Arrays;
  40 import java.util.Collection;
  41 import java.util.Collections;
  42 import java.util.List;
  43 import java.util.Random;
  44 import java.util.concurrent.AbstractExecutorService;
  45 import java.util.concurrent.Callable;
  46 import java.util.concurrent.ExecutorService;
  47 import java.util.concurrent.Future;
  48 import java.util.concurrent.RejectedExecutionException;
  49 import java.util.concurrent.RunnableFuture;
  50 import java.util.concurrent.TimeUnit;
  51 import java.util.concurrent.TimeoutException;
  52 import java.util.concurrent.atomic.AtomicInteger;
  53 import java.util.concurrent.locks.LockSupport;
  54 import java.util.concurrent.locks.ReentrantLock;
  55 import java.util.concurrent.locks.Condition;
  56 
  57 /**
  58  * An {@link ExecutorService} for running {@link ForkJoinTask}s.
  59  * A {@code ForkJoinPool} provides the entry point for submissions
  60  * from non-{@code ForkJoinTask} clients, as well as management and
  61  * monitoring operations.
  62  *
  63  * <p>A {@code ForkJoinPool} differs from other kinds of {@link
  64  * ExecutorService} mainly by virtue of employing
  65  * <em>work-stealing</em>: all threads in the pool attempt to find and
  66  * execute subtasks created by other active tasks (eventually blocking
  67  * waiting for work if none exist). This enables efficient processing
  68  * when most tasks spawn other subtasks (as do most {@code
  69  * ForkJoinTask}s). When setting <em>asyncMode</em> to true in
  70  * constructors, {@code ForkJoinPool}s may also be appropriate for use
  71  * with event-style tasks that are never joined.
  72  *
  73  * <p>A {@code ForkJoinPool} is constructed with a given target
  74  * parallelism level; by default, equal to the number of available
  75  * processors. The pool attempts to maintain enough active (or
  76  * available) threads by dynamically adding, suspending, or resuming
  77  * internal worker threads, even if some tasks are stalled waiting to
  78  * join others. However, no such adjustments are guaranteed in the
  79  * face of blocked IO or other unmanaged synchronization. The nested
  80  * {@link ManagedBlocker} interface enables extension of the kinds of
  81  * synchronization accommodated.
  82  *
  83  * <p>In addition to execution and lifecycle control methods, this
  84  * class provides status check methods (for example
  85  * {@link #getStealCount}) that are intended to aid in developing,
  86  * tuning, and monitoring fork/join applications. Also, method
  87  * {@link #toString} returns indications of pool state in a
  88  * convenient form for informal monitoring.
  89  *
  90  * <p> As is the case with other ExecutorServices, there are three
  91  * main task execution methods summarized in the following
  92  * table. These are designed to be used by clients not already engaged
  93  * in fork/join computations in the current pool.  The main forms of
  94  * these methods accept instances of {@code ForkJoinTask}, but
  95  * overloaded forms also allow mixed execution of plain {@code
  96  * Runnable}- or {@code Callable}- based activities as well.  However,
  97  * tasks that are already executing in a pool should normally
  98  * <em>NOT</em> use these pool execution methods, but instead use the
  99  * within-computation forms listed in the table.
 100  *
 101  * <table BORDER CELLPADDING=3 CELLSPACING=1>
 102  *  <tr>
 103  *    <td></td>
 104  *    <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
 105  *    <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
 106  *  </tr>
 107  *  <tr>
  108  *    <td> <b>Arrange async execution</b></td>
 109  *    <td> {@link #execute(ForkJoinTask)}</td>
 110  *    <td> {@link ForkJoinTask#fork}</td>
 111  *  </tr>
 112  *  <tr>
  113  *    <td> <b>Await and obtain result</b></td>
 114  *    <td> {@link #invoke(ForkJoinTask)}</td>
 115  *    <td> {@link ForkJoinTask#invoke}</td>
 116  *  </tr>
 117  *  <tr>
  118  *    <td> <b>Arrange exec and obtain Future</b></td>
 119  *    <td> {@link #submit(ForkJoinTask)}</td>
 120  *    <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
 121  *  </tr>
 122  * </table>
 123  *
 124  * <p><b>Sample Usage.</b> Normally a single {@code ForkJoinPool} is
 125  * used for all parallel task execution in a program or subsystem.
 126  * Otherwise, use would not usually outweigh the construction and
 127  * bookkeeping overhead of creating a large set of threads. For
 128  * example, a common pool could be used for the {@code SortTasks}
 129  * illustrated in {@link RecursiveAction}. Because {@code
 130  * ForkJoinPool} uses threads in {@linkplain java.lang.Thread#isDaemon
 131  * daemon} mode, there is typically no need to explicitly {@link
 132  * #shutdown} such a pool upon program exit.
 133  *
 134  * <pre>
 135  * static final ForkJoinPool mainPool = new ForkJoinPool();
 136  * ...
 137  * public void sort(long[] array) {
 138  *   mainPool.invoke(new SortTask(array, 0, array.length));
 139  * }
 140  * </pre>
 141  *
 142  * <p><b>Implementation notes</b>: This implementation restricts the
 143  * maximum number of running threads to 32767. Attempts to create
 144  * pools with greater than the maximum number result in
 145  * {@code IllegalArgumentException}.
 146  *
 147  * <p>This implementation rejects submitted tasks (that is, by throwing
 148  * {@link RejectedExecutionException}) only when the pool is shut down
 149  * or internal resources have been exhausted.
 150  *
 151  * @since 1.7
 152  * @author Doug Lea
 153  */
 154 public class ForkJoinPool extends AbstractExecutorService {
 155 
 156     /*
 157      * Implementation Overview
 158      *
 159      * This class provides the central bookkeeping and control for a
 160      * set of worker threads: Submissions from non-FJ threads enter
 161      * into a submission queue. Workers take these tasks and typically
 162      * split them into subtasks that may be stolen by other workers.
 163      * Preference rules give first priority to processing tasks from
 164      * their own queues (LIFO or FIFO, depending on mode), then to
 165      * randomized FIFO steals of tasks in other worker queues, and
 166      * lastly to new submissions.
 167      *
 168      * The main throughput advantages of work-stealing stem from
 169      * decentralized control -- workers mostly take tasks from
 170      * themselves or each other. We cannot negate this in the
 171      * implementation of other management responsibilities. The main
 172      * tactic for avoiding bottlenecks is packing nearly all
 173      * essentially atomic control state into a single 64bit volatile
 174      * variable ("ctl"). This variable is read on the order of 10-100
 175      * times as often as it is modified (always via CAS). (There is
 176      * some additional control state, for example variable "shutdown"
 177      * for which we can cope with uncoordinated updates.)  This
 178      * streamlines synchronization and control at the expense of messy
 179      * constructions needed to repack status bits upon updates.
 180      * Updates tend not to contend with each other except during
 181      * bursts while submitted tasks begin or end.  In some cases when
 182      * they do contend, threads can instead do something else
 183      * (usually, scan for tasks) until contention subsides.
 184      *
 185      * To enable packing, we restrict maximum parallelism to (1<<15)-1
 186      * (which is far in excess of normal operating range) to allow
 187      * ids, counts, and their negations (used for thresholding) to fit
 188      * into 16bit fields.
 189      *
 190      * Recording Workers.  Workers are recorded in the "workers" array
 191      * that is created upon pool construction and expanded if (rarely)
 192      * necessary.  This is an array as opposed to some other data
 193      * structure to support index-based random steals by workers.
 194      * Updates to the array recording new workers and unrecording
 195      * terminated ones are protected from each other by a seqLock
 196      * (scanGuard) but the array is otherwise concurrently readable,
 197      * and accessed directly by workers. To simplify index-based
 198      * operations, the array size is always a power of two, and all
 199      * readers must tolerate null slots. To avoid flailing during
 200      * start-up, the array is presized to hold twice #parallelism
 201      * workers (which is unlikely to need further resizing during
 202      * execution). But to avoid dealing with so many null slots,
 203      * variable scanGuard includes a mask for the nearest power of two
 204      * that contains all current workers.  All worker thread creation
 205      * is on-demand, triggered by task submissions, replacement of
 206      * terminated workers, and/or compensation for blocked
 207      * workers. However, all other support code is set up to work with
 208      * other policies.  To ensure that we do not hold on to worker
 209      * references that would prevent GC, ALL accesses to workers are
 210      * via indices into the workers array (which is one source of some
 211      * of the messy code constructions here). In essence, the workers
 212      * array serves as a weak reference mechanism. Thus for example
 213      * the wait queue field of ctl stores worker indices, not worker
 214      * references.  Access to the workers in associated methods (for
 215      * example signalWork) must both index-check and null-check the
 216      * IDs. All such accesses ignore bad IDs by returning out early
 217      * from what they are doing, since this can only be associated
 218      * with termination, in which case it is OK to give up.
 219      *
 220      * All uses of the workers array, as well as queue arrays, check
 221      * that the array is non-null (even if previously non-null). This
 222      * allows nulling during termination, which is currently not
 223      * necessary, but remains an option for resource-revocation-based
 224      * shutdown schemes.
 225      *
 226      * Wait Queuing. Unlike HPC work-stealing frameworks, we cannot
 227      * let workers spin indefinitely scanning for tasks when none can
 228      * be found immediately, and we cannot start/resume workers unless
 229      * there appear to be tasks available.  On the other hand, we must
 230      * quickly prod them into action when new tasks are submitted or
 231      * generated.  We park/unpark workers after placing in an event
 232      * wait queue when they cannot find work. This "queue" is actually
 233      * a simple Treiber stack, headed by the "id" field of ctl, plus a
 234      * 15bit counter value to both wake up waiters (by advancing their
 235      * count) and avoid ABA effects. Successors are held in worker
 236      * field "nextWait".  Queuing deals with several intrinsic races,
 237      * mainly that a task-producing thread can miss seeing (and
 238      * signalling) another thread that gave up looking for work but
 239      * has not yet entered the wait queue. We solve this by requiring
 240      * a full sweep of all workers both before (in scan()) and after
 241      * (in tryAwaitWork()) a newly waiting worker is added to the wait
 242      * queue. During a rescan, the worker might release some other
 243      * queued worker rather than itself, which has the same net
 244      * effect. Because enqueued workers may actually be rescanning
 245      * rather than waiting, we set and clear the "parked" field of
 246      * ForkJoinWorkerThread to reduce unnecessary calls to unpark.
 247      * (Use of the parked field requires a secondary recheck to avoid
 248      * missed signals.)
 249      *
 250      * Signalling.  We create or wake up workers only when there
 251      * appears to be at least one task they might be able to find and
 252      * execute.  When a submission is added or another worker adds a
 253      * task to a queue that previously had two or fewer tasks, they
 254      * signal waiting workers (or trigger creation of new ones if
 255      * fewer than the given parallelism level -- see signalWork).
 256      * These primary signals are buttressed by signals during rescans
 257      * as well as those performed when a worker steals a task and
 258      * notices that there are more tasks too; together these cover the
 259      * signals needed in cases when more than two tasks are pushed
 260      * but untaken.
 261      *
 262      * Trimming workers. To release resources after periods of lack of
 263      * use, a worker starting to wait when the pool is quiescent will
 264      * time out and terminate if the pool has remained quiescent for
 265      * SHRINK_RATE nanosecs. This will slowly propagate, eventually
 266      * terminating all workers after long periods of non-use.
 267      *
 268      * Submissions. External submissions are maintained in an
 269      * array-based queue that is structured identically to
 270      * ForkJoinWorkerThread queues except for the use of
 271      * submissionLock in method addSubmission. Unlike the case for
 272      * worker queues, multiple external threads can add new
 273      * submissions, so adding requires a lock.
 274      *
 275      * Compensation. Beyond work-stealing support and lifecycle
 276      * control, the main responsibility of this framework is to take
 277      * actions when one worker is waiting to join a task stolen (or
 278      * always held by) another.  Because we are multiplexing many
 279      * tasks on to a pool of workers, we can't just let them block (as
 280      * in Thread.join).  We also cannot just reassign the joiner's
 281      * run-time stack with another and replace it later, which would
 282      * be a form of "continuation", that even if possible is not
 283      * necessarily a good idea since we sometimes need both an
 284      * unblocked task and its continuation to progress. Instead we
 285      * combine two tactics:
 286      *
 287      *   Helping: Arranging for the joiner to execute some task that it
 288      *      would be running if the steal had not occurred.  Method
 289      *      ForkJoinWorkerThread.joinTask tracks joining->stealing
 290      *      links to try to find such a task.
 291      *
 292      *   Compensating: Unless there are already enough live threads,
 293      *      method tryPreBlock() may create or re-activate a spare
 294      *      thread to compensate for blocked joiners until they
 295      *      unblock.
 296      *
 297      * The ManagedBlocker extension API can't use helping so relies
 298      * only on compensation in method awaitBlocker.
 299      *
 300      * It is impossible to keep exactly the target parallelism number
 301      * of threads running at any given time.  Determining the
 302      * existence of conservatively safe helping targets, the
 303      * availability of already-created spares, and the apparent need
 304      * to create new spares are all racy and require heuristic
 305      * guidance, so we rely on multiple retries of each.  Currently,
 306      * in keeping with on-demand signalling policy, we compensate only
 307      * if blocking would leave less than one active (non-waiting,
 308      * non-blocked) worker. Additionally, to avoid some false alarms
 309      * due to GC, lagging counters, system activity, etc, compensated
 310      * blocking for joins is only attempted after rechecks stabilize
 311      * (retries are interspersed with Thread.yield, for good
 312      * citizenship).  The variable blockedCount, incremented before
 313      * blocking and decremented after, is sometimes needed to
 314      * distinguish cases of waiting for work vs blocking on joins or
 315      * other managed sync. Both cases are equivalent for most pool
 316      * control, so we can update non-atomically. (Additionally,
 317      * contention on blockedCount alleviates some contention on ctl).
 318      *
 319      * Shutdown and Termination. A call to shutdownNow atomically sets
  320      * the ctl stop bit and then (non-atomically) sets each worker's
 321      * "terminate" status, cancels all unprocessed tasks, and wakes up
 322      * all waiting workers.  Detecting whether termination should
 323      * commence after a non-abrupt shutdown() call requires more work
  324      * and bookkeeping. We need consensus about quiescence (i.e., that
 325      * there is no more work) which is reflected in active counts so
 326      * long as there are no current blockers, as well as possible
 327      * re-evaluations during independent changes in blocking or
 328      * quiescing workers.
 329      *
 330      * Style notes: There is a lot of representation-level coupling
 331      * among classes ForkJoinPool, ForkJoinWorkerThread, and
 332      * ForkJoinTask.  Most fields of ForkJoinWorkerThread maintain
 333      * data structures managed by ForkJoinPool, so are directly
 334      * accessed.  Conversely we allow access to "workers" array by
 335      * workers, and direct access to ForkJoinTask.status by both
 336      * ForkJoinPool and ForkJoinWorkerThread.  There is little point
 337      * trying to reduce this, since any associated future changes in
 338      * representations will need to be accompanied by algorithmic
 339      * changes anyway. All together, these low-level implementation
 340      * choices produce as much as a factor of 4 performance
 341      * improvement compared to naive implementations, and enable the
 342      * processing of billions of tasks per second, at the expense of
 343      * some ugliness.
 344      *
 345      * Methods signalWork() and scan() are the main bottlenecks so are
 346      * especially heavily micro-optimized/mangled.  There are lots of
 347      * inline assignments (of form "while ((local = field) != 0)")
 348      * which are usually the simplest way to ensure the required read
 349      * orderings (which are sometimes critical). This leads to a
 350      * "C"-like style of listing declarations of these locals at the
 351      * heads of methods or blocks.  There are several occurrences of
 352      * the unusual "do {} while (!cas...)"  which is the simplest way
 353      * to force an update of a CAS'ed variable. There are also other
 354      * coding oddities that help some methods perform reasonably even
 355      * when interpreted (not compiled).
 356      *
 357      * The order of declarations in this file is: (1) declarations of
 358      * statics (2) fields (along with constants used when unpacking
 359      * some of them), listed in an order that tends to reduce
 360      * contention among them a bit under most JVMs.  (3) internal
 361      * control methods (4) callbacks and other support for
 362      * ForkJoinTask and ForkJoinWorkerThread classes, (5) exported
 363      * methods (plus a few little helpers). (6) static block
 364      * initializing all statics in a minimally dependent order.
 365      */
 366 
 367     /**
 368      * Factory for creating new {@link ForkJoinWorkerThread}s.
 369      * A {@code ForkJoinWorkerThreadFactory} must be defined and used
 370      * for {@code ForkJoinWorkerThread} subclasses that extend base
 371      * functionality or initialize threads with different contexts.
 372      */
 373     public static interface ForkJoinWorkerThreadFactory {
 374         /**
 375          * Returns a new worker thread operating in the given pool.
 376          *
 377          * @param pool the pool this thread works in
 378          * @throws NullPointerException if the pool is null
 379          */
 380         public ForkJoinWorkerThread newThread(ForkJoinPool pool);
 381     }
 382 
 383     /**
 384      * Default ForkJoinWorkerThreadFactory implementation; creates a
 385      * new ForkJoinWorkerThread.
 386      */
 387     static class DefaultForkJoinWorkerThreadFactory
 388         implements ForkJoinWorkerThreadFactory {
 389         public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
 390             return new ForkJoinWorkerThread(pool);
 391         }
 392     }
 393 
  394     /**
  395      * Default factory for creating new {@code ForkJoinWorkerThread}s.
  396      * This factory is used unless overridden in {@code ForkJoinPool}
  397      * constructors.
  398     */ public static final ForkJoinWorkerThreadFactory
  399         defaultForkJoinWorkerThreadFactory;
  400 
  401     /**
  402      * Permission required for callers of methods that may start or
  403      * kill threads; this is the permission checkPermission() verifies.
  404      */
  405     private static final RuntimePermission modifyThreadPermission;
 406 
 407     /**
 408      * If there is a security manager, makes sure caller has
 409      * permission to modify threads.
 410      */
 411     private static void checkPermission() {
 412         SecurityManager security = System.getSecurityManager();
 413         if (security != null)
 414             security.checkPermission(modifyThreadPermission);
 415     }
 416 
  417     /**
  418      * Generator for assigning sequence numbers as pool names.
  419      */
  420     private static final AtomicInteger poolNumberGenerator;
  421 
  422     /**
  423      * Generator for initial random seeds for worker victim
  424      * selection. This is used only to create initial seeds. Random
  425      * steals use a cheaper xorshift generator per steal attempt. We
  426      * don't expect much contention on workerSeedGenerator, so just
  427      * use a plain Random.
  428      */
  429     static final Random workerSeedGenerator;
 430 
  431     /**
  432      * Array holding all worker threads in the pool.  Initialized upon
  433      * construction. Array size must be a power of two.  Updates and
  434      * replacements are protected by scanGuard, but the array is
  435      * always kept in a consistent enough state to be randomly
  436      * accessed without locking by workers performing work-stealing,
  437      * as well as other traversal-based methods in this class, so long
  438      * as readers memory-acquire by first reading ctl. All readers
  439      * must tolerate that some array slots may be null.
  440      */
  441     ForkJoinWorkerThread[] workers;
  442 
  443     /**
  444      * Initial size for submission queue array. Must be a power of
  445      * two.  In many applications, the queue always stays small so we
  446      * use a small initial cap.
  447      */
  448     private static final int INITIAL_QUEUE_CAPACITY = 8;
  449 
  450     /**
  451      * Maximum size for submission queue array. Must be a power of two
  452      * less than or equal to {@code 1 << (31 - width of array entry)}
  453      * to ensure lack of index wraparound, but is capped at a lower
  454      * value to help users trap runaway computations.
  455      */
  456     private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 24; // 16M
  457 
  458     /**
  459      * Array serving as submission queue. Initialized upon construction.
  460      */
  461     private ForkJoinTask<?>[] submissionQueue;
  462 
  463     /**
  464      * Lock protecting the submission queue array in addSubmission.
  465      */
  466     private final ReentrantLock submissionLock;
  467 
  468     /**
  469      * Condition for awaitTermination, using submissionLock for
  470      * convenience.
  471      */
  472     private final Condition termination;
  473 
  474     /**
  475      * Creation factory for worker threads.
  476      */
  477     private final ForkJoinWorkerThreadFactory factory;
  478 
  479     /**
  480      * The uncaught exception handler used when any worker abruptly
  481      * terminates.
  482      */
  483     final Thread.UncaughtExceptionHandler ueh;
  484 
  485     /**
  486      * Prefix for assigning names to worker threads.
  487      */
  488     private final String workerNamePrefix;
  489 
  490     /**
  491      * Sum of per-thread steal counts, updated only when threads are
  492      * idle or terminating.
  493      */
  494     private volatile long stealCount;
 495 
  496     /**
  497      * Main pool control -- a long packed with:
  498      * AC: Number of active running workers minus target parallelism (16 bits)
  499      * TC: Number of total workers minus target parallelism (16 bits)
  500      * ST: true if pool is terminating (1 bit)
  501      * EC: the wait count of top waiting thread (15 bits)
  502      * ID: ~poolIndex of top of Treiber stack of waiting threads (16 bits)
  503      *
  504      * When convenient, we can extract the upper 32 bits of counts and
  505      * the lower 32 bits of queue state, u = (int)(ctl >>> 32) and e =
  506      * (int)ctl.  The ec field is never accessed alone, but always
  507      * together with id and st. The offsets of counts by the target
  508      * parallelism and the positionings of fields make it possible to
  509      * perform the most common checks via sign tests of fields: When
  510      * ac is negative, there are not enough active workers, when tc is
  511      * negative, there are not enough total workers, when id is
  512      * negative, there is at least one waiting worker, and when e is
  513      * negative, the pool is terminating.  To deal with these possibly
  514      * negative fields, we use casts in and out of "short" and/or
  515      * signed shifts to maintain signedness.
  516      */
  517     volatile long ctl;
  518 
  519     // bit positions/shifts for the fields packed into ctl
  520     private static final int  AC_SHIFT   = 48;
  521     private static final int  TC_SHIFT   = 32;
  522     private static final int  ST_SHIFT   = 31;
  523     private static final int  EC_SHIFT   = 16;
  524 
  525     // bounds
  526     private static final int  MAX_ID     = 0x7fff;  // max poolIndex
  527     private static final int  SMASK      = 0xffff;  // mask short bits
  528     private static final int  SHORT_SIGN = 1 << 15;
  529     private static final int  INT_SIGN   = 1 << 31;
  530 
  531     // masks
  532     private static final long STOP_BIT   = 0x0001L << ST_SHIFT;
  533     private static final long AC_MASK    = ((long)SMASK) << AC_SHIFT;
  534     private static final long TC_MASK    = ((long)SMASK) << TC_SHIFT;
  535 
  536     // units for incrementing and decrementing
  537     private static final long TC_UNIT    = 1L << TC_SHIFT;
  538     private static final long AC_UNIT    = 1L << AC_SHIFT;
  539 
  540     // masks and units for dealing with u = (int)(ctl >>> 32)
  541     private static final int  UAC_SHIFT  = AC_SHIFT - 32;
  542     private static final int  UTC_SHIFT  = TC_SHIFT - 32;
  543     private static final int  UAC_MASK   = SMASK << UAC_SHIFT;
  544     private static final int  UTC_MASK   = SMASK << UTC_SHIFT;
  545     private static final int  UAC_UNIT   = 1 << UAC_SHIFT;
  546     private static final int  UTC_UNIT   = 1 << UTC_SHIFT;
  547 
  548     // masks and units for dealing with e = (int)ctl
  549     private static final int  E_MASK     = 0x7fffffff; // no STOP_BIT
  550     private static final int  EC_UNIT    = 1 << EC_SHIFT;
 551 
  552     /**
  553      * The target parallelism level.
  554      */
  555     final int parallelism;
  556 
  557     /**
  558      * Index (mod submission queue length) of next element to take
  559      * from submission queue. Usage is identical to that for
  560      * per-worker queues -- see ForkJoinWorkerThread internal
  561      * documentation.
  562      */
  563     volatile int queueBase;
  564 
  565     /**
  566      * Index (mod submission queue length) of next element to add
  567      * in submission queue. Usage is identical to that for
  568      * per-worker queues -- see ForkJoinWorkerThread internal
  569      * documentation.
  570      */
  571     int queueTop;
  572 
  573     /**
  574      * True when shutdown() has been called.
  575      */
  576     volatile boolean shutdown;
  577 
  578     /**
  579      * True if local polling uses FIFO rather than the default LIFO
  580      * order. Read by, and replicated by, ForkJoinWorkerThreads.
  581      */
  582     final boolean locallyFifo;
  583 
  584     /**
  585      * The number of threads in ForkJoinWorkerThreads.helpQuiescePool.
  586      * When non-zero, suppresses automatic shutdown when active
  587      * counts become zero.
  588      */
  589     volatile int quiescerCount;
  590 
  591     /**
  592      * The number of threads blocked in join.
  593      */
  594     volatile int blockedCount;
  595 
  596     /**
  597      * Counter for worker Thread names (unrelated to their poolIndex).
  598      */
  599     private volatile int nextWorkerNumber;
  600 
  601     /**
  602      * The index for the next created worker. Accessed under scanGuard.
  603      */
  604     private int nextWorkerIndex;
  605 
  606     /**
  607      * SeqLock and index masking for updates to workers array.  Locked
  608      * when the SG_UNIT bit is set. Unlocking clears the bit by adding
  609      * SG_UNIT. Staleness of read-only operations can be checked by
  610      * comparing scanGuard to its value before the reads. The low 16
  611      * bits (i.e., anding with SMASK) hold the smallest power of two
  612      * covering all worker indices, minus one; this mask is used to
  613      * avoid dealing with large numbers of null slots when the workers
  614      * array is overallocated.
  615      */
  616     volatile int scanGuard;
  617 
  618     private static final int SG_UNIT = 1 << 16; // lock bit of scanGuard
  619 
  620     /**
  621      * The wakeup interval (in nanoseconds) for a worker waiting for a
  622      * task when the pool is quiescent to instead try to shrink the
  623      * number of workers.  The exact value does not matter too
  624      * much. It must be short enough to release resources during
  625      * sustained periods of idleness, but not so short that threads
  626      * are continually re-created.
  627      */
  628     private static final long SHRINK_RATE =
  629         4L * 1000L * 1000L * 1000L; // 4 seconds, expressed in nanoseconds
 630 
 631     /**
 632      * Top-level loop for worker threads: On each step: if the
 633      * previous step swept through all queues and found no tasks, or
 634      * there are excess threads, then possibly blocks. Otherwise,
 635      * scans for and, if found, executes a task. Returns when pool
 636      * and/or worker terminate.
 637      *
 638      * @param w the worker
 639      */
 640     final void work(ForkJoinWorkerThread w) {
 641         boolean swept = false;                // true on empty scans
 642         long c;
 643         while (!w.terminate && (int)(c = ctl) >= 0) {
 644             int a;                            // active count
 645             if (!swept && (a = (int)(c >> AC_SHIFT)) <= 0)
 646                 swept = scan(w, a);
 647             else if (tryAwaitWork(w, c))
 648                 swept = false;
 649         }
 650     }
 651 
 652     // Signalling
 653 
 654     /**
 655      * Wakes up or creates a worker.
 656      */
    final void signalWork() {
        /*
         * The while condition is true if: (there are too few total
         * workers OR there is at least one waiter) AND (there are too
         * few active workers OR the pool is terminating).  The value
         * of e distinguishes the remaining cases: zero (no waiters)
         * for create, negative if terminating (in which case do
         * nothing), else release a waiter. The secondary checks for
         * release (non-null array etc) can fail if the pool begins
         * terminating after the test, and don't impose any added cost
         * because JVMs must perform null and bounds checks anyway.
         */
        // c is a snapshot of ctl; e is its low 32 bits, u its high 32 bits.
        long c; int e, u;
        while ((((e = (int)(c = ctl)) | (u = (int)(c >>> 32))) &
                (INT_SIGN|SHORT_SIGN)) == (INT_SIGN|SHORT_SIGN) && e >= 0) {
            if (e > 0) {                         // release a waiting worker
                int i; ForkJoinWorkerThread w; ForkJoinWorkerThread[] ws;
                // The waiter's slot index is recovered from the
                // complemented low bits of e; give up if it cannot be found.
                if ((ws = workers) == null ||
                    (i = ~e & SMASK) >= ws.length ||
                    (w = ws[i]) == null)
                    break;
                // New ctl: low word becomes the waiter's recorded
                // successor (nextWait); high word is u advanced by UAC_UNIT.
                long nc = (((long)(w.nextWait & E_MASK)) |
                           ((long)(u + UAC_UNIT) << 32));
                // Only proceed if w is still waiting on e and ctl is unchanged;
                // otherwise loop and re-read ctl.
                if (w.eventCount == e &&
                    UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
                    w.eventCount = (e + EC_UNIT) & E_MASK;
                    if (w.parked)
                        UNSAFE.unpark(w);
                    break;
                }
            }
            // e == 0: no waiters. Advance both count fields in the high
            // word (low word of the new ctl is zero) and create a worker.
            else if (UNSAFE.compareAndSwapLong
                     (this, ctlOffset, c,
                      (long)(((u + UTC_UNIT) & UTC_MASK) |
                             ((u + UAC_UNIT) & UAC_MASK)) << 32)) {
                addWorker();
                break;
            }
        }
    }
 697 
 698     /**
 699      * Variant of signalWork to help release waiters on rescans.
 700      * Tries once to release a waiter if active count < 0.
 701      *
 702      * @return false if failed due to contention, else true
 703      */
 704     private boolean tryReleaseWaiter() {
 705         long c; int e, i; ForkJoinWorkerThread w; ForkJoinWorkerThread[] ws;
 706         if ((e = (int)(c = ctl)) > 0 &&
 707             (int)(c >> AC_SHIFT) < 0 &&
 708             (ws = workers) != null &&
 709             (i = ~e & SMASK) < ws.length &&
 710             (w = ws[i]) != null) {
 711             long nc = ((long)(w.nextWait & E_MASK) |
 712                        ((c + AC_UNIT) & (AC_MASK|TC_MASK)));
 713             if (w.eventCount != e ||
 714                 !UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc))
 715                 return false;
 716             w.eventCount = (e + EC_UNIT) & E_MASK;
 717             if (w.parked)
 718                 UNSAFE.unpark(w);
 719         }
 720         return true;
 721     }
 722 
 723     // Scanning for tasks
 724 
 725     /**
 726      * Scans for and, if found, executes one task. Scans start at a
 727      * random index of workers array, and randomly select the first
 728      * (2*#workers)-1 probes, and then, if all empty, resort to 2
     * circular sweeps, which are necessary to check quiescence, and
     * takes a submission only if no stealable tasks were found.  The
     * steal code inside the loop is a specialized form of
     * ForkJoinWorkerThread.deqTask, followed by bookkeeping to support
 733      * helpJoinTask and signal propagation. The code for submission
 734      * queues is almost identical. On each steal, the worker completes
 735      * not only the task, but also all local tasks that this task may
 736      * have generated. On detecting staleness or contention when
 737      * trying to take a task, this method returns without finishing
 738      * sweep, which allows global state rechecks before retry.
 739      *
 740      * @param w the worker
 741      * @param a the number of active workers
 742      * @return true if swept all queues without finding a task
 743      */
    private boolean scan(ForkJoinWorkerThread w, int a) {
        int g = scanGuard; // mask 0 avoids useless scans if only one active
        int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK;
        ForkJoinWorkerThread[] ws = workers;
        if (ws == null || ws.length <= m)         // staleness check
            return false;
        // Probe phase. While j < 0 the next index k is re-drawn with an
        // xorshift step on each miss; from j == 0 on, k simply advances
        // by one per miss. Every index is masked with m.
        for (int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) {
            ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
            ForkJoinWorkerThread v = ws[k & m];
            // Candidate victim: non-null worker whose queue looks non-empty.
            if (v != null && (b = v.queueBase) != v.queueTop &&
                (q = v.queue) != null && (i = (q.length - 1) & b) >= 0) {
                long u = (i << ASHIFT) + ABASE;   // raw offset of slot i
                // Claim the task at the base: the slot must be non-null,
                // the base unchanged, and the CAS to null must succeed.
                if ((t = q[i]) != null && v.queueBase == b &&
                    UNSAFE.compareAndSwapObject(q, u, t, null)) {
                    int d = (v.queueBase = b + 1) - v.queueTop;
                    v.stealHint = w.poolIndex;
                    if (d != 0)
                        signalWork();             // propagate if nonempty
                    w.execTask(t);
                }
                // Whether or not the claim succeeded, advance the seed and
                // return so the caller rechecks global state before retrying.
                r ^= r << 13; r ^= r >>> 17; w.seed = r ^ (r << 5);
                return false;                     // store next seed
            }
            else if (j < 0) {                     // xorshift
                r ^= r << 13; r ^= r >>> 17; k = r ^= r << 5;
            }
            else
                ++k;
        }
        if (scanGuard != g)                       // staleness check
            return false;
        else {                                    // try to take submission
            // Same claim protocol as above, applied to the pool's own
            // submissionQueue / queueBase / queueTop.
            ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
            if ((b = queueBase) != queueTop &&
                (q = submissionQueue) != null &&
                (i = (q.length - 1) & b) >= 0) {
                long u = (i << ASHIFT) + ABASE;
                if ((t = q[i]) != null && queueBase == b &&
                    UNSAFE.compareAndSwapObject(q, u, t, null)) {
                    queueBase = b + 1;
                    w.execTask(t);
                }
                return false;
            }
            return true;                         // all queues empty
        }
    }
 791 
 792     /**
 793      * Tries to enqueue worker w in wait queue and await change in
 794      * worker's eventCount.  If the pool is quiescent and there is
 795      * more than one worker, possibly terminates worker upon exit.
 796      * Otherwise, before blocking, rescans queues to avoid missed
 797      * signals.  Upon finding work, releases at least one worker
 798      * (which may be the current worker). Rescans restart upon
 799      * detected staleness or failure to release due to
 800      * contention. Note the unusual conventions about Thread.interrupt
 801      * here and elsewhere: Because interrupts are used solely to alert
 802      * threads to check termination, which is checked here anyway, we
 803      * clear status (using Thread.interrupted) before any call to
 804      * park, so that park does not immediately return due to status
 805      * being set via some other unrelated call to interrupt in user
 806      * code.
 807      *
 808      * @param w the calling worker
 809      * @param c the ctl value on entry
 810      * @return true if waited or another thread was released upon enq
 811      */
    private boolean tryAwaitWork(ForkJoinWorkerThread w, long c) {
        int v = w.eventCount;                     // value whose change we await
        w.nextWait = (int)c;                      // w's successor record
        // Proposed ctl: low word becomes v (masked), and the count fields
        // are taken from c with one active-count unit removed.
        long nc = (long)(v & E_MASK) | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
        if (ctl != c || !UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
            long d = ctl; // return true if lost to a deq, to force scan
            return (int)d != (int)c && ((d - c) & AC_MASK) >= 0L;
        }
        // Now enqueued. Fold this worker's private steal count into the
        // pool-wide total, abandoning the attempt if w gets released meanwhile.
        for (int sc = w.stealCount; sc != 0;) {   // accumulate stealCount
            long s = stealCount;
            if (UNSAFE.compareAndSwapLong(this, stealCountOffset, s, s + sc))
                sc = w.stealCount = 0;
            else if (w.eventCount != v)
                return true;                      // update next time
        }
        // If termination did not proceed, there was already a waiter on
        // entry ((int)c != 0), the active count in nc cancels parallelism,
        // and nobody is blocked or quiescing, take the timed idle path.
        if ((!shutdown || !tryTerminate(false)) &&
            (int)c != 0 && parallelism + (int)(nc >> AC_SHIFT) == 0 &&
            blockedCount == 0 && quiescerCount == 0)
            idleAwaitWork(w, nc, c, v);           // quiescent
        // Alternate between one clean rescan of all queues and parking,
        // until this worker's eventCount changes.
        for (boolean rescanned = false;;) {
            if (w.eventCount != v)
                return true;
            if (!rescanned) {
                int g = scanGuard, m = g & SMASK;
                ForkJoinWorkerThread[] ws = workers;
                if (ws != null && m < ws.length) {
                    rescanned = true;
                    for (int i = 0; i <= m; ++i) {
                        ForkJoinWorkerThread u = ws[i];
                        if (u != null) {
                            // A non-empty queue means someone should be
                            // awake; a failed release forces another rescan.
                            if (u.queueBase != u.queueTop &&
                                !tryReleaseWaiter())
                                rescanned = false; // contended
                            if (w.eventCount != v)
                                return true;
                        }
                    }
                }
                if (scanGuard != g ||              // stale
                    (queueBase != queueTop && !tryReleaseWaiter()))
                    rescanned = false;
                if (!rescanned)
                    Thread.yield();                // reduce contention
                else
                    Thread.interrupted();          // clear before park
            }
            else {
                // Publish parked before the final recheck so a releaser
                // that changes eventCount afterwards sees it and unparks.
                w.parked = true;                   // must recheck
                if (w.eventCount != v) {
                    w.parked = false;
                    return true;
                }
                LockSupport.park(this);
                rescanned = w.parked = false;      // rescan again after any wakeup
            }
        }
    }
 869 
 870     /**
 871      * If inactivating worker w has caused pool to become
 872      * quiescent, check for pool termination, and wait for event
 873      * for up to SHRINK_RATE nanosecs (rescans are unnecessary in
 874      * this case because quiescence reflects consensus about lack
 875      * of work). On timeout, if ctl has not changed, terminate the
 876      * worker. Upon its termination (see deregisterWorker), it may
 877      * wake up another worker to possibly repeat this process.
 878      *
 879      * @param w the calling worker
 880      * @param currentCtl the ctl value after enqueuing w
 881      * @param prevCtl the ctl value if w terminated
 882      * @param v the eventCount w awaits change
 883      */
 884     private void idleAwaitWork(ForkJoinWorkerThread w, long currentCtl,
 885                                long prevCtl, int v) {
 886         if (w.eventCount == v) {
 887             if (shutdown)
 888                 tryTerminate(false);
 889             ForkJoinTask.helpExpungeStaleExceptions(); // help clean weak refs
 890             while (ctl == currentCtl) {
 891                 long startTime = System.nanoTime();
 892                 w.parked = true;
 893                 if (w.eventCount == v)             // must recheck
 894                     LockSupport.parkNanos(this, SHRINK_RATE);
 895                 w.parked = false;
 896                 if (w.eventCount != v)
 897                     break;
 898                 else if (System.nanoTime() - startTime <
 899                          SHRINK_RATE - (SHRINK_RATE / 10)) // timing slop
 900                     Thread.interrupted();          // spurious wakeup
 901                 else if (UNSAFE.compareAndSwapLong(this, ctlOffset,
 902                                                    currentCtl, prevCtl)) {
 903                     w.terminate = true;            // restore previous
 904                     w.eventCount = ((int)currentCtl + EC_UNIT) & E_MASK;
 905                     break;
 906                 }
 907             }
 908         }
 909     }
 910 
 911     // Submissions
 912 
 913     /**
 914      * Enqueues the given task in the submissionQueue.  Same idea as
 915      * ForkJoinWorkerThread.pushTask except for use of submissionLock.
 916      *
 917      * @param t the task
 918      */
 919     private void addSubmission(ForkJoinTask<?> t) {
 920         final ReentrantLock lock = this.submissionLock;
 921         lock.lock();
 922         try {
 923             ForkJoinTask<?>[] q; int s, m;
 924             if ((q = submissionQueue) != null) {    // ignore if queue removed
 925                 long u = (((s = queueTop) & (m = q.length-1)) << ASHIFT)+ABASE;
 926                 UNSAFE.putOrderedObject(q, u, t);
 927                 queueTop = s + 1;
 928                 if (s - queueBase == m)
 929                     growSubmissionQueue();
 930             }
 931         } finally {
 932             lock.unlock();
 933         }
 934         signalWork();
 935     }
 936 
 937     //  (pollSubmission is defined below with exported methods)
 938 
 939     /**
 940      * Creates or doubles submissionQueue array.
 941      * Basically identical to ForkJoinWorkerThread version.
 942      */
 943     private void growSubmissionQueue() {
 944         ForkJoinTask<?>[] oldQ = submissionQueue;
 945         int size = oldQ != null ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY;
 946         if (size > MAXIMUM_QUEUE_CAPACITY)
 947             throw new RejectedExecutionException("Queue capacity exceeded");
 948         if (size < INITIAL_QUEUE_CAPACITY)
 949             size = INITIAL_QUEUE_CAPACITY;
 950         ForkJoinTask<?>[] q = submissionQueue = new ForkJoinTask<?>[size];
 951         int mask = size - 1;
 952         int top = queueTop;
 953         int oldMask;
 954         if (oldQ != null && (oldMask = oldQ.length - 1) >= 0) {
 955             for (int b = queueBase; b != top; ++b) {
 956                 long u = ((b & oldMask) << ASHIFT) + ABASE;
 957                 Object x = UNSAFE.getObjectVolatile(oldQ, u);
 958                 if (x != null && UNSAFE.compareAndSwapObject(oldQ, u, x, null))
 959                     UNSAFE.putObjectVolatile
 960                         (q, ((b & mask) << ASHIFT) + ABASE, x);
 961             }
 962         }
 963     }
 964 
 965     // Blocking support
 966 
 967     /**
 968      * Tries to increment blockedCount, decrement active count
 969      * (sometimes implicitly) and possibly release or create a
 970      * compensating worker in preparation for blocking. Fails
 971      * on contention or termination.
 972      *
 973      * @return true if the caller can block, else should recheck and retry
 974      */
 975     private boolean tryPreBlock() {
 976         int b = blockedCount;
 977         if (UNSAFE.compareAndSwapInt(this, blockedCountOffset, b, b + 1)) {
 978             int pc = parallelism;
 979             do {
 980                 ForkJoinWorkerThread[] ws; ForkJoinWorkerThread w;
 981                 int e, ac, tc, rc, i;
 982                 long c = ctl;
 983                 int u = (int)(c >>> 32);
 984                 if ((e = (int)c) < 0) {
 985                                                  // skip -- terminating
 986                 }
 987                 else if ((ac = (u >> UAC_SHIFT)) <= 0 && e != 0 &&
 988                          (ws = workers) != null &&
 989                          (i = ~e & SMASK) < ws.length &&
 990                          (w = ws[i]) != null) {
 991                     long nc = ((long)(w.nextWait & E_MASK) |
 992                                (c & (AC_MASK|TC_MASK)));
 993                     if (w.eventCount == e &&
 994                         UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
 995                         w.eventCount = (e + EC_UNIT) & E_MASK;
 996                         if (w.parked)
 997                             UNSAFE.unpark(w);
 998                         return true;             // release an idle worker
 999                     }
1000                 }
1001                 else if ((tc = (short)(u >>> UTC_SHIFT)) >= 0 && ac + pc > 1) {
1002                     long nc = ((c - AC_UNIT) & AC_MASK) | (c & ~AC_MASK);
1003                     if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc))
1004                         return true;             // no compensation needed
1005                 }
1006                 else if (tc + pc < MAX_ID) {
1007                     long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
1008                     if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
1009                         addWorker();
1010                         return true;            // create a replacement
1011                     }
1012                 }
1013                 // try to back out on any failure and let caller retry
1014             } while (!UNSAFE.compareAndSwapInt(this, blockedCountOffset,
1015                                                b = blockedCount, b - 1));
1016         }
1017         return false;
1018     }
1019 
1020     /**
1021      * Decrements blockedCount and increments active count
1022      */
1023     private void postBlock() {
1024         long c;
1025         do {} while (!UNSAFE.compareAndSwapLong(this, ctlOffset,  // no mask
1026                                                 c = ctl, c + AC_UNIT));
1027         int b;
1028         do {} while (!UNSAFE.compareAndSwapInt(this, blockedCountOffset,
1029                                                b = blockedCount, b - 1));
1030     }
1031 
1032     /**
1033      * Possibly blocks waiting for the given task to complete, or
1034      * cancels the task if terminating.  Fails to wait if contended.
1035      *
1036      * @param joinMe the task
1037      */
1038     final void tryAwaitJoin(ForkJoinTask<?> joinMe) {
1039         int s;
1040         Thread.interrupted(); // clear interrupts before checking termination
1041         if (joinMe.status >= 0) {
1042             if (tryPreBlock()) {
1043                 joinMe.tryAwaitDone(0L);
1044                 postBlock();
1045             }
1046             else if ((ctl & STOP_BIT) != 0L)
1047                 joinMe.cancelIgnoringExceptions();
1048         }
1049     }
1050 
1051     /**
1052      * Possibly blocks the given worker waiting for joinMe to
1053      * complete or timeout
1054      *
1055      * @param joinMe the task
     * @param nanos the wait time in nanoseconds (converted to
     *        milliseconds for each underlying timed wait)
1057      */
1058     final void timedAwaitJoin(ForkJoinTask<?> joinMe, long nanos) {
1059         while (joinMe.status >= 0) {
1060             Thread.interrupted();
1061             if ((ctl & STOP_BIT) != 0L) {
1062                 joinMe.cancelIgnoringExceptions();
1063                 break;
1064             }
1065             if (tryPreBlock()) {
1066                 long last = System.nanoTime();
1067                 while (joinMe.status >= 0) {
1068                     long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
1069                     if (millis <= 0)
1070                         break;
1071                     joinMe.tryAwaitDone(millis);
1072                     if (joinMe.status < 0)
1073                         break;
1074                     if ((ctl & STOP_BIT) != 0L) {
1075                         joinMe.cancelIgnoringExceptions();
1076                         break;
1077                     }
1078                     long now = System.nanoTime();
1079                     nanos -= now - last;
1080                     last = now;
1081                 }
1082                 postBlock();
1083                 break;
1084             }
1085         }
1086     }
1087 
1088     /**
1089      * If necessary, compensates for blocker, and blocks
1090      */
1091     private void awaitBlocker(ManagedBlocker blocker)
1092         throws InterruptedException {
1093         while (!blocker.isReleasable()) {
1094             if (tryPreBlock()) {
1095                 try {
1096                     do {} while (!blocker.isReleasable() && !blocker.block());
1097                 } finally {
1098                     postBlock();
1099                 }
1100                 break;
1101             }
1102         }
1103     }
1104 
    // Creating, registering and deregistering workers
1106 
1107     /**
1108      * Tries to create and start a worker; minimally rolls back counts
1109      * on failure.
1110      */
1111     private void addWorker() {
1112         Throwable ex = null;
1113         ForkJoinWorkerThread t = null;
1114         try {
1115             t = factory.newThread(this);
1116         } catch (Throwable e) {
1117             ex = e;
1118         }
1119         if (t == null) {  // null or exceptional factory return
1120             long c;       // adjust counts
1121             do {} while (!UNSAFE.compareAndSwapLong
1122                          (this, ctlOffset, c = ctl,
1123                           (((c - AC_UNIT) & AC_MASK) |
1124                            ((c - TC_UNIT) & TC_MASK) |
1125                            (c & ~(AC_MASK|TC_MASK)))));
1126             // Propagate exception if originating from an external caller
1127             if (!tryTerminate(false) && ex != null &&
1128                 !(Thread.currentThread() instanceof ForkJoinWorkerThread))
1129                 UNSAFE.throwException(ex);
1130         }
1131         else
1132             t.start();
1133     }
1134 
1135     /**
1136      * Callback from ForkJoinWorkerThread constructor to assign a
1137      * public name
1138      */
1139     final String nextWorkerName() {
1140         for (int n;;) {
1141             if (UNSAFE.compareAndSwapInt(this, nextWorkerNumberOffset,
1142                                          n = nextWorkerNumber, ++n))
1143                 return workerNamePrefix + n;
1144         }
1145     }
1146 
1147     /**
1148      * Callback from ForkJoinWorkerThread constructor to
1149      * determine its poolIndex and record in workers array.
1150      *
1151      * @param w the worker
1152      * @return the worker's pool index
1153      */
    final int registerWorker(ForkJoinWorkerThread w) {
        /*
         * In the typical case, a new worker acquires the lock, uses
         * next available index and returns quickly.  Since we should
         * not block callers (ultimately from signalWork or
         * tryPreBlock) waiting for the lock needed to do this, we
         * instead help release other workers while waiting for the
         * lock.
         */
        for (int g;;) {
            ForkJoinWorkerThread[] ws;
            // Acquire: the SG_UNIT bit must be clear, and the CAS sets it.
            if (((g = scanGuard) & SG_UNIT) == 0 &&
                UNSAFE.compareAndSwapInt(this, scanGuardOffset,
                                         g, g | SG_UNIT)) {
                int k = nextWorkerIndex;
                try {
                    if ((ws = workers) != null) { // ignore on shutdown
                        int n = ws.length;
                        // If the hinted slot is unusable, search for the
                        // first free slot; double the array if there is none.
                        if (k < 0 || k >= n || ws[k] != null) {
                            for (k = 0; k < n && ws[k] != null; ++k)
                                ;
                            if (k == n)
                                ws = workers = Arrays.copyOf(ws, n << 1);
                        }
                        ws[k] = w;
                        nextWorkerIndex = k + 1;
                        int m = g & SMASK;
                        // Index beyond the current mask: install the next
                        // larger mask. Otherwise keep the mask and advance g
                        // (still the pre-lock value) by two SG_UNITs.
                        g = (k > m) ? ((m << 1) + 1) & SMASK : g + (SG_UNIT<<1);
                    }
                } finally {
                    // Release: g never has the SG_UNIT bit set here (it is
                    // either the pre-lock value or one computed above).
                    scanGuard = g;
                }
                return k;
            }
            else if ((ws = workers) != null) { // help release others
                // Lock is held by someone else: do something useful by
                // releasing one waiter if any worker has queued tasks.
                for (ForkJoinWorkerThread u : ws) {
                    if (u != null && u.queueBase != u.queueTop) {
                        if (tryReleaseWaiter())
                            break;
                    }
                }
            }
        }
    }
1198 
1199     /**
1200      * Final callback from terminating worker.  Removes record of
1201      * worker from array, and adjusts counts. If pool is shutting
1202      * down, tries to complete termination.
1203      *
1204      * @param w the worker
1205      */
    final void deregisterWorker(ForkJoinWorkerThread w, Throwable ex) {
        int idx = w.poolIndex;
        int sc = w.stealCount;
        // Progress marker: 0 = still in workers array, 1 = removed from
        // array, 2 = ctl counts adjusted.
        int steps = 0;
        // Remove from array, adjust worker counts and collect steal count.
        // We can intermix failed removes or adjusts with steal updates
        do {
            long s, c;
            int g;
            // Step 0 -> 1: take the scanGuard lock (note g is updated to
            // the locked value by the |= inside the CAS arguments).
            if (steps == 0 && ((g = scanGuard) & SG_UNIT) == 0 &&
                UNSAFE.compareAndSwapInt(this, scanGuardOffset,
                                         g, g |= SG_UNIT)) {
                ForkJoinWorkerThread[] ws = workers;
                if (ws != null && idx >= 0 &&
                    idx < ws.length && ws[idx] == w)
                    ws[idx] = null;    // verify
                nextWorkerIndex = idx;             // freed slot is the next hint
                scanGuard = g + SG_UNIT;           // adding to the locked value clears the bit
                steps = 1;
            }
            // Step 1 -> 2: remove one active-count unit and one
            // total-count unit from ctl, leaving other bits unchanged.
            if (steps == 1 &&
                UNSAFE.compareAndSwapLong(this, ctlOffset, c = ctl,
                                          (((c - AC_UNIT) & AC_MASK) |
                                           ((c - TC_UNIT) & TC_MASK) |
                                           (c & ~(AC_MASK|TC_MASK)))))
                steps = 2;
            // Independently, fold this worker's steals into the pool total.
            if (sc != 0 &&
                UNSAFE.compareAndSwapLong(this, stealCountOffset,
                                          s = stealCount, s + sc))
                sc = 0;
        } while (steps != 2 || sc != 0);
        if (!tryTerminate(false)) {
            if (ex != null)   // possibly replace if died abnormally
                signalWork();
            else
                tryReleaseWaiter();
        }
    }
1244 
1245     // Shutdown and termination
1246 
1247     /**
1248      * Possibly initiates and/or completes termination.
1249      *
1250      * @param now if true, unconditionally terminate, else only
1251      * if shutdown and empty queue and no active workers
1252      * @return true if now terminating or terminated
1253      */
    private boolean tryTerminate(boolean now) {
        long c;
        // Loop until STOP_BIT is observed in ctl (set by us or by another
        // thread), or until the conditions for termination fail.
        while (((c = ctl) & STOP_BIT) == 0) {
            if (!now) {
                // Active-count field must equal -parallelism to proceed.
                if ((int)(c >> AC_SHIFT) != -parallelism)
                    return false;
                if (!shutdown || blockedCount != 0 || quiescerCount != 0 ||
                    queueBase != queueTop) {
                    // Only trust the negative answer if ctl did not move
                    // while the other fields were being read.
                    if (ctl == c) // staleness check
                        return false;
                    continue;
                }
            }
            // On CAS failure (or after startTerminating) the loop re-reads ctl.
            if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, c | STOP_BIT))
                startTerminating();
        }
        // c is the ctl value that had STOP_BIT set.
        if ((short)(c >>> TC_SHIFT) == -parallelism) { // signal when 0 workers
            final ReentrantLock lock = this.submissionLock;
            lock.lock();
            try {
                termination.signalAll();
            } finally {
                lock.unlock();
            }
        }
        return true;
    }
1281 
1282     /**
1283      * Runs up to three passes through workers: (0) Setting
     * termination status for each worker, followed by wakeups of
     * queued workers; (1) helping cancel tasks; (2) interrupting
1286      * lagging threads (likely in external tasks, but possibly also
1287      * blocked in joins).  Each pass repeats previous steps because of
1288      * potential lagging thread creation.
1289      */
1290     private void startTerminating() {
1291         cancelSubmissions();
1292         for (int pass = 0; pass < 3; ++pass) {
1293             ForkJoinWorkerThread[] ws = workers;
1294             if (ws != null) {
1295                 for (ForkJoinWorkerThread w : ws) {
1296                     if (w != null) {
1297                         w.terminate = true;
1298                         if (pass > 0) {
1299                             w.cancelTasks();
1300                             if (pass > 1 && !w.isInterrupted()) {
1301                                 try {
1302                                     w.interrupt();
1303                                 } catch (SecurityException ignore) {
1304                                 }
1305                             }
1306                         }
1307                     }
1308                 }
1309                 terminateWaiters();
1310             }
1311         }
1312     }
1313 
1314     /**
1315      * Polls and cancels all submissions. Called only during termination.
1316      */
1317     private void cancelSubmissions() {
1318         while (queueBase != queueTop) {
1319             ForkJoinTask<?> task = pollSubmission();
1320             if (task != null) {
1321                 try {
1322                     task.cancel(false);
1323                 } catch (Throwable ignore) {
1324                 }
1325             }
1326         }
1327     }
1328 
1329     /**
1330      * Tries to set the termination status of waiting workers, and
1331      * then wakes them up (after which they will terminate).
1332      */
1333     private void terminateWaiters() {
1334         ForkJoinWorkerThread[] ws = workers;
1335         if (ws != null) {
1336             ForkJoinWorkerThread w; long c; int i, e;
1337             int n = ws.length;
1338             while ((i = ~(e = (int)(c = ctl)) & SMASK) < n &&
1339                    (w = ws[i]) != null && w.eventCount == (e & E_MASK)) {
1340                 if (UNSAFE.compareAndSwapLong(this, ctlOffset, c,
1341                                               (long)(w.nextWait & E_MASK) |
1342                                               ((c + AC_UNIT) & AC_MASK) |
1343                                               (c & (TC_MASK|STOP_BIT)))) {
1344                     w.terminate = true;
1345                     w.eventCount = e + EC_UNIT;
1346                     if (w.parked)
1347                         UNSAFE.unpark(w);
1348                 }
1349             }
1350         }
1351     }
1352 
1353     // misc ForkJoinWorkerThread support
1354 
1355     /**
1356      * Increment or decrement quiescerCount. Needed only to prevent
1357      * triggering shutdown if a worker is transiently inactive while
1358      * checking quiescence.
1359      *
1360      * @param delta 1 for increment, -1 for decrement
1361      */
1362     final void addQuiescerCount(int delta) {
1363         int c;
1364         do {} while (!UNSAFE.compareAndSwapInt(this, quiescerCountOffset,
1365                                                c = quiescerCount, c + delta));
1366     }
1367 
1368     /**
1369      * Directly increment or decrement active count without
1370      * queuing. This method is used to transiently assert inactivation
1371      * while checking quiescence.
1372      *
1373      * @param delta 1 for increment, -1 for decrement
1374      */
1375     final void addActiveCount(int delta) {
1376         long d = delta < 0 ? -AC_UNIT : AC_UNIT;
1377         long c;
1378         do {} while (!UNSAFE.compareAndSwapLong(this, ctlOffset, c = ctl,
1379                                                 ((c + d) & AC_MASK) |
1380                                                 (c & ~AC_MASK)));
1381     }
1382 
1383     /**
1384      * Returns the approximate (non-atomic) number of idle threads per
1385      * active thread.
1386      */
1387     final int idlePerActive() {
1388         // Approximate at powers of two for small values, saturate past 4
1389         int p = parallelism;
1390         int a = p + (int)(ctl >> AC_SHIFT);
1391         return (a > (p >>>= 1) ? 0 :
1392                 a > (p >>>= 1) ? 1 :
1393                 a > (p >>>= 1) ? 2 :
1394                 a > (p >>>= 1) ? 4 :
1395                 8);
1396     }
1397 
1398     // Exported methods
1399 
1400     // Constructors
1401 
1402     /**
1403      * Creates a {@code ForkJoinPool} with parallelism equal to {@link
1404      * java.lang.Runtime#availableProcessors}, using the {@linkplain
1405      * #defaultForkJoinWorkerThreadFactory default thread factory},
1406      * no UncaughtExceptionHandler, and non-async LIFO processing mode.
1407      *
1408      * @throws SecurityException if a security manager exists and
1409      *         the caller is not permitted to modify threads
1410      *         because it does not hold {@link
1411      *         java.lang.RuntimePermission}{@code ("modifyThread")}
1412      */
1413     public ForkJoinPool() {
1414         this(Runtime.getRuntime().availableProcessors(),
1415              defaultForkJoinWorkerThreadFactory, null, false);
1416     }
1417 
1418     /**
1419      * Creates a {@code ForkJoinPool} with the indicated parallelism
1420      * level, the {@linkplain
1421      * #defaultForkJoinWorkerThreadFactory default thread factory},
1422      * no UncaughtExceptionHandler, and non-async LIFO processing mode.
1423      *
1424      * @param parallelism the parallelism level
1425      * @throws IllegalArgumentException if parallelism less than or
1426      *         equal to zero, or greater than implementation limit
1427      * @throws SecurityException if a security manager exists and
1428      *         the caller is not permitted to modify threads
1429      *         because it does not hold {@link
1430      *         java.lang.RuntimePermission}{@code ("modifyThread")}
1431      */
1432     public ForkJoinPool(int parallelism) {
1433         this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
1434     }
1435 
1436     /**
1437      * Creates a {@code ForkJoinPool} with the given parameters.
1438      *
1439      * @param parallelism the parallelism level. For default value,
1440      * use {@link java.lang.Runtime#availableProcessors}.
1441      * @param factory the factory for creating new threads. For default value,
1442      * use {@link #defaultForkJoinWorkerThreadFactory}.
1443      * @param handler the handler for internal worker threads that
1444      * terminate due to unrecoverable errors encountered while executing
1445      * tasks. For default value, use {@code null}.
1446      * @param asyncMode if true,
1447      * establishes local first-in-first-out scheduling mode for forked
1448      * tasks that are never joined. This mode may be more appropriate
1449      * than default locally stack-based mode in applications in which
1450      * worker threads only process event-style asynchronous tasks.
1451      * For default value, use {@code false}.
1452      * @throws IllegalArgumentException if parallelism less than or
1453      *         equal to zero, or greater than implementation limit
1454      * @throws NullPointerException if the factory is null
1455      * @throws SecurityException if a security manager exists and
1456      *         the caller is not permitted to modify threads
1457      *         because it does not hold {@link
1458      *         java.lang.RuntimePermission}{@code ("modifyThread")}
1459      */
1460     public ForkJoinPool(int parallelism,
1461                         ForkJoinWorkerThreadFactory factory,
1462                         Thread.UncaughtExceptionHandler handler,
1463                         boolean asyncMode) {
1464         checkPermission();
1465         if (factory == null)
1466             throw new NullPointerException();
1467         if (parallelism <= 0 || parallelism > MAX_ID)
1468             throw new IllegalArgumentException();
1469         this.parallelism = parallelism;
1470         this.factory = factory;
1471         this.ueh = handler;
1472         this.locallyFifo = asyncMode;
1473         long np = (long)(-parallelism); // offset ctl counts
1474         this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
1475         this.submissionQueue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
1476         // initialize workers array with room for 2*parallelism if possible
1477         int n = parallelism << 1;
1478         if (n >= MAX_ID)
1479             n = MAX_ID;
1480         else { // See Hackers Delight, sec 3.2, where n < (1 << 16)
1481             n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8;
1482         }
1483         workers = new ForkJoinWorkerThread[n + 1];
1484         this.submissionLock = new ReentrantLock();
1485         this.termination = submissionLock.newCondition();
1486         StringBuilder sb = new StringBuilder("ForkJoinPool-");
1487         sb.append(poolNumberGenerator.incrementAndGet());
1488         sb.append("-worker-");
1489         this.workerNamePrefix = sb.toString();
1490     }
1491 
1492     // Execution methods
1493 
1494     /**
1495      * Performs the given task, returning its result upon completion.
1496      * If the computation encounters an unchecked Exception or Error,
1497      * it is rethrown as the outcome of this invocation.  Rethrown
1498      * exceptions behave in the same way as regular exceptions, but,
1499      * when possible, contain stack traces (as displayed for example
1500      * using {@code ex.printStackTrace()}) of both the current thread
1501      * as well as the thread actually encountering the exception;
1502      * minimally only the latter.
1503      *
1504      * @param task the task
1505      * @return the task's result
1506      * @throws NullPointerException if the task is null
1507      * @throws RejectedExecutionException if the task cannot be
1508      *         scheduled for execution
1509      */
1510     public <T> T invoke(ForkJoinTask<T> task) {
1511         Thread t = Thread.currentThread();
1512         if (task == null)
1513             throw new NullPointerException();
1514         if (shutdown)
1515             throw new RejectedExecutionException();
1516         if ((t instanceof ForkJoinWorkerThread) &&
1517             ((ForkJoinWorkerThread)t).pool == this)
1518             return task.invoke();  // bypass submit if in same pool
1519         else {
1520             addSubmission(task);
1521             return task.join();
1522         }
1523     }
1524 
1525     /**
1526      * Unless terminating, forks task if within an ongoing FJ
1527      * computation in the current pool, else submits as external task.
1528      */
1529     private <T> void forkOrSubmit(ForkJoinTask<T> task) {
1530         ForkJoinWorkerThread w;
1531         Thread t = Thread.currentThread();
1532         if (shutdown)
1533             throw new RejectedExecutionException();
1534         if ((t instanceof ForkJoinWorkerThread) &&
1535             (w = (ForkJoinWorkerThread)t).pool == this)
1536             w.pushTask(task);
1537         else
1538             addSubmission(task);
1539     }
1540 
1541     /**
1542      * Arranges for (asynchronous) execution of the given task.
1543      *
1544      * @param task the task
1545      * @throws NullPointerException if the task is null
1546      * @throws RejectedExecutionException if the task cannot be
1547      *         scheduled for execution
1548      */
1549     public void execute(ForkJoinTask<?> task) {
1550         if (task == null)
1551             throw new NullPointerException();
1552         forkOrSubmit(task);
1553     }
1554 
1555     // AbstractExecutorService methods
1556 
1557     /**
1558      * @throws NullPointerException if the task is null
1559      * @throws RejectedExecutionException if the task cannot be
1560      *         scheduled for execution
1561      */
1562     public void execute(Runnable task) {
1563         if (task == null)
1564             throw new NullPointerException();
1565         ForkJoinTask<?> job;
1566         if (task instanceof ForkJoinTask<?>) // avoid re-wrap
1567             job = (ForkJoinTask<?>) task;
1568         else
1569             job = ForkJoinTask.adapt(task, null);
1570         forkOrSubmit(job);
1571     }
1572 
1573     /**
1574      * Submits a ForkJoinTask for execution.
1575      *
1576      * @param task the task to submit
1577      * @return the task
1578      * @throws NullPointerException if the task is null
1579      * @throws RejectedExecutionException if the task cannot be
1580      *         scheduled for execution
1581      */
1582     public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
1583         if (task == null)
1584             throw new NullPointerException();
1585         forkOrSubmit(task);
1586         return task;
1587     }
1588 
1589     /**
1590      * @throws NullPointerException if the task is null
1591      * @throws RejectedExecutionException if the task cannot be
1592      *         scheduled for execution
1593      */
1594     public <T> ForkJoinTask<T> submit(Callable<T> task) {
1595         if (task == null)
1596             throw new NullPointerException();
1597         ForkJoinTask<T> job = ForkJoinTask.adapt(task);
1598         forkOrSubmit(job);
1599         return job;
1600     }
1601 
1602     /**
1603      * @throws NullPointerException if the task is null
1604      * @throws RejectedExecutionException if the task cannot be
1605      *         scheduled for execution
1606      */
1607     public <T> ForkJoinTask<T> submit(Runnable task, T result) {
1608         if (task == null)
1609             throw new NullPointerException();
1610         ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
1611         forkOrSubmit(job);
1612         return job;
1613     }
1614 
1615     /**
1616      * @throws NullPointerException if the task is null
1617      * @throws RejectedExecutionException if the task cannot be
1618      *         scheduled for execution
1619      */
1620     public ForkJoinTask<?> submit(Runnable task) {
1621         if (task == null)
1622             throw new NullPointerException();
1623         ForkJoinTask<?> job;
1624         if (task instanceof ForkJoinTask<?>) // avoid re-wrap
1625             job = (ForkJoinTask<?>) task;
1626         else
1627             job = ForkJoinTask.adapt(task, null);
1628         forkOrSubmit(job);
1629         return job;
1630     }
1631 
1632     /**
1633      * @throws NullPointerException       {@inheritDoc}
1634      * @throws RejectedExecutionException {@inheritDoc}
1635      */
1636     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
1637         ArrayList<ForkJoinTask<T>> forkJoinTasks =
1638             new ArrayList<ForkJoinTask<T>>(tasks.size());
1639         for (Callable<T> task : tasks)
1640             forkJoinTasks.add(ForkJoinTask.adapt(task));
1641         invoke(new InvokeAll<T>(forkJoinTasks));
1642 
1643         @SuppressWarnings({"unchecked", "rawtypes"})
1644             List<Future<T>> futures = (List<Future<T>>) (List) forkJoinTasks;
1645         return futures;
1646     }
1647 
1648     static final class InvokeAll<T> extends RecursiveAction {
1649         final ArrayList<ForkJoinTask<T>> tasks;
1650         InvokeAll(ArrayList<ForkJoinTask<T>> tasks) { this.tasks = tasks; }
1651         public void compute() {
1652             try { invokeAll(tasks); }
1653             catch (Exception ignore) {}
1654         }
1655         private static final long serialVersionUID = -7914297376763021607L;
1656     }
1657 
1658     /**
1659      * Returns the factory used for constructing new workers.
1660      *
1661      * @return the factory used for constructing new workers
1662      */
1663     public ForkJoinWorkerThreadFactory getFactory() {
1664         return factory;
1665     }
1666 
1667     /**
1668      * Returns the handler for internal worker threads that terminate
1669      * due to unrecoverable errors encountered while executing tasks.
1670      *
1671      * @return the handler, or {@code null} if none
1672      */
1673     public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
1674         return ueh;
1675     }
1676 
1677     /**
1678      * Returns the targeted parallelism level of this pool.
1679      *
1680      * @return the targeted parallelism level of this pool
1681      */
1682     public int getParallelism() {
1683         return parallelism;
1684     }
1685 
1686     /**
1687      * Returns the number of worker threads that have started but not
1688      * yet terminated.  The result returned by this method may differ
1689      * from {@link #getParallelism} when threads are created to
1690      * maintain parallelism when others are cooperatively blocked.
1691      *
1692      * @return the number of worker threads
1693      */
1694     public int getPoolSize() {
1695         return parallelism + (short)(ctl >>> TC_SHIFT);
1696     }
1697 
1698     /**
1699      * Returns {@code true} if this pool uses local first-in-first-out
1700      * scheduling mode for forked tasks that are never joined.
1701      *
1702      * @return {@code true} if this pool uses async mode
1703      */
1704     public boolean getAsyncMode() {
1705         return locallyFifo;
1706     }
1707 
1708     /**
1709      * Returns an estimate of the number of worker threads that are
1710      * not blocked waiting to join tasks or for other managed
1711      * synchronization. This method may overestimate the
1712      * number of running threads.
1713      *
1714      * @return the number of worker threads
1715      */
1716     public int getRunningThreadCount() {
1717         int r = parallelism + (int)(ctl >> AC_SHIFT);
1718         return (r <= 0) ? 0 : r; // suppress momentarily negative values
1719     }
1720 
1721     /**
1722      * Returns an estimate of the number of threads that are currently
1723      * stealing or executing tasks. This method may overestimate the
1724      * number of active threads.
1725      *
1726      * @return the number of active threads
1727      */
1728     public int getActiveThreadCount() {
1729         int r = parallelism + (int)(ctl >> AC_SHIFT) + blockedCount;
1730         return (r <= 0) ? 0 : r; // suppress momentarily negative values
1731     }
1732 
1733     /**
1734      * Returns {@code true} if all worker threads are currently idle.
1735      * An idle worker is one that cannot obtain a task to execute
1736      * because none are available to steal from other threads, and
1737      * there are no pending submissions to the pool. This method is
1738      * conservative; it might not return {@code true} immediately upon
1739      * idleness of all threads, but will eventually become true if
1740      * threads remain inactive.
1741      *
1742      * @return {@code true} if all threads are currently idle
1743      */
1744     public boolean isQuiescent() {
1745         return parallelism + (int)(ctl >> AC_SHIFT) + blockedCount == 0;
1746     }
1747 
1748     /**
1749      * Returns an estimate of the total number of tasks stolen from
1750      * one thread's work queue by another. The reported value
1751      * underestimates the actual total number of steals when the pool
1752      * is not quiescent. This value may be useful for monitoring and
1753      * tuning fork/join programs: in general, steal counts should be
1754      * high enough to keep threads busy, but low enough to avoid
1755      * overhead and contention across threads.
1756      *
1757      * @return the number of steals
1758      */
1759     public long getStealCount() {
1760         return stealCount;
1761     }
1762 
1763     /**
1764      * Returns an estimate of the total number of tasks currently held
1765      * in queues by worker threads (but not including tasks submitted
1766      * to the pool that have not begun executing). This value is only
1767      * an approximation, obtained by iterating across all threads in
1768      * the pool. This method may be useful for tuning task
1769      * granularities.
1770      *
1771      * @return the number of queued tasks
1772      */
1773     public long getQueuedTaskCount() {
1774         long count = 0;
1775         ForkJoinWorkerThread[] ws;
1776         if ((short)(ctl >>> TC_SHIFT) > -parallelism &&
1777             (ws = workers) != null) {
1778             for (ForkJoinWorkerThread w : ws)
1779                 if (w != null)
1780                     count -= w.queueBase - w.queueTop; // must read base first
1781         }
1782         return count;
1783     }
1784 
1785     /**
1786      * Returns an estimate of the number of tasks submitted to this
1787      * pool that have not yet begun executing.  This method may take
1788      * time proportional to the number of submissions.
1789      *
1790      * @return the number of queued submissions
1791      */
1792     public int getQueuedSubmissionCount() {
1793         return -queueBase + queueTop;
1794     }
1795 
1796     /**
1797      * Returns {@code true} if there are any tasks submitted to this
1798      * pool that have not yet begun executing.
1799      *
1800      * @return {@code true} if there are any queued submissions
1801      */
1802     public boolean hasQueuedSubmissions() {
1803         return queueBase != queueTop;
1804     }
1805 
1806     /**
1807      * Removes and returns the next unexecuted submission if one is
1808      * available.  This method may be useful in extensions to this
1809      * class that re-assign work in systems with multiple pools.
1810      *
1811      * @return the next submission, or {@code null} if none
1812      */
1813     protected ForkJoinTask<?> pollSubmission() {
1814         ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
1815         while ((b = queueBase) != queueTop &&
1816                (q = submissionQueue) != null &&
1817                (i = (q.length - 1) & b) >= 0) {
1818             long u = (i << ASHIFT) + ABASE;
1819             if ((t = q[i]) != null &&
1820                 queueBase == b &&
1821                 UNSAFE.compareAndSwapObject(q, u, t, null)) {
1822                 queueBase = b + 1;
1823                 return t;
1824             }
1825         }
1826         return null;
1827     }
1828 
1829     /**
1830      * Removes all available unexecuted submitted and forked tasks
1831      * from scheduling queues and adds them to the given collection,
1832      * without altering their execution status. These may include
1833      * artificially generated or wrapped tasks. This method is
1834      * designed to be invoked only when the pool is known to be
1835      * quiescent. Invocations at other times may not remove all
1836      * tasks. A failure encountered while attempting to add elements
1837      * to collection {@code c} may result in elements being in
1838      * neither, either or both collections when the associated
1839      * exception is thrown.  The behavior of this operation is
1840      * undefined if the specified collection is modified while the
1841      * operation is in progress.
1842      *
1843      * @param c the collection to transfer elements into
1844      * @return the number of elements transferred
1845      */
1846     protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
1847         int count = 0;
1848         while (queueBase != queueTop) {
1849             ForkJoinTask<?> t = pollSubmission();
1850             if (t != null) {
1851                 c.add(t);
1852                 ++count;
1853             }
1854         }
1855         ForkJoinWorkerThread[] ws;
1856         if ((short)(ctl >>> TC_SHIFT) > -parallelism &&
1857             (ws = workers) != null) {
1858             for (ForkJoinWorkerThread w : ws)
1859                 if (w != null)
1860                     count += w.drainTasksTo(c);
1861         }
1862         return count;
1863     }
1864 
1865     /**
1866      * Returns a string identifying this pool, as well as its state,
1867      * including indications of run state, parallelism level, and
1868      * worker and task counts.
1869      *
1870      * @return a string identifying this pool, as well as its state
1871      */
1872     public String toString() {
1873         long st = getStealCount();
1874         long qt = getQueuedTaskCount();
1875         long qs = getQueuedSubmissionCount();
1876         int pc = parallelism;
1877         long c = ctl;
1878         int tc = pc + (short)(c >>> TC_SHIFT);
1879         int rc = pc + (int)(c >> AC_SHIFT);
1880         if (rc < 0) // ignore transient negative
1881             rc = 0;
1882         int ac = rc + blockedCount;
1883         String level;
1884         if ((c & STOP_BIT) != 0)
1885             level = (tc == 0) ? "Terminated" : "Terminating";
1886         else
1887             level = shutdown ? "Shutting down" : "Running";
1888         return super.toString() +
1889             "[" + level +
1890             ", parallelism = " + pc +
1891             ", size = " + tc +
1892             ", active = " + ac +
1893             ", running = " + rc +
1894             ", steals = " + st +
1895             ", tasks = " + qt +
1896             ", submissions = " + qs +
1897             "]";
1898     }
1899 
1900     /**
1901      * Initiates an orderly shutdown in which previously submitted
1902      * tasks are executed, but no new tasks will be accepted.
1903      * Invocation has no additional effect if already shut down.
1904      * Tasks that are in the process of being submitted concurrently
1905      * during the course of this method may or may not be rejected.
1906      *
1907      * @throws SecurityException if a security manager exists and
1908      *         the caller is not permitted to modify threads
1909      *         because it does not hold {@link
1910      *         java.lang.RuntimePermission}{@code ("modifyThread")}
1911      */
1912     public void shutdown() {
1913         checkPermission();
1914         shutdown = true;
1915         tryTerminate(false);
1916     }
1917 
1918     /**
1919      * Attempts to cancel and/or stop all tasks, and reject all
1920      * subsequently submitted tasks.  Tasks that are in the process of
1921      * being submitted or executed concurrently during the course of
1922      * this method may or may not be rejected. This method cancels
1923      * both existing and unexecuted tasks, in order to permit
1924      * termination in the presence of task dependencies. So the method
1925      * always returns an empty list (unlike the case for some other
1926      * Executors).
1927      *
1928      * @return an empty list
1929      * @throws SecurityException if a security manager exists and
1930      *         the caller is not permitted to modify threads
1931      *         because it does not hold {@link
1932      *         java.lang.RuntimePermission}{@code ("modifyThread")}
1933      */
1934     public List<Runnable> shutdownNow() {
1935         checkPermission();
1936         shutdown = true;
1937         tryTerminate(true);
1938         return Collections.emptyList();
1939     }
1940 
1941     /**
1942      * Returns {@code true} if all tasks have completed following shut down.
1943      *
1944      * @return {@code true} if all tasks have completed following shut down
1945      */
1946     public boolean isTerminated() {
1947         long c = ctl;
1948         return ((c & STOP_BIT) != 0L &&
1949                 (short)(c >>> TC_SHIFT) == -parallelism);
1950     }
1951 
1952     /**
1953      * Returns {@code true} if the process of termination has
1954      * commenced but not yet completed.  This method may be useful for
1955      * debugging. A return of {@code true} reported a sufficient
1956      * period after shutdown may indicate that submitted tasks have
1957      * ignored or suppressed interruption, or are waiting for IO,
1958      * causing this executor not to properly terminate. (See the
1959      * advisory notes for class {@link ForkJoinTask} stating that
1960      * tasks should not normally entail blocking operations.  But if
1961      * they do, they must abort them on interrupt.)
1962      *
1963      * @return {@code true} if terminating but not yet terminated
1964      */
1965     public boolean isTerminating() {
1966         long c = ctl;
1967         return ((c & STOP_BIT) != 0L &&
1968                 (short)(c >>> TC_SHIFT) != -parallelism);
1969     }
1970 
1971     /**
1972      * Returns true if terminating or terminated. Used by ForkJoinWorkerThread.
1973      */
1974     final boolean isAtLeastTerminating() {
1975         return (ctl & STOP_BIT) != 0L;
1976     }
1977 
1978     /**
1979      * Returns {@code true} if this pool has been shut down.
1980      *
1981      * @return {@code true} if this pool has been shut down
1982      */
1983     public boolean isShutdown() {
1984         return shutdown;
1985     }
1986 
1987     /**
1988      * Blocks until all tasks have completed execution after a shutdown
1989      * request, or the timeout occurs, or the current thread is
1990      * interrupted, whichever happens first.
1991      *
1992      * @param timeout the maximum time to wait
1993      * @param unit the time unit of the timeout argument
1994      * @return {@code true} if this executor terminated and
1995      *         {@code false} if the timeout elapsed before termination
1996      * @throws InterruptedException if interrupted while waiting
1997      */
1998     public boolean awaitTermination(long timeout, TimeUnit unit)
1999         throws InterruptedException {
2000         long nanos = unit.toNanos(timeout);
2001         final ReentrantLock lock = this.submissionLock;
2002         lock.lock();
2003         try {
2004             for (;;) {
2005                 if (isTerminated())
2006                     return true;
2007                 if (nanos <= 0)
2008                     return false;
2009                 nanos = termination.awaitNanos(nanos);
2010             }
2011         } finally {
2012             lock.unlock();
2013         }
2014     }
2015 
2016     /**
2017      * Interface for extending managed parallelism for tasks running
2018      * in {@link ForkJoinPool}s.
2019      *
2020      * <p>A {@code ManagedBlocker} provides two methods.  Method
2021      * {@code isReleasable} must return {@code true} if blocking is
2022      * not necessary. Method {@code block} blocks the current thread
2023      * if necessary (perhaps internally invoking {@code isReleasable}
2024      * before actually blocking). These actions are performed by any
2025      * thread invoking {@link ForkJoinPool#managedBlock}.  The
2026      * unusual methods in this API accommodate synchronizers that may,
2027      * but don't usually, block for long periods. Similarly, they
2028      * allow more efficient internal handling of cases in which
2029      * additional workers may be, but usually are not, needed to
2030      * ensure sufficient parallelism.  Toward this end,
2031      * implementations of method {@code isReleasable} must be amenable
2032      * to repeated invocation.
2033      *
2034      * <p>For example, here is a ManagedBlocker based on a
2035      * ReentrantLock:
2036      *  <pre> {@code
2037      * class ManagedLocker implements ManagedBlocker {
2038      *   final ReentrantLock lock;
2039      *   boolean hasLock = false;
2040      *   ManagedLocker(ReentrantLock lock) { this.lock = lock; }
2041      *   public boolean block() {
2042      *     if (!hasLock)
2043      *       lock.lock();
2044      *     return true;
2045      *   }
2046      *   public boolean isReleasable() {
2047      *     return hasLock || (hasLock = lock.tryLock());
2048      *   }
2049      * }}</pre>
2050      *
2051      * <p>Here is a class that possibly blocks waiting for an
2052      * item on a given queue:
2053      *  <pre> {@code
2054      * class QueueTaker<E> implements ManagedBlocker {
2055      *   final BlockingQueue<E> queue;
2056      *   volatile E item = null;
2057      *   QueueTaker(BlockingQueue<E> q) { this.queue = q; }
2058      *   public boolean block() throws InterruptedException {
2059      *     if (item == null)
2060      *       item = queue.take();
2061      *     return true;
2062      *   }
2063      *   public boolean isReleasable() {
2064      *     return item != null || (item = queue.poll()) != null;
2065      *   }
2066      *   public E getItem() { // call after pool.managedBlock completes
2067      *     return item;
2068      *   }
2069      * }}</pre>
2070      */
2071     public static interface ManagedBlocker {
2072         /**
2073          * Possibly blocks the current thread, for example waiting for
2074          * a lock or condition.
2075          *
2076          * @return {@code true} if no additional blocking is necessary
2077          * (i.e., if isReleasable would return true)
2078          * @throws InterruptedException if interrupted while waiting
2079          * (the method is not required to do so, but is allowed to)
2080          */
2081         boolean block() throws InterruptedException;
2082 
2083         /**
2084          * Returns {@code true} if blocking is unnecessary.
2085          */
2086         boolean isReleasable();
2087     }
2088 
2089     /**
2090      * Blocks in accord with the given blocker.  If the current thread
2091      * is a {@link ForkJoinWorkerThread}, this method possibly
2092      * arranges for a spare thread to be activated if necessary to
2093      * ensure sufficient parallelism while the current thread is blocked.
2094      *
2095      * <p>If the caller is not a {@link ForkJoinTask}, this method is
2096      * behaviorally equivalent to
2097      *  <pre> {@code
2098      * while (!blocker.isReleasable())
2099      *   if (blocker.block())
2100      *     return;
2101      * }</pre>
2102      *
2103      * If the caller is a {@code ForkJoinTask}, then the pool may
2104      * first be expanded to ensure parallelism, and later adjusted.
2105      *
2106      * @param blocker the blocker
2107      * @throws InterruptedException if blocker.block did so
2108      */
2109     public static void managedBlock(ManagedBlocker blocker)
2110         throws InterruptedException {
2111         Thread t = Thread.currentThread();
2112         if (t instanceof ForkJoinWorkerThread) {
2113             ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
2114             w.pool.awaitBlocker(blocker);
2115         }
2116         else {
2117             do {} while (!blocker.isReleasable() && !blocker.block());
2118         }
2119     }
2120 
2121     // AbstractExecutorService overrides.  These rely on undocumented
2122     // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
2123     // implement RunnableFuture.
2124 
2125     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
2126         return (RunnableFuture<T>) ForkJoinTask.adapt(runnable, value);
2127     }
2128 
2129     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
2130         return (RunnableFuture<T>) ForkJoinTask.adapt(callable);
2131     }
2132 
// Unsafe mechanics: the Unsafe handle, the raw offsets of the
// ForkJoinPool fields named in the static initializer below, and
// the base offset and shift for ForkJoinTask[] elements.
private static final sun.misc.Unsafe UNSAFE;
private static final long ctlOffset;
private static final long stealCountOffset;
private static final long blockedCountOffset;
private static final long quiescerCountOffset;
private static final long scanGuardOffset;
private static final long nextWorkerNumberOffset;
private static final long ABASE;
private static final int ASHIFT;

static {
    // Statics declared outside this section are set up first.
    poolNumberGenerator = new AtomicInteger();
    workerSeedGenerator = new Random();
    modifyThreadPermission = new RuntimePermission("modifyThread");
    defaultForkJoinWorkerThreadFactory =
        new DefaultForkJoinWorkerThreadFactory();

    int scale;
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();

        // Look up each field's offset by its declared name.
        Class<?> poolClass = ForkJoinPool.class;
        ctlOffset = UNSAFE.objectFieldOffset(
            poolClass.getDeclaredField("ctl"));
        stealCountOffset = UNSAFE.objectFieldOffset(
            poolClass.getDeclaredField("stealCount"));
        blockedCountOffset = UNSAFE.objectFieldOffset(
            poolClass.getDeclaredField("blockedCount"));
        quiescerCountOffset = UNSAFE.objectFieldOffset(
            poolClass.getDeclaredField("quiescerCount"));
        scanGuardOffset = UNSAFE.objectFieldOffset(
            poolClass.getDeclaredField("scanGuard"));
        nextWorkerNumberOffset = UNSAFE.objectFieldOffset(
            poolClass.getDeclaredField("nextWorkerNumber"));

        // Base offset and per-element scale of ForkJoinTask arrays.
        Class<?> taskArrayClass = ForkJoinTask[].class;
        ABASE = UNSAFE.arrayBaseOffset(taskArrayClass);
        scale = UNSAFE.arrayIndexScale(taskArrayClass);
    } catch (Exception e) {
        // Any lookup failure aborts class initialization.
        throw new Error(e);
    }

    // scale & (scale - 1) clears the lowest set bit; a nonzero result
    // means more than one bit is set, so scale is not a power of two.
    boolean moreThanOneBit = (scale & (scale - 1)) != 0;
    if (moreThanOneBit) {
        throw new Error("data type scale not a power of two");
    }
    // Index of the highest set bit, i.e. log2(scale) for a power of two.
    ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
}
2176 
2177 }