src/share/classes/java/util/concurrent/ForkJoinPool.java

Print this page




  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 

  38 import java.util.ArrayList;
  39 import java.util.Arrays;
  40 import java.util.Collection;
  41 import java.util.Collections;
  42 import java.util.List;
  43 import java.util.concurrent.AbstractExecutorService;
  44 import java.util.concurrent.Callable;
  45 import java.util.concurrent.ExecutorService;
  46 import java.util.concurrent.Future;
  47 import java.util.concurrent.RejectedExecutionException;
  48 import java.util.concurrent.RunnableFuture;
  49 import java.util.concurrent.TimeUnit;
  50 
  51 /**
  52  * An {@link ExecutorService} for running {@link ForkJoinTask}s.
  53  * A {@code ForkJoinPool} provides the entry point for submissions
  54  * from non-{@code ForkJoinTask} clients, as well as management and
  55  * monitoring operations.
  56  *
  57  * <p>A {@code ForkJoinPool} differs from other kinds of {@link


  87  * <p>In addition to execution and lifecycle control methods, this
  88  * class provides status check methods (for example
  89  * {@link #getStealCount}) that are intended to aid in developing,
  90  * tuning, and monitoring fork/join applications. Also, method
  91  * {@link #toString} returns indications of pool state in a
  92  * convenient form for informal monitoring.
  93  *
  94  * <p>As is the case with other ExecutorServices, there are three
  95  * main task execution methods summarized in the following table.
  96  * These are designed to be used primarily by clients not already
  97  * engaged in fork/join computations in the current pool.  The main
  98  * forms of these methods accept instances of {@code ForkJoinTask},
  99  * but overloaded forms also allow mixed execution of plain {@code
 100  * Runnable}- or {@code Callable}- based activities as well.  However,
 101  * tasks that are already executing in a pool should normally instead
 102  * use the within-computation forms listed in the table unless using
 103  * async event-style tasks that are not usually joined, in which case
 104  * there is little difference among choice of methods.
 105  *
 106  * <table BORDER CELLPADDING=3 CELLSPACING=1>

 107  *  <tr>
 108  *    <td></td>
 109  *    <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
 110  *    <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
 111  *  </tr>
 112  *  <tr>
 113  *    <td> <b>Arrange async execution</b></td>
 114  *    <td> {@link #execute(ForkJoinTask)}</td>
 115  *    <td> {@link ForkJoinTask#fork}</td>
 116  *  </tr>
 117  *  <tr>
 118  *    <td> <b>Await and obtain result</b></td>
 119  *    <td> {@link #invoke(ForkJoinTask)}</td>
 120  *    <td> {@link ForkJoinTask#invoke}</td>
 121  *  </tr>
 122  *  <tr>
 123  *    <td> <b>Arrange exec and obtain Future</b></td>
 124  *    <td> {@link #submit(ForkJoinTask)}</td>
 125  *    <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
 126  *  </tr>
 127  * </table>
 128  *
 129  * <p>The common pool is by default constructed with default
 130  * parameters, but these may be controlled by setting three {@link
 131  * System#getProperty system properties} with prefix {@code
 132  * java.util.concurrent.ForkJoinPool.common}: {@code parallelism} --
 133  * an integer greater than zero, {@code threadFactory} -- the class
 134  * name of a {@link ForkJoinWorkerThreadFactory}, and {@code
 135  * exceptionHandler} -- the class name of a {@link
 136  * java.lang.Thread.UncaughtExceptionHandler
 137  * Thread.UncaughtExceptionHandler}. Upon any error in establishing
 138  * these settings, default parameters are used.



 139  *
 140  * <p><b>Implementation notes</b>: This implementation restricts the
 141  * maximum number of running threads to 32767. Attempts to create
 142  * pools with greater than the maximum number result in
 143  * {@code IllegalArgumentException}.
 144  *
 145  * <p>This implementation rejects submitted tasks (that is, by throwing
 146  * {@link RejectedExecutionException}) only when the pool is shut down
 147  * or internal resources have been exhausted.
 148  *
 149  * @since 1.7
 150  * @author Doug Lea
 151  */
 152 public class ForkJoinPool extends AbstractExecutorService {
 153 
 154     /*
 155      * Implementation Overview
 156      *
 157      * This class and its nested classes provide the main
 158      * functionality and control for a set of worker threads:


 208      * base index, else consider alternative actions, rather than
 209      * method poll.)
 210      *
 211      * This approach also enables support of a user mode in which local
 212      * task processing is in FIFO, not LIFO order, simply by using
 213      * poll rather than pop.  This can be useful in message-passing
 214      * frameworks in which tasks are never joined.  However neither
 215      * mode considers affinities, loads, cache localities, etc, so
 216      * rarely provide the best possible performance on a given
 217      * machine, but portably provide good throughput by averaging over
 218      * these factors.  (Further, even if we did try to use such
 219      * information, we do not usually have a basis for exploiting it.
 220      * For example, some sets of tasks profit from cache affinities,
 221      * but others are harmed by cache pollution effects.)
 222      *
 223      * WorkQueues are also used in a similar way for tasks submitted
 224      * to the pool. We cannot mix these tasks in the same queues used
 225      * for work-stealing (this would contaminate lifo/fifo
 226      * processing). Instead, we randomly associate submission queues
 227      * with submitting threads, using a form of hashing.  The
 228      * ThreadLocal Submitter class contains a value initially used as
 229      * a hash code for choosing existing queues, but may be randomly
 230      * repositioned upon contention with other submitters.  In
 231      * essence, submitters act like workers except that they are
 232      * restricted to executing local tasks that they submitted (or in
 233      * the case of CountedCompleters, others with the same root task).
 234      * However, because most shared/external queue operations are more
 235      * expensive than internal, and because, at steady state, external
 236      * submitters will compete for CPU with workers, ForkJoinTask.join
 237      * and related methods disable them from repeatedly helping to
 238      * process tasks if all workers are active.  Insertion of tasks in
 239      * shared mode requires a lock (mainly to protect in the case of
 240      * resizing) but we use only a simple spinlock (using bits in
 241      * field qlock), because submitters encountering a busy queue move
 242      * on to try or create other queues -- they block only when
 243      * creating and registering new queues.
 244      *
 245      * Management
 246      * ==========
 247      *
 248      * The main throughput advantages of work-stealing stem from
 249      * decentralized control -- workers mostly take tasks from
 250      * themselves or each other. We cannot negate this in the
 251      * implementation of other management responsibilities. The main
 252      * tactic for avoiding bottlenecks is packing nearly all
 253      * essentially atomic control state into two volatile variables
 254      * that are by far most often read (not written) as status and
 255      * consistency checks.
 256      *
 257      * Field "ctl" contains 64 bits holding all the information needed
 258      * to atomically decide to add, inactivate, enqueue (on an event
 259      * queue), dequeue, and/or re-activate workers.  To enable this


 452      * helping opportunities is challenging to control on JVMs, where
 453      * GC and other activities can stall progress of tasks that in
 454      * turn stall out many other dependent tasks, without us being
 455      * able to determine whether they will ever require compensation.
 456      * Even though work-stealing otherwise encounters little
 457      * degradation in the presence of more threads than cores,
 458      * aggressively adding new threads in such cases entails risk of
 459      * unwanted positive feedback control loops in which more threads
 460      * cause more dependent stalls (as well as delayed progress of
 461      * unblocked threads to the point that we know they are available)
 462      * leading to more situations requiring more threads, and so
 463      * on. This aspect of control can be seen as an (analytically
 464      * intractable) game with an opponent that may choose the worst
 465      * (for us) active thread to stall at any time.  We take several
 466      * precautions to bound losses (and thus bound gains), mainly in
 467      * methods tryCompensate and awaitJoin.
 468      *
 469      * Common Pool
 470      * ===========
 471      *
 472      * The static commonPool always exists after static
 473      * initialization.  Since it (or any other created pool) need
 474      * never be used, we minimize initial construction overhead and
 475      * footprint to the setup of about a dozen fields, with no nested
 476      * allocation. Most bootstrapping occurs within method
 477      * fullExternalPush during the first submission to the pool.
 478      *
 479      * When external threads submit to the common pool, they can
 480      * perform some subtask processing (see externalHelpJoin and
 481      * related methods).  We do not need to record whether these
 482      * submissions are to the common pool -- if not, externalHelpJoin
 483      * returns quickly (at the most helping to signal some common pool
 484      * workers). These submitters would otherwise be blocked waiting
 485      * for completion, so the extra effort (with liberally sprinkled
 486      * task status checks) in inapplicable cases amounts to an odd
 487      * form of limited spin-wait before blocking in ForkJoinTask.join.
 488      *
 489      * Style notes
 490      * ===========
 491      *
 492      * There is a lot of representation-level coupling among classes


 531     private static void checkPermission() {
 532         SecurityManager security = System.getSecurityManager();
 533         if (security != null)
 534             security.checkPermission(modifyThreadPermission);
 535     }
 536 
 537     // Nested classes
 538 
 539     /**
 540      * Factory for creating new {@link ForkJoinWorkerThread}s.
 541      * A {@code ForkJoinWorkerThreadFactory} must be defined and used
 542      * for {@code ForkJoinWorkerThread} subclasses that extend base
 543      * functionality or initialize threads with different contexts.
 544      */
 545     public static interface ForkJoinWorkerThreadFactory {
 546         /**
 547          * Returns a new worker thread operating in the given pool.
 548          *
 549          * @param pool the pool this thread works in
 550          * @throws NullPointerException if the pool is null

 551          */
 552         public ForkJoinWorkerThread newThread(ForkJoinPool pool);
 553     }
 554 
 555     /**
 556      * Default ForkJoinWorkerThreadFactory implementation; creates a
 557      * new ForkJoinWorkerThread.
 558      */
 559     static final class DefaultForkJoinWorkerThreadFactory
 560         implements ForkJoinWorkerThreadFactory {
 561         public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
 562             return new ForkJoinWorkerThread(pool);
 563         }
 564     }
 565 
 566     /**
 567      * Per-thread records for threads that submit to pools. Currently
 568      * holds only pseudo-random seed / index that is used to choose
 569      * submission queues in method externalPush. In the future, this may
 570      * also incorporate a means to implement different task rejection
 571      * and resubmission policies.
 572      *
 573      * Seeds for submitters and workers/workQueues work in basically
 574      * the same way but are initialized and updated using slightly
 575      * different mechanics. Both are initialized using the same
 576      * approach as in class ThreadLocal, where successive values are
 577      * unlikely to collide with previous values. Seeds are then
 578      * randomly modified upon collisions using xorshifts, which
 579      * requires a non-zero seed.
 580      */
 581     static final class Submitter {
 582         int seed;
 583         Submitter(int s) { seed = s; }
 584     }
 585 
 586     /**
 587      * Class for artificial tasks that are used to replace the target
 588      * of local joins if they are removed from an interior queue slot
 589      * in WorkQueue.tryRemoveAndExec. We don't need the proxy to
 590      * actually do anything beyond having a unique identity.
 591      */
 592     static final class EmptyTask extends ForkJoinTask<Void> {
 593         private static final long serialVersionUID = -7721805057305804111L;
 594         EmptyTask() { status = ForkJoinTask.NORMAL; } // force done
 595         public final Void getRawResult() { return null; }
 596         public final void setRawResult(Void x) {}
 597         public final boolean exec() { return true; }
 598     }
 599 
 600     /**
 601      * Queues supporting work-stealing as well as external task
 602      * submission. See above for main rationale and algorithms.
 603      * Implementation relies heavily on "Unsafe" intrinsics
 604      * and selective use of "volatile":
 605      *
 606      * Field "base" is the index (mod array.length) of the least valid


 720          * Provides a more accurate estimate of whether this queue has
 721          * any tasks than does queueSize, by checking whether a
 722          * near-empty queue has at least one unclaimed task.
 723          */
 724         final boolean isEmpty() {
 725             ForkJoinTask<?>[] a; int m, s;
 726             int n = base - (s = top);
 727             return (n >= 0 ||
 728                     (n == -1 &&
 729                      ((a = array) == null ||
 730                       (m = a.length - 1) < 0 ||
 731                       U.getObject
 732                       (a, (long)((m & (s - 1)) << ASHIFT) + ABASE) == null)));
 733         }
 734 
 735         /**
 736          * Pushes a task. Call only by owner in unshared queues.  (The
 737          * shared-queue version is embedded in method externalPush.)
              * The task is stored at the slot indexed by the current top,
              * top is then advanced, and afterwards either the pool is
              * signalled (when at most two tasks are apparently queued) or
              * the array is grown (when the count reaches length - 1).
              * Nothing happens if the array is null.
 738          *
 739          * @param task the task. Caller must ensure non-null.
 740          * @throws RejectedExecutionException if array cannot be resized
 741          */
 742         final void push(ForkJoinTask<?> task) {
 743             ForkJoinTask<?>[] a; ForkJoinPool p;
 744             int s = top, m, n;
 745             if ((a = array) != null) {    // ignore if queue removed
                      // raw offset of slot (s mod length): masked index scaled by ASHIFT, plus ABASE
 746                 int j = (((m = a.length - 1) & s) << ASHIFT) + ABASE;
 747                 U.putOrderedObject(a, j, task);
                      // advance top after the slot is written; n = apparent task count
 748                 if ((n = (top = s + 1) - base) <= 2) {
                          // few tasks queued: presumably workers may be idle, so signal the pool
 749                     if ((p = pool) != null)
 750                         p.signalWork(this);
 751                 }
 752                 else if (n >= m)              // only one free slot left (m == length - 1)
 753                     growArray();
 754             }
 755         }
 756 
 757         /**
 758          * Initializes or doubles the capacity of array. Call either
 759          * by owner or with lock held -- it is OK for base, but not
 760          * top, to move while resizings are in progress.


 919                 if (U.compareAndSwapObject(a, j, t, null)) {
 920                     top = s;
 921                     t.doExec();
 922                 }
 923             }
 924         }
 925 
 926         /**
 927          * Polls and runs tasks until empty.
 928          */
 929         private void pollAndExecAll() {
 930             for (ForkJoinTask<?> t; (t = poll()) != null;)
 931                 t.doExec();
 932         }
 933 
 934         /**
 935          * If present, removes from queue and executes the given task,
 936          * or any other cancelled task. Returns (true) on any CAS
 937          * or consistency check failure so caller can retry.
 938          *
 939          * @return false if no progress can be made, else true
 940          */
 941         final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
 942             boolean stat = true, removed = false, empty = true;
 943             ForkJoinTask<?>[] a; int m, s, b, n;
 944             if ((a = array) != null && (m = a.length - 1) >= 0 &&
 945                 (n = (s = top) - (b = base)) > 0) {
 946                 for (ForkJoinTask<?> t;;) {           // traverse from s to b
 947                     int j = ((--s & m) << ASHIFT) + ABASE;
 948                     t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
 949                     if (t == null)                    // inconsistent length
 950                         break;
 951                     else if (t == task) {
 952                         if (s + 1 == top) {           // pop
 953                             if (!U.compareAndSwapObject(a, j, task, null))
 954                                 break;
 955                             top = s;
 956                             removed = true;
 957                         }
 958                         else if (base == b)           // replace with proxy
 959                             removed = U.compareAndSwapObject(a, j, task,


 964                         empty = false;
 965                     else if (s + 1 == top) {          // pop and throw away
 966                         if (U.compareAndSwapObject(a, j, t, null))
 967                             top = s;
 968                         break;
 969                     }
 970                     if (--n == 0) {
 971                         if (!empty && base == b)
 972                             stat = false;
 973                         break;
 974                     }
 975                 }
 976             }
 977             if (removed)
 978                 task.doExec();
 979             return stat;
 980         }
 981 
 982         /**
 983          * Polls for and executes the given task or any other task in
 984          * its CountedCompleter computation.
 985          */
 986         final boolean pollAndExecCC(ForkJoinTask<?> root) {
 987             ForkJoinTask<?>[] a; int b; Object o;
 988             outer: while ((b = base) - top < 0 && (a = array) != null) {
 989                 long j = (((a.length - 1) & b) << ASHIFT) + ABASE;
 990                 if ((o = U.getObject(a, j)) == null ||
 991                     !(o instanceof CountedCompleter))
 992                     break;
 993                 for (CountedCompleter<?> t = (CountedCompleter<?>)o, r = t;;) {
 994                     if (r == root) {
 995                         if (base == b &&
 996                             U.compareAndSwapObject(a, j, t, null)) {
 997                             base = b + 1;
 998                             t.doExec();
 999                             return true;
1000                         }
1001                         else
1002                             break; // restart
1003                     }
1004                     if ((r = r.completer) == null)


1038         }
1039 
1040         /**
1041          * Returns true if owned and not known to be blocked.
1042          */
1043         final boolean isApparentlyUnblocked() {
1044             Thread wt; Thread.State s;
1045             return (eventCount >= 0 &&
1046                     (wt = owner) != null &&
1047                     (s = wt.getState()) != Thread.State.BLOCKED &&
1048                     s != Thread.State.WAITING &&
1049                     s != Thread.State.TIMED_WAITING);
1050         }
1051 
1052         // Unsafe mechanics
              // U      - the Unsafe instance used for raw field/array access
              // QLOCK  - field offset of "qlock" within WorkQueue
              // ABASE  - offset of element 0 within a ForkJoinTask[] array
              // ASHIFT - log2 of the array element scale, so that element i
              //          lives at offset (i << ASHIFT) + ABASE
1053         private static final sun.misc.Unsafe U;
1054         private static final long QLOCK;
1055         private static final int ABASE;
1056         private static final int ASHIFT;
1057         static {
1058             int s;                      // array index scale (bytes per element)
1059             try {
1060                 U = sun.misc.Unsafe.getUnsafe();
1061                 Class<?> k = WorkQueue.class;
1062                 Class<?> ak = ForkJoinTask[].class;
1063                 QLOCK = U.objectFieldOffset
1064                     (k.getDeclaredField("qlock"));
1065                 ABASE = U.arrayBaseOffset(ak);
1066                 s = U.arrayIndexScale(ak);



1067             } catch (Exception e) {
                      // any lookup failure is fatal for class initialization
1068                 throw new Error(e);
1069             }
                  // indexing by shift is only valid when the scale is a power of two
1070             if ((s & (s-1)) != 0)
1071                 throw new Error("data type scale not a power of two");
                  // for a power of two, 31 - (leading zero count) is its log2
1072             ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
1073         }
1074     }
1075 
1076     // static fields (initialized in static initializer below)
          // The final fields here are declared without initializers, so that
          // static initializer must assign each of them.
1077 
1078     /**
1079      * Creates a new ForkJoinWorkerThread. This factory is used unless
1080      * overridden in ForkJoinPool constructors.
1081      */
1082     public static final ForkJoinWorkerThreadFactory
1083         defaultForkJoinWorkerThreadFactory;
1084 
1085     /**
1086      * Per-thread submission bookkeeping. Shared across all pools
1087      * to reduce ThreadLocal pollution and because random motion
1088      * to avoid contention in one pool is likely to hold for others.
1089      * Lazily initialized on first submission (but null-checked
1090      * in other contexts to avoid unnecessary initialization).
1091      */
1092     static final ThreadLocal<Submitter> submitters;
1093 
1094     /**
1095      * Permission required for callers of methods that may start or
1096      * kill threads.
1097      */
1098     private static final RuntimePermission modifyThreadPermission;
1099 
1100     /**
1101      * Common (static) pool. Non-null for public use unless a static
1102      * construction exception, but internal usages null-check on use
1103      * to paranoically avoid potential initialization circularities
1104      * as well as to simplify generated code.
1105      */
1106     static final ForkJoinPool commonPool;
1107 
1108     /**
1109      * Common pool parallelism. Must equal commonPool.parallelism.



1110      */
1111     static final int commonPoolParallelism;
1112 
1113     /**
1114      * Sequence number for creating workerNamePrefix.
          * Not final: it is advanced by the synchronized method
          * nextPoolId().
1115      */
1116     private static int poolNumberSequence;
1117 
1118     /**
1119      * Return the next sequence number. We don't expect this to
1120      * ever contend so use simple builtin sync.
1121      */
1122     private static final synchronized int nextPoolId() {
1123         return ++poolNumberSequence;
1124     }
1125 
1126     // static constants
1127 
1128     /**
1129      * Initial timeout value (in nanoseconds) for the thread
1130      * triggering quiescence to park waiting for new work. On timeout,
1131      * the thread will instead try to shrink the number of
1132      * workers. The value should be large enough to avoid overly
1133      * aggressive shrinkage during most transient stalls (long GCs
1134      * etc).
          *
          * The expression evaluates to 2,000,000,000 ns: 2000 milliseconds
          * scaled by 1000 (to microseconds) and by 1000 again (to
          * nanoseconds).
1135      */
1136     private static final long IDLE_TIMEOUT      = 2000L * 1000L * 1000L; // 2sec
1137 
1138     /**
1139      * Timeout value when there are more threads than parallelism level
1140      */


1251 
1252     // Instance fields
1253 
1254     /*
1255      * Field layout of this class tends to matter more than one would
1256      * like. Runtime layout order is only loosely related to
1257      * declaration order and may differ across JVMs, but the following
1258      * empirically works OK on current JVMs.
1259      */
1260 
1261     // Heuristic padding to ameliorate unfortunate memory placements
          // NOTE(review): the pad fields carry no data; presumably they exist
          // only to separate the hot fields below from neighbouring objects
          // in memory -- confirm before removing or reordering.
1262     volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06;
1263 
1264     volatile long stealCount;                  // collects worker counts
1265     volatile long ctl;                         // main pool control
1266     volatile int plock;                        // shutdown status and seqLock
1267     volatile int indexSeed;                    // worker/submitter index seed
1268     final int config;                          // mode and parallelism level
1269     WorkQueue[] workQueues;                    // main registry
1270     final ForkJoinWorkerThreadFactory factory;
1271     final Thread.UncaughtExceptionHandler ueh; // per-worker UEH
1272     final String workerNamePrefix;             // to create worker name string
1273 
          // Trailing padding, same purpose as pad00..pad06 above.
1274     volatile Object pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17;
1275     volatile Object pad18, pad19, pad1a, pad1b;
1276 
1277     /**
1278      * Acquires the plock lock to protect worker array and related
1279      * updates. This method is called only if an initial CAS on plock
1280      * fails. This acts as a spinLock for normal cases, but falls back
1281      * to builtin monitor to block when (rarely) needed. This would be
1282      * a terrible idea for a highly contended lock, but works fine as
1283      * a more conservative alternative to a pure spinlock.
          *
          * Each loop iteration takes exactly one of four actions: try the
          * CAS that sets PL_LOCK; pick a random seed (first failure only);
          * burn one randomized spin; or, once spins are used up, set
          * PL_SIGNAL and wait on this pool's monitor.
          *
          * @return the plock value installed by the successful CAS
          * (the observed value plus PL_LOCK)
1284      */
1285     private int acquirePlock() {
1286         int spins = PL_SPINS, r = 0, ps, nps;
1287         for (;;) {
                  // lock bit clear: try to take the lock
1288             if (((ps = plock) & PL_LOCK) == 0 &&
1289                 U.compareAndSwapInt(this, PLOCK, ps, nps = ps + PL_LOCK))
1290                 return nps;
1291             else if (r == 0) { // randomize spins if possible
                      // seed from the worker's queue, else the submitter record, else 1
1292                 Thread t = Thread.currentThread(); WorkQueue w; Submitter z;
1293                 if ((t instanceof ForkJoinWorkerThread) &&
1294                     (w = ((ForkJoinWorkerThread)t).workQueue) != null)
1295                     r = w.seed;
1296                 else if ((z = submitters.get()) != null)
1297                     r = z.seed;
1298                 else
1299                     r = 1;
1300             }
1301             else if (spins >= 0) {
1302                 r ^= r << 1; r ^= r >>> 3; r ^= r << 10; // xorshift
                      // only non-negative draws count down, so spin length varies
1303                 if (r >= 0)
1304                     --spins;
1305             }
                  // spins exhausted: mark that a waiter exists, then block on the monitor
1306             else if (U.compareAndSwapInt(this, PLOCK, ps, ps | PL_SIGNAL)) {
1307                 synchronized (this) {
                          // re-check under the monitor; wait only if the signal bit is still set
1308                     if ((plock & PL_SIGNAL) != 0) {
1309                         try {
1310                             wait();
1311                         } catch (InterruptedException ie) {
                                  // keep the interrupt status for the caller; then retry the loop
1312                             try {
1313                                 Thread.currentThread().interrupt();
1314                             } catch (SecurityException ignore) {
1315                             }
1316                         }
1317                     }
1318                     else
                              // signal bit already cleared: wake any other waiters
1319                         notifyAll();
1320                 }
1321             }
1322         }
1323     }
1324 
1325     /**
1326      * Unlocks and signals any thread waiting for plock. Called only
1327      * when CAS of seq value for unlock fails.
1328      */
1329     private void releasePlock(int ps) {
1330         plock = ps;
1331         synchronized (this) { notifyAll(); }
1332     }
1333 
1334     /**
1335      * Performs secondary initialization, called when plock is zero.
1336      * Creates workQueue array and sets plock to a valid value.  The
1337      * lock body must be exception-free (so no try/finally) so we
1338      * optimistically allocate new array outside the lock and throw
1339      * away if (very rarely) not needed. (A similar tactic is used in
1340      * fullExternalPush.)  Because the plock seq value can eventually
1341      * wrap around zero, this method harmlessly fails to reinitialize
1342      * if workQueues exists, while still advancing plock.
1343      *
1344      * Additionally tries to create the first worker.
1345      */
1346     private void initWorkers() {
1347         WorkQueue[] ws, nws; int ps;
1348         int p = config & SMASK;        // find power of two table size
1349         int n = (p > 1) ? p - 1 : 1;   // ensure at least 2 slots
              // smear the highest set bit downward so n becomes 2^k - 1
1350         n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
              // n + 1 is a power of two; the table gets twice that many slots
1351         n = (n + 1) << 1;
              // allocate before locking, and only if no usable table exists yet
1352         if ((ws = workQueues) == null || ws.length == 0)
1353             nws = new WorkQueue[n];
1354         else
1355             nws = null;
              // take plock: fast-path CAS, else the spin/blocking path
1356         if (((ps = plock) & PL_LOCK) != 0 ||
1357             !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1358             ps = acquirePlock();
              // re-check under the lock; install only if still missing
1359         if (((ws = workQueues) == null || ws.length == 0) && nws != null)
1360             workQueues = nws;
              // unlock value: keeps the SHUTDOWN bit of ps and adds PL_LOCK again
              // (presumably clearing the lock bit and advancing the sequence)
1361         int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
              // if plock changed while held, use the notifying release
1362         if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1363             releasePlock(nps);
1364         tryAddWorker();
1365     }
1366 
1367     /**
1368      * Tries to create and start one worker if fewer than target
1369      * parallelism level exist. Adjusts counts etc on failure.
1370      */
1371     private void tryAddWorker() {
1372         long c; int u;
1373         while ((u = (int)((c = ctl) >>> 32)) < 0 &&
1374                (u & SHORT_SIGN) != 0 && (int)c == 0) {
1375             long nc = (long)(((u + UTC_UNIT) & UTC_MASK) |
1376                              ((u + UAC_UNIT) & UAC_MASK)) << 32;
1377             if (U.compareAndSwapLong(this, CTL, c, nc)) {
1378                 ForkJoinWorkerThreadFactory fac;
1379                 Throwable ex = null;
1380                 ForkJoinWorkerThread wt = null;
1381                 try {
1382                     if ((fac = factory) != null &&
1383                         (wt = fac.newThread(this)) != null) {
1384                         wt.start();
1385                         break;
1386                     }
1387                 } catch (Throwable e) {


1389                 }
1390                 deregisterWorker(wt, ex);
1391                 break;
1392             }
1393         }
1394     }
1395 
1396     //  Registering and deregistering workers
1397 
1398     /**
1399      * Callback from ForkJoinWorkerThread to establish and record its
1400      * WorkQueue. To avoid scanning bias due to packing entries in
1401      * front of the workQueues array, we treat the array as a simple
1402      * power-of-two hash table using per-thread seed as hash,
1403      * expanding as needed.
1404      *
1405      * @param wt the worker thread
1406      * @return the worker's queue
1407      */
1408     final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
1409         Thread.UncaughtExceptionHandler handler; WorkQueue[] ws; int s, ps;
1410         wt.setDaemon(true);
1411         if ((handler = ueh) != null)
1412             wt.setUncaughtExceptionHandler(handler);
1413         do {} while (!U.compareAndSwapInt(this, INDEXSEED, s = indexSeed,
1414                                           s += SEED_INCREMENT) ||
1415                      s == 0); // skip 0
1416         WorkQueue w = new WorkQueue(this, wt, config >>> 16, s);
1417         if (((ps = plock) & PL_LOCK) != 0 ||
1418             !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1419             ps = acquirePlock();
1420         int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
1421         try {
1422             if ((ws = workQueues) != null) {    // skip if shutting down
1423                 int n = ws.length, m = n - 1;
1424                 int r = (s << 1) | 1;           // use odd-numbered indices
1425                 if (ws[r &= m] != null) {       // collision
1426                     int probes = 0;             // step by approx half size
1427                     int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
1428                     while (ws[r = (r + step) & m] != null) {
1429                         if (++probes >= n) {


1433                         }
1434                     }
1435                 }
1436                 w.eventCount = w.poolIndex = r; // volatile write orders
1437                 ws[r] = w;
1438             }
1439         } finally {
1440             if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1441                 releasePlock(nps);
1442         }
1443         wt.setName(workerNamePrefix.concat(Integer.toString(w.poolIndex)));
1444         return w;
1445     }
1446 
1447     /**
1448      * Final callback from terminating worker, as well as upon failure
1449      * to construct or start a worker.  Removes record of worker from
1450      * array, and adjusts counts. If pool is shutting down, tries to
1451      * complete termination.
1452      *
1453      * @param wt the worker thread or null if construction failed
1454      * @param ex the exception causing failure, or null if none
1455      */
1456     final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
             // Phases: (1) if the thread has a queue, mark it, fold its steal
             // count into stealCount and clear its slot in workQueues under
             // plock; (2) decrement the AC and TC fields packed in ctl;
             // (3) unless tryTerminate reports termination, cancel leftover
             // tasks and either release one queued waiter or try to add a
             // worker; (4) rethrow ex if one was supplied.
1457         WorkQueue w = null;
1458         if (wt != null && (w = wt.workQueue) != null) {
1459             int ps;
1460             w.qlock = -1;                // ensure set
1461             long ns = w.nsteals, sc;     // collect steal count
1462             do {} while (!U.compareAndSwapLong(this, STEALCOUNT,
1463                                                sc = stealCount, sc + ns));
                 // take plock: try one CAS, otherwise fall back to acquirePlock()
1464             if (((ps = plock) & PL_LOCK) != 0 ||
1465                 !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1466                 ps = acquirePlock();
                 // value to publish on unlock; the SHUTDOWN bit is carried over
1467             int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
1468             try {
1469                 int idx = w.poolIndex;
1470                 WorkQueue[] ws = workQueues;
                     // clear the slot only if it still refers to this queue
1471                 if (ws != null && idx >= 0 && idx < ws.length && ws[idx] == w)
1472                     ws[idx] = null;
1473             } finally {
1474                 if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1475                     releasePlock(nps);
1476             }
1477         }
1478
1479         long c;                          // adjust ctl counts
1480         do {} while (!U.compareAndSwapLong
1481                      (this, CTL, c = ctl, (((c - AC_UNIT) & AC_MASK) |
1482                                            ((c - TC_UNIT) & TC_MASK) |
1483                                            (c & ~(AC_MASK|TC_MASK)))));
1484
1485         if (!tryTerminate(false, false) && w != null && w.array != null) {
1486             w.cancelAll();               // cancel remaining tasks
1487             WorkQueue[] ws; WorkQueue v; Thread p; int u, i, e;
                 // u = upper 32 bits of ctl, e = lower 32 bits; loop only while
                 // u is negative and e is non-negative
1488             while ((u = (int)((c = ctl) >>> 32)) < 0 && (e = (int)c) >= 0) {
1489                 if (e > 0) {             // activate or create replacement
                         // e & SMASK indexes the queue of the waiter to release;
                         // stop if there is no array, the index is out of range,
                         // or the slot is empty (v must be non-null below)
1490                     if ((ws = workQueues) == null ||
1491                         (i = e & SMASK) >= ws.length ||
1492                         (v = ws[i]) == null)
1493                         break;
                         // next ctl: low word taken from v.nextWait, upper word
                         // advanced by UAC_UNIT
1494                     long nc = (((long)(v.nextWait & E_MASK)) |
1495                                ((long)(u + UAC_UNIT) << 32));
                         // v's eventCount must still match what ctl recorded
1496                     if (v.eventCount != (e | INT_SIGN))
1497                         break;
1498                     if (U.compareAndSwapLong(this, CTL, c, nc)) {
1499                         v.eventCount = (e + E_SEQ) & E_MASK;
1500                         if ((p = v.parker) != null)
1501                             U.unpark(p);
1502                         break;
1503                     }
                         // CAS lost a race: re-read ctl and retry
1504                 }
1505                 else {
                         // e == 0: no waiter recorded; add a worker only when
                         // the low 16 bits of u are negative
1506                     if ((short)u < 0)
1507                         tryAddWorker();
1508                     break;
1509                 }
1510             }
1511         }
1512         if (ex == null)                     // help clean refs on way out
1513             ForkJoinTask.helpExpungeStaleExceptions();
1514         else                                // rethrow
1515             ForkJoinTask.rethrow(ex);
1516     }
1517 
1518     // Submissions
1519 
1520     /**
1521      * Unless shutting down, adds the given task to a submission queue
1522      * at submitter's current queue index (modulo submission
1523      * range). Only the most common path is directly handled in this
1524      * method. All others are relayed to fullExternalPush.
1525      *
1526      * @param task the task. Caller must ensure non-null.
1527      */
1528     final void externalPush(ForkJoinTask<?> task) {
             // Fast path only: every condition in the guard below must hold and
             // the target queue's qlock must be won with a single CAS; in all
             // other cases the task is handed to fullExternalPush.
1529         WorkQueue[] ws; WorkQueue q; Submitter z; int m; ForkJoinTask<?>[] a;
             // needs: a Submitter record for this thread, plock > 0, a non-empty
             // workQueues array, and an existing queue at the submitter's slot
1530         if ((z = submitters.get()) != null && plock > 0 &&
1531             (ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
1532             (q = ws[m & z.seed & SQMASK]) != null &&
1533             U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock
1534             int b = q.base, s = q.top, n, an;
                 // n = (top + 1) - base; push only if the array exists and its
                 // length is strictly greater than n
1535             if ((a = q.array) != null && (an = a.length) > (n = s + 1 - b)) {
                     // raw offset of slot (s mod an); an - 1 is used as a mask
1536                 int j = (((an - 1) & s) << ASHIFT) + ABASE;
1537                 U.putOrderedObject(a, j, task);
1538                 q.top = s + 1;                     // push on to deque
1539                 q.qlock = 0;                       // unlock before signalling
1540                 if (n <= 2)
1541                     signalWork(q);
1542                 return;
1543             }
1544             q.qlock = 0;                           // no room: unlock, fall through
1545         }
1546         fullExternalPush(task);
1547     }
1548 
1549     /**
1550      * Full version of externalPush. This method is called, among
1551      * other times, upon the first submission of the first task to the
1552      * pool, so must perform secondary initialization (via
1553      * initWorkers). It also detects first submission by an external
1554      * thread by looking up its ThreadLocal, and creates a new shared
1555      * queue if the one at index is empty or contended. The plock lock
1556      * body must be exception-free (so no try/finally) so we
1557      * optimistically allocate new queues outside the lock and throw
1558      * them away if (very rarely) not needed.






1559      */
1560     private void fullExternalPush(ForkJoinTask<?> task) {
             // Retry loop: each iteration takes exactly one of the branches
             // below and then loops again, until the task has been pushed
             // (return) or plock is negative (RejectedExecutionException).
1561         int r = 0; // random index seed
1562         for (Submitter z = submitters.get();;) {
1563             WorkQueue[] ws; WorkQueue q; int ps, m, k;
1564             if (z == null) {
                     // no Submitter for this thread yet: advance indexSeed by
                     // CAS and install a Submitter, unless the new seed is 0
1565                 if (U.compareAndSwapInt(this, INDEXSEED, r = indexSeed,
1566                                         r += SEED_INCREMENT) && r != 0)
1567                     submitters.set(z = new Submitter(r));
1568             }
1569             else if (r == 0) {               // move to a different index
1570                 r = z.seed;
1571                 r ^= r << 13;                // same xorshift as WorkQueues
1572                 r ^= r >>> 17;
1573                 z.seed = r ^ (r << 5);
1574             }
1575             else if ((ps = plock) < 0)
1576                 throw new RejectedExecutionException();
1577             else if (ps == 0 || (ws = workQueues) == null ||
1578                      (m = ws.length - 1) < 0)
1579                 initWorkers();
1580             else if ((q = ws[k = r & m & SQMASK]) != null) {
                     // a queue exists at slot k: try to lock it and push
1581                 if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
1582                     ForkJoinTask<?>[] a = q.array;
1583                     int s = q.top;
1584                     boolean submitted = false;
1585                     try {                      // locked version of push
                             // use the current array if it has room, otherwise
                             // whatever growArray() returns (if non-null)
1586                         if ((a != null && a.length > s + 1 - q.base) ||
1587                             (a = q.growArray()) != null) {   // must presize
1588                             int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
1589                             U.putOrderedObject(a, j, task);
1590                             q.top = s + 1;
1591                             submitted = true;
1592                         }
1593                     } finally {
1594                         q.qlock = 0;  // unlock
1595                     }
1596                     if (submitted) {
1597                         signalWork(q);
1598                         return;
1599                     }
1600                 }
1601                 r = 0; // move on failure
1602             }
1603             else if (((ps = plock) & PL_LOCK) == 0) { // create new queue
                     // allocate before taking plock; the new queue is simply
                     // dropped if slot k is no longer empty once the lock is held
1604                 q = new WorkQueue(this, null, SHARED_QUEUE, r);
1605                 if (((ps = plock) & PL_LOCK) != 0 ||
1606                     !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1607                     ps = acquirePlock();
1608                 if ((ws = workQueues) != null && k < ws.length && ws[k] == null)
1609                     ws[k] = q;
                     // value to publish on unlock; the SHUTDOWN bit is carried over
1610                 int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
1611                 if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1612                     releasePlock(nps);
1613             }
1614             else
1615                 r = 0; // try elsewhere while lock held
1616         }
1617     }
1618 
1619     // Maintaining ctl counts
1620 
1621     /**
1622      * Increments active count; mainly called upon return from blocking.
1623      */
1624     final void incrementActiveCount() {
1625         long c;
             // spin: re-read ctl and CAS in ctl + AC_UNIT until the CAS succeeds
1626         do {} while (!U.compareAndSwapLong(this, CTL, c = ctl, c + AC_UNIT));
1627     }
1628 
1629     /**
1630      * Tries to create or activate a worker if too few are active.
1631      *
1632      * @param q the (non-null) queue holding tasks to be signalled
1633      */
1634     final void signalWork(WorkQueue q) {
1635         int hint = q.poolIndex;


1686      * completing the sweep. If the worker is not inactivated, it
1687      * takes and returns a task from this queue. Otherwise, if not
1688      * activated, it signals workers (that may include itself) and
1689      * returns so caller can retry. Also returns for retry if the
1690      * worker array may have changed during an empty scan.  On failure
1691      * to find a task, we take one of the following actions, after
1692      * which the caller will retry calling this method unless
1693      * terminated.
1694      *
1695      * * If pool is terminating, terminate the worker.
1696      *
1697      * * If not already enqueued, try to inactivate and enqueue the
1698      * worker on wait queue. Or, if inactivating has caused the pool
1699      * to be quiescent, relay to idleAwaitWork to possibly shrink
1700      * pool.
1701      *
1702      * * If already enqueued and none of the above apply, possibly
1703      * park awaiting signal, else lingering to help scan and signal.
1704      *
1705      * * If a non-empty queue discovered or left as a hint,
1706      * help wake up other workers before return
1707      *
1708      * @param w the worker (via its WorkQueue)
1709      * @return a task or null if none found
1710      */
1711     private final ForkJoinTask<?> scan(WorkQueue w) {
1712         WorkQueue[] ws; int m;
1713         int ps = plock;                          // read plock before ws
1714         if (w != null && (ws = workQueues) != null && (m = ws.length - 1) >= 0) {
1715             int ec = w.eventCount;               // ec is negative if inactive
1716             int r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5;
1717             w.hint = -1;                         // update seed and clear hint
1718             int j = ((m + m + 1) | MIN_SCAN) & MAX_SCAN;
1719             do {
1720                 WorkQueue q; ForkJoinTask<?>[] a; int b;
1721                 if ((q = ws[(r + j) & m]) != null && (b = q.base) - q.top < 0 &&
1722                     (a = q.array) != null) {     // probably nonempty
1723                     int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
1724                     ForkJoinTask<?> t = (ForkJoinTask<?>)
1725                         U.getObjectVolatile(a, i);
1726                     if (q.base == b && ec >= 0 && t != null &&


1741                 if (U.compareAndSwapLong(this, STEALCOUNT,
1742                                          sc = stealCount, sc + ns))
1743                     w.nsteals = 0;               // collect steals and rescan
1744             }
1745             else if (plock != ps)                // consistency check
1746                 ;                                // skip
1747             else if ((e = (int)(c = ctl)) < 0)
1748                 w.qlock = -1;                    // pool is terminating
1749             else {
1750                 if ((h = w.hint) < 0) {
1751                     if (ec >= 0) {               // try to enqueue/inactivate
1752                         long nc = (((long)ec |
1753                                     ((c - AC_UNIT) & (AC_MASK|TC_MASK))));
1754                         w.nextWait = e;          // link and mark inactive
1755                         w.eventCount = ec | INT_SIGN;
1756                         if (ctl != c || !U.compareAndSwapLong(this, CTL, c, nc))
1757                             w.eventCount = ec;   // unmark on CAS failure
1758                         else if ((int)(c >> AC_SHIFT) == 1 - (config & SMASK))
1759                             idleAwaitWork(w, nc, c);
1760                     }
1761                     else if (w.eventCount < 0 && !tryTerminate(false, false) &&
1762                              ctl == c) {         // block
1763                         Thread wt = Thread.currentThread();
1764                         Thread.interrupted();    // clear status
1765                         U.putObject(wt, PARKBLOCKER, this);
1766                         w.parker = wt;           // emulate LockSupport.park
1767                         if (w.eventCount < 0)    // recheck
1768                             U.park(false, 0L);
1769                         w.parker = null;
1770                         U.putObject(wt, PARKBLOCKER, null);
1771                     }
1772                 }
1773                 if ((h >= 0 || (h = w.hint) >= 0) &&
1774                     (ws = workQueues) != null && h < ws.length &&
1775                     (q = ws[h]) != null) {      // signal others before retry
1776                     WorkQueue v; Thread p; int u, i, s;
1777                     for (int n = (config & SMASK) >>> 1;;) {
1778                         int idleCount = (w.eventCount < 0) ? 0 : -1;
1779                         if (((s = idleCount - q.base + q.top) <= n &&
1780                              (n = s) <= 0) ||
1781                             (u = (int)((c = ctl) >>> 32)) >= 0 ||
1782                             (e = (int)c) <= 0 || m < (i = e & SMASK) ||
1783                             (v = ws[i]) == null)
1784                             break;
1785                         long nc = (((long)(v.nextWait & E_MASK)) |
1786                                    ((long)(u + UAC_UNIT) << 32));
1787                         if (v.eventCount != (e | INT_SIGN) ||
1788                             !U.compareAndSwapLong(this, CTL, c, nc))
1789                             break;
1790                         v.hint = h;
1791                         v.eventCount = (e + E_SEQ) & E_MASK;
1792                         if ((p = v.parker) != null)
1793                             U.unpark(p);
1794                         if (--n <= 0)
1795                             break;
1796                     }
1797                 }
1798             }
1799         }
1800         return null;
1801     }
1802 
1803     /**
1804      * If inactivating worker w has caused the pool to become
1805      * quiescent, checks for pool termination, and, so long as this is
1806      * not the only worker, waits for event for up to a given
1807      * duration.  On timeout, if ctl has not changed, terminates the
1808      * worker, which will in turn wake up another worker to possibly
1809      * repeat this process.
1810      *
1811      * @param w the calling worker
1812      * @param currentCtl the ctl value triggering possible quiescence
1813      * @param prevCtl the ctl value to restore if thread is terminated
1814      */
1815     private void idleAwaitWork(WorkQueue w, long currentCtl, long prevCtl) {
             // Proceeds only if w is non-null with a negative eventCount,
             // tryTerminate reports no termination, and the low 32 bits of
             // prevCtl are non-zero.
1816         if (w != null && w.eventCount < 0 &&
1817             !tryTerminate(false, false) && (int)prevCtl != 0) {
                 // dc: negated 16-bit field at TC_SHIFT in currentCtl; it picks
                 // between the fast timeout and a multiple of IDLE_TIMEOUT
1818             int dc = -(short)(currentCtl >>> TC_SHIFT);
1819             long parkTime = dc < 0 ? FAST_IDLE_TIMEOUT: (dc + 1) * IDLE_TIMEOUT;
1820             long deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
1821             Thread wt = Thread.currentThread();
                 // keep waiting only while ctl still equals currentCtl
1822             while (ctl == currentCtl) {
1823                 Thread.interrupted();  // timed variant of version in scan()
1824                 U.putObject(wt, PARKBLOCKER, this);
1825                 w.parker = wt;
1826                 if (ctl == currentCtl)     // recheck after publishing parker
1827                     U.park(false, parkTime);
1828                 w.parker = null;
1829                 U.putObject(wt, PARKBLOCKER, null);
1830                 if (ctl != currentCtl)
1831                     break;
                     // deadline passed with ctl unchanged: CAS ctl back to
                     // prevCtl and, on success, mark this queue with qlock = -1
1832                 if (deadline - System.nanoTime() <= 0L &&
1833                     U.compareAndSwapLong(this, CTL, currentCtl, prevCtl)) {
1834                     w.eventCount = (w.eventCount + E_SEQ) | E_MASK;
1835                     w.qlock = -1;   // shrink
1836                     break;
1837                 }
1838             }
1839         }
1840     }
1841 
1842     /**
1843      * Scans through queues looking for work while joining a task; if
1844      * any present, signals. May return early if more signalling is
1845      * detectably unneeded.
1846      *
1847      * @param task return early if done
1848      * @param origin an index to start scan
1849      */
1850     private void helpSignal(ForkJoinTask<?> task, int origin) {
1851         WorkQueue[] ws; WorkQueue w; Thread p; long c; int m, u, e, i, s;
1852         if (task != null && task.status >= 0 &&
1853             (u = (int)(ctl >>> 32)) < 0 && (u >> UAC_SHIFT) < 0 &&
1854             (ws = workQueues) != null && (m = ws.length - 1) >= 0) {


1956                             else {
1957                                 subtask = next;
1958                                 j = v;
1959                                 break;
1960                             }
1961                         }
1962                     }
1963                 }
1964             }
1965         }
1966         return stat;
1967     }
1968 
1969     /**
1970      * Analog of tryHelpStealer for CountedCompleters. Tries to steal
1971      * and run tasks within the target's computation.
1972      *
1973      * @param task the task to join
1974      * @param mode if shared, exit upon completing any task
1975      * if all workers are active
1976      *
1977      */
1978     private int helpComplete(ForkJoinTask<?> task, int mode) {
             // Returns task.status as soon as it is observed negative; every
             // other exit returns 0. NOTE: local n is declared but never used.
1979         WorkQueue[] ws; WorkQueue q; int m, n, s, u;
1980         if (task != null && (ws = workQueues) != null &&
1981             (m = ws.length - 1) >= 0) {
                 // j starts at 1 and advances by 2 (masked by m); origin is the
                 // index at which the last successful pollAndExecCC happened
1982             for (int j = 1, origin = j;;) {
1983                 if ((s = task.status) < 0)
1984                     return s;
1985                 if ((q = ws[j & m]) != null && q.pollAndExecCC(task)) {
1986                     origin = j;
                         // in SHARED_QUEUE mode stop after a success when the
                         // upper ctl word u, or u >> UAC_SHIFT, is non-negative
1987                     if (mode == SHARED_QUEUE &&
1988                         ((u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0))
1989                         break;
1990                 }
                     // nothing run at j: advance, stopping once back at origin
1991                 else if ((j = (j + 2) & m) == origin)
1992                     break;
1993             }
1994         }
1995         return 0;
1996     }


2108             ForkJoinTask<?> prevJoin = joiner.currentJoin;
2109             joiner.currentJoin = task;
2110             do {} while ((s = task.status) >= 0 && !joiner.isEmpty() &&
2111                          joiner.tryRemoveAndExec(task));
2112             if (s >= 0 && (s = task.status) >= 0) {
2113                 helpSignal(task, joiner.poolIndex);
2114                 if ((s = task.status) >= 0 &&
2115                     (task instanceof CountedCompleter))
2116                     s = helpComplete(task, LIFO_QUEUE);
2117             }
2118             if (s >= 0 && joiner.isEmpty()) {
2119                 do {} while (task.status >= 0 &&
2120                              tryHelpStealer(joiner, task) > 0);
2121             }
2122             joiner.currentJoin = prevJoin;
2123         }
2124     }
2125 
2126     /**
2127      * Returns a (probably) non-empty steal queue, if one is found
2128      * during a random, then cyclic scan, else null.  This method must
2129      * be retried by caller if, by the time it tries to use the queue,
2130      * it is empty.
2131      * @param r a (random) seed for scanning
2132      */
2133     private WorkQueue findNonEmptyStealQueue(int r) {
             // Outer loop restarts the whole scan when plock changed while an
             // unsuccessful inner scan was running.
2134         for (WorkQueue[] ws;;) {
2135             int ps = plock, m, n;      // plock snapshot taken before reading ws
2136             if ((ws = workQueues) == null || (m = ws.length - 1) < 1)
2137                 return null;
                 // j counts down from 4 * (m + 1); the "| 1" forces an odd index
2138             for (int j = (m + 1) << 2; ;) {
2139                 WorkQueue q = ws[(((r + j) << 1) | 1) & m];
                     // n = base - top; negative means q appears non-empty
2140                 if (q != null && (n = q.base - q.top) < 0) {
2141                     if (n < -1)        // more than one task apparently queued
2142                         signalWork(q);
2143                     return q;
2144                 }
2145                 else if (--j < 0) {
                         // probes exhausted: report null only if plock is unchanged
2146                     if (plock == ps)
2147                         return null;
2148                     break;
2149                 }
2150             }
2151         }
2152     }
2153 
2154     /**
2155      * Runs tasks until {@code isQuiescent()}. We piggyback on
2156      * active count ctl maintenance, but rather than blocking
2157      * when tasks cannot be found, we rescan until all others cannot
2158      * find tasks either.
2159      */
2160     final void helpQuiescePool(WorkQueue w) {
             // 'active' records whether this caller is currently counted in the
             // AC field of ctl; it is dropped when no queue with work is found
             // and re-added when one is found again or when the loop exits.
2161         for (boolean active = true;;) {
2162             ForkJoinTask<?> localTask; // exhaust local queue
2163             while ((localTask = w.nextLocalTask()) != null)
2164                 localTask.doExec();
2165             // Similar to loop in scan(), but ignoring submissions
2166             WorkQueue q = findNonEmptyStealQueue(w.nextSeed());
2167             if (q != null) {
2168                 ForkJoinTask<?> t; int b;
2169                 if (!active) {      // re-establish active count
2170                     long c;
2171                     active = true;
2172                     do {} while (!U.compareAndSwapLong
2173                                  (this, CTL, c = ctl, c + AC_UNIT));
2174                 }
                     // take from q's base if q still looks non-empty; a failed
                     // poll just falls through to the next outer iteration
2175                 if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null)
2176                     w.runSubtask(t);
2177             }
2178             else {
2179                 long c;
2180                 if (active) {       // decrement active count without queuing
2181                     active = false;
                         // after the loop, c holds the decremented ctl value
2182                     do {} while (!U.compareAndSwapLong
2183                                  (this, CTL, c = ctl, c -= AC_UNIT));
2184                 }
2185                 else
2186                     c = ctl;        // re-increment on exit
                     // exit once the AC field of c equals -(config & SMASK),
                     // after adding this caller's AC_UNIT back
2187                 if ((int)(c >> AC_SHIFT) + (config & SMASK) == 0) {
2188                     do {} while (!U.compareAndSwapLong
2189                                  (this, CTL, c = ctl, c + AC_UNIT));
2190                     break;
2191                 }
2192             }
2193         }
2194     }
2195 
2196     /**
2197      * Gets and removes a local or stolen task for the given worker.
2198      *
2199      * @return a task, if available
2200      */
2201     final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
             // Loops until a task is obtained or findNonEmptyStealQueue
             // returns null.
2202         for (ForkJoinTask<?> t;;) {
2203             WorkQueue q; int b;
2204             if ((t = w.nextLocalTask()) != null)   // local tasks first
2205                 return t;
2206             if ((q = findNonEmptyStealQueue(w.nextSeed())) == null)
2207                 return null;
                 // q may have been emptied since it was found; retry if so
2208             if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null)
2209                 return t;
2210         }
2211     }

2212 
2213     /**
2214      * Returns a cheap heuristic guide for task partitioning when
2215      * programmers, frameworks, tools, or languages have little or no
2216      * idea about task granularity.  In essence by offering this
2217      * method, we ask users only about tradeoffs in overhead vs
2218      * expected throughput and its variance, rather than how finely to
2219      * partition tasks.
2220      *
2221      * In a steady state strict (tree-structured) computation, each
2222      * thread makes available for stealing enough tasks for other
2223      * threads to remain active. Inductively, if all threads play by
2224      * the same rules, each thread should make available only a
2225      * constant number of tasks.
2226      *
2227      * The minimum useful constant is just 1. But using a value of 1
2228      * would require immediate replenishment upon each steal to
2229      * maintain enough tasks, which is infeasible.  Further,
2230      * partitionings/granularities of offered tasks should minimize
2231      * steal rates, which in general means that threads nearer the top
2232      * of computation tree should generate more than those nearer the
2233      * bottom. In perfect steady state, each thread is at
2234      * approximately the same level of computation tree. However,
2235      * producing extra tasks amortizes the uncertainty of progress and
2236      * diffusion assumptions.
2237      *
2238      * So, users will want to use values larger, but not much larger
2239      * than 1 to both smooth over transient shortages and hedge
2240      * against uneven progress; as traded off against the cost of
2241      * extra task overhead. We leave the user to pick a threshold
2242      * value to compare with the results of this call to guide
2243      * decisions, but recommend values such as 3.
2244      *
2245      * When all threads are active, it is on average OK to estimate
2246      * surplus strictly locally. In steady-state, if one thread is
2247      * maintaining say 2 surplus tasks, then so are others. So we can
2248      * just use estimated queue length.  However, this strategy alone
2249      * leads to serious mis-estimates in some non-steady-state
2250      * conditions (ramp-up, ramp-down, other stalls). We can detect
2251      * many of these by further considering the number of "idle"
2252      * threads, that are known to have zero queued tasks, so
2253      * compensate by a factor of (#idle/#active) threads.
2254      *
2255      * Note: The approximation of #busy workers as #active workers is
2256      * not very good under current signalling scheme, and should be
2257      * improved.
2258      */


2271         return 0;
2272     }
2273 
2274     //  Termination
2275 
2276     /**
2277      * Possibly initiates and/or completes termination.  The caller
2278      * triggering termination runs three passes through workQueues:
2279      * (0) Setting termination status, followed by wakeups of queued
2280      * workers; (1) cancelling all tasks; (2) interrupting lagging
2281      * threads (likely in external tasks, but possibly also blocked in
2282      * joins).  Each pass repeats previous steps because of potential
2283      * lagging thread creation.
2284      *
2285      * @param now if true, unconditionally terminate, else only
2286      * if no work and no active workers
2287      * @param enable if true, enable shutdown when next possible
2288      * @return true if now terminating or terminated
2289      */
2290     private boolean tryTerminate(boolean now, boolean enable) {
             // Every return happens inside the loop: false from the guards
             // below, true only from the STOP_BIT branch. After the STOP_BIT
             // CAS (won or lost) the loop runs again and re-reads ctl.
2291         if (this == commonPool)                     // cannot shut down
2292             return false;
2293         for (long c;;) {
2294             if (((c = ctl) & STOP_BIT) != 0) {      // already terminating
                     // notify monitor waiters when the 16-bit field at TC_SHIFT
                     // equals -(config & SMASK)
2295                 if ((short)(c >>> TC_SHIFT) == -(config & SMASK)) {
2296                     synchronized (this) {
2297                         notifyAll();                // signal when 0 workers
2298                     }
2299                 }
2300                 return true;
2301             }
2302             if (plock >= 0) {                       // not yet enabled
2303                 int ps;
2304                 if (!enable)
2305                     return false;
                     // take plock, then release it with the value SHUTDOWN
2306                 if (((ps = plock) & PL_LOCK) != 0 ||
2307                     !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
2308                     ps = acquirePlock();
2309                 if (!U.compareAndSwapInt(this, PLOCK, ps, SHUTDOWN))
2310                     releasePlock(SHUTDOWN);
2311             }
2312             if (!now) {                             // check if idle & no tasks
2313                 if ((int)(c >> AC_SHIFT) != -(config & SMASK) ||
2314                     hasQueuedSubmissions())
2315                     return false;
2316                 // Check for unqueued inactive workers. One pass suffices.
2317                 WorkQueue[] ws = workQueues; WorkQueue w;
2318                 if (ws != null) {
                         // odd slots only; any queue with eventCount >= 0
                         // blocks termination
2319                     for (int i = 1; i < ws.length; i += 2) {
2320                         if ((w = ws[i]) != null && w.eventCount >= 0)
2321                             return false;
2322                     }
2323                 }
2324             }
                 // only the caller whose CAS sets STOP_BIT runs the passes
2325             if (U.compareAndSwapLong(this, CTL, c, c | STOP_BIT)) {
2326                 for (int pass = 0; pass < 3; ++pass) {
2327                     WorkQueue[] ws = workQueues;
2328                     if (ws != null) {
2329                         WorkQueue w; Thread wt;
2330                         int n = ws.length;
                             // every pass sets qlock = -1; passes 1 and 2 also
                             // cancel tasks; pass 2 also interrupts and unparks
                             // each queue's owner thread
2331                         for (int i = 0; i < n; ++i) {
2332                             if ((w = ws[i]) != null) {
2333                                 w.qlock = -1;
2334                                 if (pass > 0) {
2335                                     w.cancelAll();
2336                                     if (pass > 1 && (wt = w.owner) != null) {
2337                                         if (!wt.isInterrupted()) {
2338                                             try {
2339                                                 wt.interrupt();
                                                 // a refused interrupt is ignored
2340                                             } catch (SecurityException ignore) {
2341                                             }
2342                                         }
2343                                         U.unpark(wt);
2344                                     }
2345                                 }
2346                             }
2347                         }
2348                         // Wake up workers parked on event queue
2349                         int i, e; long cc; Thread p;
                             // follow the waiter index held in the low bits of
                             // ctl until it is zero, out of range, or names an
                             // empty slot
2350                         while ((e = (int)(cc = ctl) & E_MASK) != 0 &&
2351                                (i = e & SMASK) < n &&
2352                                (w = ws[i]) != null) {
                                 // next ctl: low word from w.nextWait, AC field
                                 // plus one unit, TC field and STOP_BIT kept
2353                             long nc = ((long)(w.nextWait & E_MASK) |
2354                                        ((cc + AC_UNIT) & AC_MASK) |
2355                                        (cc & (TC_MASK|STOP_BIT)));
2356                             if (w.eventCount == (e | INT_SIGN) &&
2357                                 U.compareAndSwapLong(this, CTL, cc, nc)) {
2358                                 w.eventCount = (e + E_SEQ) & E_MASK;
2359                                 w.qlock = -1;
2360                                 if ((p = w.parker) != null)
2361                                     U.unpark(p);
2362                             }
2363                         }
2364                     }
2365                 }
2366             }
2367         }
2368     }
2369 
2370     // external operations on common pool
2371 
2372     /**
2373      * Returns common pool queue for a thread that has submitted at
2374      * least one task.
2375      */
2376     static WorkQueue commonSubmitterQueue() {
2377         ForkJoinPool p; WorkQueue[] ws; int m; Submitter z;
2378         return ((z = submitters.get()) != null &&
2379                 (p = commonPool) != null &&
2380                 (ws = p.workQueues) != null &&
2381                 (m = ws.length - 1) >= 0) ?
2382             ws[m & z.seed & SQMASK] : null;
2383     }
2384 
2385     /**
2386      * Tries to pop the given task from submitter's queue in common pool.
          *
          * <p>The removal is attempted only when the task is found in the
          * slot just below the queue's {@code top}. The queue's
          * {@code qlock} is taken by CAS from 0 to 1 for the duration of
          * the update and reset to 0 on every path that acquired it.
          *
          * @param t the task to remove; a {@code null} argument yields
          *        {@code false}
          * @return {@code true} if the task was removed from the queue
2387      */
2388     static boolean tryExternalUnpush(ForkJoinTask<?> t) {
2389         ForkJoinPool p; WorkQueue[] ws; WorkQueue q; Submitter z;
2390         ForkJoinTask<?>[] a;  int m, s;
                 // Locate the caller's queue (same indexing as
                 // commonSubmitterQueue) and require top != base and a
                 // non-null task array before going further.
2391         if (t != null &&
2392             (z = submitters.get()) != null &&
2393             (p = commonPool) != null &&
2394             (ws = p.workQueues) != null &&
2395             (m = ws.length - 1) >= 0 &&
2396             (q = ws[m & z.seed & SQMASK]) != null &&
2397             (s = q.top) != q.base &&
2398             (a = q.array) != null) {
                     // Raw offset of array slot (s - 1), wrapped to the array length.
2399             long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
                     // Cheap unlocked peek first; lock only if t is in that slot.
2400             if (U.getObject(a, j) == t &&
2401                 U.compareAndSwapInt(q, QLOCK, 0, 1)) {
2402                 if (q.array == a && q.top == s && // recheck
2403                     U.compareAndSwapObject(a, j, t, null)) {
2404                     q.top = s - 1;
2405                     q.qlock = 0;
2406                     return true;
2407                 }
                         // Array or top changed, or the slot CAS lost: just unlock.
2408                 q.qlock = 0;
2409             }
2410         }
2411         return false;
2412     }
2413 
2414     /**
2415      * Tries to pop and run local tasks within the same computation
2416      * as the given root. On failure, tries to help complete from


2428                         (o instanceof CountedCompleter)) {
2429                         CountedCompleter<?> t = (CountedCompleter<?>)o, r = t;
2430                         do {
2431                             if (r == root) {
2432                                 if (U.compareAndSwapInt(q, QLOCK, 0, 1)) {
2433                                     if (q.array == a && q.top == s &&
2434                                         U.compareAndSwapObject(a, j, t, null)) {
2435                                         q.top = s - 1;
2436                                         task = t;
2437                                     }
2438                                     q.qlock = 0;
2439                                 }
2440                                 break;
2441                             }
2442                         } while ((r = r.completer) != null);
2443                     }
2444                 }
2445                 if (task != null)
2446                     task.doExec();
2447                 if (root.status < 0 ||
2448                     (u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0)

2449                     break;
2450                 if (task == null) {
2451                     helpSignal(root, q.poolIndex);
2452                     if (root.status >= 0)
2453                         helpComplete(root, SHARED_QUEUE);
2454                     break;
2455                 }
2456             }
2457         }
2458     }
2459 
2460     /**
2461      * Tries to help execute or signal availability of the given task
2462      * from submitter's queue in common pool.
          *
          * <p>If the task sits in the slot just below the queue's
          * {@code top}, it is removed under the queue's {@code qlock} and
          * run in the calling thread. If the task's {@code status} is
          * still non-negative afterwards, the call falls back to
          * {@code externalHelpComplete} for a {@code CountedCompleter}
          * and to {@code helpSignal} for any other task.
          *
          * @param t the task being joined; nothing happens if it is
          *        {@code null} or the caller has no usable queue
2463      */
2464     static void externalHelpJoin(ForkJoinTask<?> t) {
2465         // Some hard-to-avoid overlap with tryExternalUnpush
2466         ForkJoinPool p; WorkQueue[] ws; WorkQueue q, w; Submitter z;
2467         ForkJoinTask<?>[] a;  int m, s, n;
2468         if (t != null &&
2469             (z = submitters.get()) != null &&
2470             (p = commonPool) != null &&
2471             (ws = p.workQueues) != null &&
2472             (m = ws.length - 1) >= 0 &&
2473             (q = ws[m & z.seed & SQMASK]) != null &&
2474             (a = q.array) != null) {
2475             int am = a.length - 1;
2476             if ((s = q.top) != q.base) {
                         // Raw offset of array slot (s - 1), wrapped by the mask am.
2477                 long j = ((am & (s - 1)) << ASHIFT) + ABASE;
2478                 if (U.getObject(a, j) == t &&
2479                     U.compareAndSwapInt(q, QLOCK, 0, 1)) {
2480                     if (q.array == a && q.top == s &&
2481                         U.compareAndSwapObject(a, j, t, null)) {
2482                         q.top = s - 1;
                                 // Unlock before running so the task does not execute
                                 // while qlock is held.
2483                         q.qlock = 0;
2484                         t.doExec();
2485                     }
2486                     else
2487                         q.qlock = 0;
2488                 }
2489             }
                     // NOTE(review): status >= 0 is presumably "not yet completed";
                     // confirm against ForkJoinTask.
2490             if (t.status >= 0) {
2491                 if (t instanceof CountedCompleter)
2492                     p.externalHelpComplete(q, t);
2493                 else
2494                     p.helpSignal(t, q.poolIndex);
2495             }
2496         }
2497     }
2498 
2499     /**
2500      * Restricted version of helpQuiescePool for external callers
2501      */
2502     static void externalHelpQuiescePool() {
2503         ForkJoinPool p; ForkJoinTask<?> t; WorkQueue q; int b;
2504         if ((p = commonPool) != null &&
2505             (q = p.findNonEmptyStealQueue(1)) != null &&
2506             (b = q.base) - q.top < 0 &&
2507             (t = q.pollAt(b)) != null)
2508             t.doExec();
2509     }
2510 
2511     // Exported methods
2512 
2513     // Constructors
2514 
2515     /**
2516      * Creates a {@code ForkJoinPool} with parallelism equal to {@link
2517      * java.lang.Runtime#availableProcessors}, using the {@linkplain
2518      * #defaultForkJoinWorkerThreadFactory default thread factory},
2519      * no UncaughtExceptionHandler, and non-async LIFO processing mode.
2520      *
2521      * @throws SecurityException if a security manager exists and
2522      *         the caller is not permitted to modify threads
2523      *         because it does not hold {@link
2524      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2525      */
2526     public ForkJoinPool() {
                 // Delegates to the four-argument constructor with a null
                 // handler and asyncMode false.
2527         this(Runtime.getRuntime().availableProcessors(),
2528              defaultForkJoinWorkerThreadFactory, null, false);
2529     }
2530 
2531     /**
2532      * Creates a {@code ForkJoinPool} with the indicated parallelism
2533      * level, the {@linkplain
2534      * #defaultForkJoinWorkerThreadFactory default thread factory},
2535      * no UncaughtExceptionHandler, and non-async LIFO processing mode.
2536      *
2537      * @param parallelism the parallelism level
2538      * @throws IllegalArgumentException if parallelism less than or
2539      *         equal to zero, or greater than implementation limit
2540      * @throws SecurityException if a security manager exists and
2541      *         the caller is not permitted to modify threads
2542      *         because it does not hold {@link
2543      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2544      */
2545     public ForkJoinPool(int parallelism) {
                 // Delegates to the four-argument constructor, which performs
                 // the permission check and range validation.
2546         this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
2547     }


2555      * use {@link #defaultForkJoinWorkerThreadFactory}.
2556      * @param handler the handler for internal worker threads that
2557      * terminate due to unrecoverable errors encountered while executing
2558      * tasks. For default value, use {@code null}.
2559      * @param asyncMode if true,
2560      * establishes local first-in-first-out scheduling mode for forked
2561      * tasks that are never joined. This mode may be more appropriate
2562      * than default locally stack-based mode in applications in which
2563      * worker threads only process event-style asynchronous tasks.
2564      * For default value, use {@code false}.
2565      * @throws IllegalArgumentException if parallelism less than or
2566      *         equal to zero, or greater than implementation limit
2567      * @throws NullPointerException if the factory is null
2568      * @throws SecurityException if a security manager exists and
2569      *         the caller is not permitted to modify threads
2570      *         because it does not hold {@link
2571      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2572      */
2573     public ForkJoinPool(int parallelism,
2574                         ForkJoinWorkerThreadFactory factory,
2575                         Thread.UncaughtExceptionHandler handler,
2576                         boolean asyncMode) {





2577         checkPermission();
2578         if (factory == null)
2579             throw new NullPointerException();

2580         if (parallelism <= 0 || parallelism > MAX_CAP)
2581             throw new IllegalArgumentException();
2582         this.factory = factory;
2583         this.ueh = handler;
2584         this.config = parallelism | (asyncMode ? (FIFO_QUEUE << 16) : 0);
2585         long np = (long)(-parallelism); // offset ctl counts
2586         this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
2587         int pn = nextPoolId();
2588         StringBuilder sb = new StringBuilder("ForkJoinPool-");
2589         sb.append(Integer.toString(pn));
2590         sb.append("-worker-");
2591         this.workerNamePrefix = sb.toString();
2592     }
2593 







2594     /**
2595      * Constructor for common pool, suitable only for static initialization.
2596      * Basically the same as above, but uses smallest possible initial footprint.
          * Unlike the public constructor visible in this file, it performs
          * no permission check or argument validation and takes the
          * initial {@code ctl} value precomputed by the caller.
          *
          * @param parallelism stored unchanged as {@code config}
          * @param ctl the initial control word
          * @param factory the worker thread factory
          * @param handler the uncaught exception handler, possibly null
2597      */
2598     ForkJoinPool(int parallelism, long ctl,
2599                  ForkJoinWorkerThreadFactory factory,
2600                  Thread.UncaughtExceptionHandler handler) {
2601         this.config = parallelism;
2602         this.ctl = ctl;

2603         this.factory = factory;
2604         this.ueh = handler;
                 // Fixed prefix; no pool number is drawn for the common pool.
2605         this.workerNamePrefix = "ForkJoinPool.commonPool-worker-";


2606     }
2607 
2608     /**
2609      * Returns the common pool instance. This pool is statically
2610      * constructed; its run state is unaffected by attempts to
2611      * {@link #shutdown} or {@link #shutdownNow}.
2612      *
2613      * @return the common pool instance
2614      * @since 1.8
2615      */
2616     public static ForkJoinPool commonPool() {
2617         // assert commonPool != null : "static init error";
                 // Plain read of the static field; no lazy construction here.
2618         return commonPool;
2619     }
2620 
2621     // Execution methods
2622 
2623     /**
2624      * Performs the given task, returning its result upon completion.
2625      * If the computation encounters an unchecked Exception or Error,
2626      * it is rethrown as the outcome of this invocation.  Rethrown
2627      * exceptions behave in the same way as regular exceptions, but,
2628      * when possible, contain stack traces (as displayed for example
2629      * using {@code ex.printStackTrace()}) of both the current thread
2630      * as well as the thread actually encountering the exception;
2631      * minimally only the latter.
2632      *
2633      * @param task the task
2634      * @return the task's result
2635      * @throws NullPointerException if the task is null
2636      * @throws RejectedExecutionException if the task cannot be
2637      *         scheduled for execution
2638      */


2654     public void execute(ForkJoinTask<?> task) {
                 // Reject null explicitly before handing the task to externalPush.
2655         if (task == null)
2656             throw new NullPointerException();
2657         externalPush(task);
2658     }
2659 
2660     // AbstractExecutorService methods
2661 
2662     /**
2663      * @throws NullPointerException if the task is null
2664      * @throws RejectedExecutionException if the task cannot be
2665      *         scheduled for execution
2666      */
2667     public void execute(Runnable task) {
2668         if (task == null)
2669             throw new NullPointerException();
2670         ForkJoinTask<?> job;
2671         if (task instanceof ForkJoinTask<?>) // avoid re-wrap
2672             job = (ForkJoinTask<?>) task;
2673         else
2674             job = new ForkJoinTask.AdaptedRunnableAction(task);
2675         externalPush(job);
2676     }
2677 
2678     /**
2679      * Submits a ForkJoinTask for execution.
2680      *
2681      * @param task the task to submit
2682      * @return the task
2683      * @throws NullPointerException if the task is null
2684      * @throws RejectedExecutionException if the task cannot be
2685      *         scheduled for execution
2686      */
2687     public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
2688         if (task == null)
2689             throw new NullPointerException();
2690         externalPush(task);
                 // The same instance is returned; no wrapper is created.
2691         return task;
2692     }
2693 
2694     /**


2721     public ForkJoinTask<?> submit(Runnable task) {
2722         if (task == null)
2723             throw new NullPointerException();
2724         ForkJoinTask<?> job;
2725         if (task instanceof ForkJoinTask<?>) // avoid re-wrap
2726             job = (ForkJoinTask<?>) task;
2727         else
2728             job = new ForkJoinTask.AdaptedRunnableAction(task);
2729         externalPush(job);
2730         return job;
2731     }
2732 
2733     /**
2734      * @throws NullPointerException       {@inheritDoc}
2735      * @throws RejectedExecutionException {@inheritDoc}
2736      */
2737     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
2738         // In previous versions of this class, this method constructed
2739         // a task to run ForkJoinTask.invokeAll, but now external
2740         // invocation of multiple tasks is at least as efficient.
2741         List<ForkJoinTask<T>> fs = new ArrayList<ForkJoinTask<T>>(tasks.size());
2742         // Workaround needed because method wasn't declared with
2743         // wildcards in return type but should have been.
2744         @SuppressWarnings({"unchecked", "rawtypes"})
2745             List<Future<T>> futures = (List<Future<T>>) (List) fs;
2746 
2747         boolean done = false;
2748         try {
2749             for (Callable<T> t : tasks) {
2750                 ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t);

2751                 externalPush(f);
2752                 fs.add(f);
2753             }
2754             for (ForkJoinTask<T> f : fs)
2755                 f.quietlyJoin();
2756             done = true;
2757             return futures;
2758         } finally {
2759             if (!done)
2760                 for (ForkJoinTask<T> f : fs)
2761                     f.cancel(false);
2762         }
2763     }
2764 
2765     /**
2766      * Returns the factory used for constructing new workers.
2767      *
2768      * @return the factory used for constructing new workers
2769      */
2770     public ForkJoinWorkerThreadFactory getFactory() {
                 // The field is assigned once, in the constructor.
2771         return factory;
2772     }
2773 
2774     /**
2775      * Returns the handler for internal worker threads that terminate
2776      * due to unrecoverable errors encountered while executing tasks.
2777      *
2778      * @return the handler, or {@code null} if none
2779      */
2780     public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
                 // Whatever was passed to the constructor, including null.
2781         return ueh;
2782     }
2783 
2784     /**
2785      * Returns the targeted parallelism level of this pool.
2786      *
2787      * @return the targeted parallelism level of this pool
2788      */
2789     public int getParallelism() {
                 // Parallelism occupies the low bits of config; SMASK strips
                 // the mode bits stored above it.
2790         return config & SMASK;

2791     }
2792 
2793     /**
2794      * Returns the targeted parallelism level of the common pool.
2795      *
2796      * @return the targeted parallelism level of the common pool
2797      * @since 1.8
2798      */
2799     public static int getCommonPoolParallelism() {
                 // Value computed once in the static initializer.
2800         return commonPoolParallelism;
2801     }
2802 
2803     /**
2804      * Returns the number of worker threads that have started but not
2805      * yet terminated.  The result returned by this method may differ
2806      * from {@link #getParallelism} when threads are created to
2807      * maintain parallelism when others are cooperatively blocked.
2808      *
2809      * @return the number of worker threads
2810      */
2811     public int getPoolSize() {
                 // The total-count field of ctl is initialised to -parallelism
                 // by the constructor, so adding parallelism back gives the
                 // count. The (short) cast sign-extends the 16-bit field.
2812         return (config & SMASK) + (short)(ctl >>> TC_SHIFT);
2813     }
2814 
2815     /**
2816      * Returns {@code true} if this pool uses local first-in-first-out
2817      * scheduling mode for forked tasks that are never joined.
2818      *
2819      * @return {@code true} if this pool uses async mode
2820      */


3038         if ((c & STOP_BIT) != 0)
3039             level = (tc == 0) ? "Terminated" : "Terminating";
3040         else
3041             level = plock < 0 ? "Shutting down" : "Running";
3042         return super.toString() +
3043             "[" + level +
3044             ", parallelism = " + pc +
3045             ", size = " + tc +
3046             ", active = " + ac +
3047             ", running = " + rc +
3048             ", steals = " + st +
3049             ", tasks = " + qt +
3050             ", submissions = " + qs +
3051             "]";
3052     }
3053 
3054     /**
3055      * Possibly initiates an orderly shutdown in which previously
3056      * submitted tasks are executed, but no new tasks will be
3057      * accepted. Invocation has no effect on execution state if this
3058      * is the {@link #commonPool}, and no additional effect if
3059      * already shut down.  Tasks that are in the process of being
3060      * submitted concurrently during the course of this method may or
3061      * may not be rejected.
3062      *
3063      * @throws SecurityException if a security manager exists and
3064      *         the caller is not permitted to modify threads
3065      *         because it does not hold {@link
3066      *         java.lang.RuntimePermission}{@code ("modifyThread")}
3067      */
3068     public void shutdown() {
3069         checkPermission();
                 // NOTE(review): first argument is false here and true in
                 // shutdownNow; presumably it selects immediate termination.
                 // Confirm against tryTerminate.
3070         tryTerminate(false, true);
3071     }
3072 
3073     /**
3074      * Possibly attempts to cancel and/or stop all tasks, and reject
3075      * all subsequently submitted tasks.  Invocation has no effect on
3076      * execution state if this is the {@link #commonPool}, and no
3077      * additional effect if already shut down. Otherwise, tasks that
3078      * are in the process of being submitted or executed concurrently
3079      * during the course of this method may or may not be
3080      * rejected. This method cancels both existing and unexecuted
3081      * tasks, in order to permit termination in the presence of task
3082      * dependencies. So the method always returns an empty list
3083      * (unlike the case for some other Executors).
3084      *
3085      * @return an empty list
3086      * @throws SecurityException if a security manager exists and
3087      *         the caller is not permitted to modify threads
3088      *         because it does not hold {@link
3089      *         java.lang.RuntimePermission}{@code ("modifyThread")}
3090      */
3091     public List<Runnable> shutdownNow() {
3092         checkPermission();
3093         tryTerminate(true, true);
                 // Nothing is handed back to the caller; the result is always
                 // the shared empty list.
3094         return Collections.emptyList();
3095     }
3096 


3119      * @return {@code true} if terminating but not yet terminated
3120      */
3121     public boolean isTerminating() {
3122         long c = ctl;
3123         return ((c & STOP_BIT) != 0L &&
3124                 (short)(c >>> TC_SHIFT) != -(config & SMASK));
3125     }
3126 
3127     /**
3128      * Returns {@code true} if this pool has been shut down.
3129      *
3130      * @return {@code true} if this pool has been shut down
3131      */
3132     public boolean isShutdown() {
                 // A negative plock is the shutdown indication.
3133         return plock < 0;
3134     }
3135 
3136     /**
3137      * Blocks until all tasks have completed execution after a
3138      * shutdown request, or the timeout occurs, or the current thread
3139      * is interrupted, whichever happens first. Note that the {@link
3140      * #commonPool()} never terminates until program shutdown so
3141      * this method will always time out.

3142      *
3143      * @param timeout the maximum time to wait
3144      * @param unit the time unit of the timeout argument
3145      * @return {@code true} if this executor terminated and
3146      *         {@code false} if the timeout elapsed before termination
3147      * @throws InterruptedException if interrupted while waiting
3148      */
3149     public boolean awaitTermination(long timeout, TimeUnit unit)
3150         throws InterruptedException {






3151         long nanos = unit.toNanos(timeout);
3152         if (isTerminated())
3153             return true;
3154         long startTime = System.nanoTime();
3155         boolean terminated = false;
3156         synchronized (this) {
3157             for (long waitTime = nanos, millis = 0L;;) {
3158                 if (terminated = isTerminated() ||
3159                     waitTime <= 0L ||
3160                     (millis = unit.toMillis(waitTime)) <= 0L)
3161                     break;
3162                 wait(millis);
3163                 waitTime = nanos - (System.nanoTime() - startTime);
3164             }
3165         }
3166         return terminated;
3167     }
3168 
3169     /**
























































3170      * Interface for extending managed parallelism for tasks running
3171      * in {@link ForkJoinPool}s.
3172      *
3173      * <p>A {@code ManagedBlocker} provides two methods.  Method
3174      * {@code isReleasable} must return {@code true} if blocking is
3175      * not necessary. Method {@code block} blocks the current thread
3176      * if necessary (perhaps internally invoking {@code isReleasable}
3177      * before actually blocking). These actions are performed by any
3178      * thread invoking {@link ForkJoinPool#managedBlock}.  The
3179      * unusual methods in this API accommodate synchronizers that may,
3180      * but don't usually, block for long periods. Similarly, they
3181      * allow more efficient internal handling of cases in which
3182      * additional workers may be, but usually are not, needed to
3183      * ensure sufficient parallelism.  Toward this end,
3184      * implementations of method {@code isReleasable} must be amenable
3185      * to repeated invocation.
3186      *
3187      * <p>For example, here is a ManagedBlocker based on a
3188      * ReentrantLock:
3189      *  <pre> {@code
3190      * class ManagedLocker implements ManagedBlocker {
3191      *   final ReentrantLock lock;
3192      *   boolean hasLock = false;
3193      *   ManagedLocker(ReentrantLock lock) { this.lock = lock; }
3194      *   public boolean block() {
3195      *     if (!hasLock)
3196      *       lock.lock();
3197      *     return true;
3198      *   }
3199      *   public boolean isReleasable() {
3200      *     return hasLock || (hasLock = lock.tryLock());


3218      *   }
3219      *   public E getItem() { // call after pool.managedBlock completes
3220      *     return item;
3221      *   }
3222      * }}</pre>
3223      */
3224     public static interface ManagedBlocker {
3225         /**
3226          * Possibly blocks the current thread, for example waiting for
3227          * a lock or condition.
3228          *
3229          * @return {@code true} if no additional blocking is necessary
3230          * (i.e., if isReleasable would return true)
3231          * @throws InterruptedException if interrupted while waiting
3232          * (the method is not required to do so, but is allowed to)
3233          */
3234         boolean block() throws InterruptedException;
3235 
3236         /**
3237          * Returns {@code true} if blocking is unnecessary.
              *
              * @return {@code true} if blocking is unnecessary
3238          */
3239         boolean isReleasable();
3240     }
3241 
3242     /**
3243      * Blocks in accord with the given blocker.  If the current thread
3244      * is a {@link ForkJoinWorkerThread}, this method possibly
3245      * arranges for a spare thread to be activated if necessary to
3246      * ensure sufficient parallelism while the current thread is blocked.
3247      *
3248      * <p>If the caller is not a {@link ForkJoinTask}, this method is
3249      * behaviorally equivalent to
3250      *  <pre> {@code
3251      * while (!blocker.isReleasable())
3252      *   if (blocker.block())
3253      *     return;
3254      * }</pre>
3255      *
3256      * If the caller is a {@code ForkJoinTask}, then the pool may
3257      * first be expanded to ensure parallelism, and later adjusted.


             // Wraps the runnable and its result value in a
             // ForkJoinTask.AdaptedRunnable.
3302     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
3303         return new ForkJoinTask.AdaptedRunnable<T>(runnable, value);
3304     }
3305 
             // Wraps the callable in a ForkJoinTask.AdaptedCallable.
3306     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
3307         return new ForkJoinTask.AdaptedCallable<T>(callable);
3308     }
3309 
3310     // Unsafe mechanics
             // Field offsets and array layout constants, all assigned in the
             // static initializer below.
3311     private static final sun.misc.Unsafe U;
3312     private static final long CTL;
3313     private static final long PARKBLOCKER;
3314     private static final int ABASE;
3315     private static final int ASHIFT;
3316     private static final long STEALCOUNT;
3317     private static final long PLOCK;
3318     private static final long INDEXSEED;
3319     private static final long QLOCK;
3320 
3321     static {
3322         int s; // initialize field offsets for CAS etc
3323         try {
3324             U = sun.misc.Unsafe.getUnsafe();
3325             Class<?> k = ForkJoinPool.class;
3326             CTL = U.objectFieldOffset
3327                 (k.getDeclaredField("ctl"));
3328             STEALCOUNT = U.objectFieldOffset
3329                 (k.getDeclaredField("stealCount"));
3330             PLOCK = U.objectFieldOffset
3331                 (k.getDeclaredField("plock"));
3332             INDEXSEED = U.objectFieldOffset
3333                 (k.getDeclaredField("indexSeed"));
3334             Class<?> tk = Thread.class;
3335             PARKBLOCKER = U.objectFieldOffset
3336                 (tk.getDeclaredField("parkBlocker"));
3337             Class<?> wk = WorkQueue.class;
3338             QLOCK = U.objectFieldOffset
3339                 (wk.getDeclaredField("qlock"));
3340             Class<?> ak = ForkJoinTask[].class;
3341             ABASE = U.arrayBaseOffset(ak);
3342             s = U.arrayIndexScale(ak);
                     // log2 of the element scale, so slot offsets can be formed
                     // by shifting; only valid for a power-of-two scale, which is
                     // verified right after the try block.
3343             ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
3344         } catch (Exception e) {
                     // Any reflection or Unsafe failure aborts class initialization.
3345             throw new Error(e);
3346         }
3347         if ((s & (s-1)) != 0)
3348             throw new Error("data type scale not a power of two");
3349 
3350         submitters = new ThreadLocal<Submitter>();
3351         ForkJoinWorkerThreadFactory fac = defaultForkJoinWorkerThreadFactory =
3352             new DefaultForkJoinWorkerThreadFactory();
3353         modifyThreadPermission = new RuntimePermission("modifyThread");
3354 
3355         /*
3356          * Establish common pool parameters.  For extra caution,
3357          * computations to set up common pool state are here; the
3358          * constructor just assigns these values to fields.
3359          */

3360 
3361         int par = 0;
3362         Thread.UncaughtExceptionHandler handler = null;
3363         try {  // TBD: limit or report ignored exceptions?
                     // All three properties are read before any is applied.
3364             String pp = System.getProperty
3365                 ("java.util.concurrent.ForkJoinPool.common.parallelism");
3366             String hp = System.getProperty
3367                 ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
3368             String fp = System.getProperty
3369                 ("java.util.concurrent.ForkJoinPool.common.threadFactory");
                     // Applied in the order factory, handler, parallelism. An
                     // exception at any step skips the remaining steps but keeps
                     // whatever was already assigned.
3370             if (fp != null)
3371                 fac = ((ForkJoinWorkerThreadFactory)ClassLoader.
3372                        getSystemClassLoader().loadClass(fp).newInstance());
3373             if (hp != null)
3374                 handler = ((Thread.UncaughtExceptionHandler)ClassLoader.
3375                            getSystemClassLoader().loadClass(hp).newInstance());
3376             if (pp != null)
3377                 par = Integer.parseInt(pp);
3378         } catch (Exception ignore) {
                     // Deliberately ignored: unset values stay at their defaults.
3379         }
3380 
                 // Unset, unparsable or non-positive parallelism falls back to
                 // the processor count; the result is capped at MAX_CAP.
3381         if (par <= 0)
3382             par = Runtime.getRuntime().availableProcessors();
3383         if (par > MAX_CAP)
3384             par = MAX_CAP;
3385         commonPoolParallelism = par;
3386         long np = (long)(-par); // precompute initial ctl value
3387         long ct = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
3388 
3389         commonPool = new ForkJoinPool(par, ct, fac, handler);
3390     }
3391 
3392 }


  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.lang.Thread.UncaughtExceptionHandler;
  39 import java.util.ArrayList;
  40 import java.util.Arrays;
  41 import java.util.Collection;
  42 import java.util.Collections;
  43 import java.util.List;
  44 import java.util.concurrent.AbstractExecutorService;
  45 import java.util.concurrent.Callable;
  46 import java.util.concurrent.ExecutorService;
  47 import java.util.concurrent.Future;
  48 import java.util.concurrent.RejectedExecutionException;
  49 import java.util.concurrent.RunnableFuture;
  50 import java.util.concurrent.TimeUnit;
  51 
  52 /**
  53  * An {@link ExecutorService} for running {@link ForkJoinTask}s.
  54  * A {@code ForkJoinPool} provides the entry point for submissions
  55  * from non-{@code ForkJoinTask} clients, as well as management and
  56  * monitoring operations.
  57  *
  58  * <p>A {@code ForkJoinPool} differs from other kinds of {@link


  88  * <p>In addition to execution and lifecycle control methods, this
  89  * class provides status check methods (for example
  90  * {@link #getStealCount}) that are intended to aid in developing,
  91  * tuning, and monitoring fork/join applications. Also, method
  92  * {@link #toString} returns indications of pool state in a
  93  * convenient form for informal monitoring.
  94  *
  95  * <p>As is the case with other ExecutorServices, there are three
  96  * main task execution methods summarized in the following table.
  97  * These are designed to be used primarily by clients not already
  98  * engaged in fork/join computations in the current pool.  The main
  99  * forms of these methods accept instances of {@code ForkJoinTask},
 100  * but overloaded forms also allow mixed execution of plain {@code
 101  * Runnable}- or {@code Callable}- based activities as well.  However,
 102  * tasks that are already executing in a pool should normally instead
 103  * use the within-computation forms listed in the table unless using
 104  * async event-style tasks that are not usually joined, in which case
 105  * there is little difference among choice of methods.
 106  *
 107  * <table BORDER CELLPADDING=3 CELLSPACING=1>
 108  * <caption>Summary of task execution methods</caption>
 109  *  <tr>
 110  *    <td></td>
 111  *    <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
 112  *    <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
 113  *  </tr>
 114  *  <tr>
 115  *    <td> <b>Arrange async execution</b></td>
 116  *    <td> {@link #execute(ForkJoinTask)}</td>
 117  *    <td> {@link ForkJoinTask#fork}</td>
 118  *  </tr>
 119  *  <tr>
 120  *    <td> <b>Await and obtain result</b></td>
 121  *    <td> {@link #invoke(ForkJoinTask)}</td>
 122  *    <td> {@link ForkJoinTask#invoke}</td>
 123  *  </tr>
 124  *  <tr>
 125  *    <td> <b>Arrange exec and obtain Future</b></td>
 126  *    <td> {@link #submit(ForkJoinTask)}</td>
 127  *    <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
 128  *  </tr>
 129  * </table>
 130  *
 131  * <p>The common pool is by default constructed with default
 132  * parameters, but these may be controlled by setting three
 133  * {@linkplain System#getProperty system properties} with prefix
 134  * {@code "java.util.concurrent.ForkJoinPool.common."}:
 135  * {@code parallelism} -- a non-negative integer,
 136  * {@code threadFactory} -- the class name of a
 137  * {@link ForkJoinWorkerThreadFactory}, and
 138  * {@code exceptionHandler} --
 139  * the class name of a {@link UncaughtExceptionHandler}.
 140  * Upon any error in establishing these settings, default parameters
 141  * are used. It is possible to disable or limit the use of threads in
 142  * the common pool by setting the parallelism property to zero, and/or
 143  * using a factory that may return {@code null}.
 144  *
 145  * <p><b>Implementation notes</b>: This implementation restricts the
 146  * maximum number of running threads to 32767. Attempts to create
 147  * pools with greater than the maximum number result in
 148  * {@code IllegalArgumentException}.
 149  *
 150  * <p>This implementation rejects submitted tasks (that is, by throwing
 151  * {@link RejectedExecutionException}) only when the pool is shut down
 152  * or internal resources have been exhausted.
 153  *
 154  * @since 1.7
 155  * @author Doug Lea
 156  */
 157 public class ForkJoinPool extends AbstractExecutorService {
 158 
 159     /*
 160      * Implementation Overview
 161      *
 162      * This class and its nested classes provide the main
 163      * functionality and control for a set of worker threads:


 213      * base index, else consider alternative actions, rather than
 214      * method poll.)
 215      *
 216      * This approach also enables support of a user mode in which local
 217      * task processing is in FIFO, not LIFO order, simply by using
 218      * poll rather than pop.  This can be useful in message-passing
 219      * frameworks in which tasks are never joined.  However neither
 220      * mode considers affinities, loads, cache localities, etc, so
 221      * they rarely provide the best possible performance on a given
 222      * machine, but portably provide good throughput by averaging over
 223      * these factors.  (Further, even if we did try to use such
 224      * information, we do not usually have a basis for exploiting it.
 225      * For example, some sets of tasks profit from cache affinities,
 226      * but others are harmed by cache pollution effects.)
 227      *
 228      * WorkQueues are also used in a similar way for tasks submitted
 229      * to the pool. We cannot mix these tasks in the same queues used
 230      * for work-stealing (this would contaminate lifo/fifo
 231      * processing). Instead, we randomly associate submission queues
 232      * with submitting threads, using a form of hashing.  The
 233      * ThreadLocalRandom probe value serves as a hash code for
 234      * choosing existing queues, and may be randomly repositioned upon
 235      * contention with other submitters.  In essence, submitters act
 236      * like workers except that they are restricted to executing local
 237      * tasks that they submitted (or in the case of CountedCompleters,
 238      * others with the same root task).  However, because most
 239      * shared/external queue operations are more expensive than
 240      * internal, and because, at steady state, external submitters
 241      * will compete for CPU with workers, ForkJoinTask.join and
 242      * related methods disable them from repeatedly helping to process
 243      * tasks if all workers are active.  Insertion of tasks in shared
 244      * mode requires a lock (mainly to protect in the case of
 245      * resizing) but we use only a simple spinlock (using bits in
 246      * field qlock), because submitters encountering a busy queue move
 247      * on to try or create other queues -- they block only when
 248      * creating and registering new queues.
 249      *
 250      * Management
 251      * ==========
 252      *
 253      * The main throughput advantages of work-stealing stem from
 254      * decentralized control -- workers mostly take tasks from
 255      * themselves or each other. We cannot negate this in the
 256      * implementation of other management responsibilities. The main
 257      * tactic for avoiding bottlenecks is packing nearly all
 258      * essentially atomic control state into two volatile variables
 259      * that are by far most often read (not written) as status and
 260      * consistency checks.
 261      *
 262      * Field "ctl" contains 64 bits holding all the information needed
 263      * to atomically decide to add, inactivate, enqueue (on an event
 264      * queue), dequeue, and/or re-activate workers.  To enable this


 457      * helping opportunities is challenging to control on JVMs, where
 458      * GC and other activities can stall progress of tasks that in
 459      * turn stall out many other dependent tasks, without us being
 460      * able to determine whether they will ever require compensation.
 461      * Even though work-stealing otherwise encounters little
 462      * degradation in the presence of more threads than cores,
 463      * aggressively adding new threads in such cases entails risk of
 464      * unwanted positive feedback control loops in which more threads
 465      * cause more dependent stalls (as well as delayed progress of
 466      * unblocked threads to the point that we know they are available)
 467      * leading to more situations requiring more threads, and so
 468      * on. This aspect of control can be seen as an (analytically
 469      * intractable) game with an opponent that may choose the worst
 470      * (for us) active thread to stall at any time.  We take several
 471      * precautions to bound losses (and thus bound gains), mainly in
 472      * methods tryCompensate and awaitJoin.
 473      *
 474      * Common Pool
 475      * ===========
 476      *
 477      * The static common Pool always exists after static
 478      * initialization.  Since it (or any other created pool) need
 479      * never be used, we minimize initial construction overhead and
 480      * footprint to the setup of about a dozen fields, with no nested
 481      * allocation. Most bootstrapping occurs within method
 482      * fullExternalPush during the first submission to the pool.
 483      *
 484      * When external threads submit to the common pool, they can
 485      * perform some subtask processing (see externalHelpJoin and
 486      * related methods).  We do not need to record whether these
 487      * submissions are to the common pool -- if not, externalHelpJoin
 488      * returns quickly (at the most helping to signal some common pool
 489      * workers). These submitters would otherwise be blocked waiting
 490      * for completion, so the extra effort (with liberally sprinkled
 491      * task status checks) in inapplicable cases amounts to an odd
 492      * form of limited spin-wait before blocking in ForkJoinTask.join.
 493      *
 494      * Style notes
 495      * ===========
 496      *
 497      * There is a lot of representation-level coupling among classes


 536     private static void checkPermission() {
             // With no SecurityManager installed there is nothing to check;
             // otherwise its checkPermission is called with modifyThreadPermission.
 537         SecurityManager security = System.getSecurityManager();
 538         if (security != null)
 539             security.checkPermission(modifyThreadPermission);
 540     }
 541 
 542     // Nested classes
 543 
 544     /**
 545      * Factory for creating new {@link ForkJoinWorkerThread}s.
 546      * A {@code ForkJoinWorkerThreadFactory} must be defined and used
 547      * for {@code ForkJoinWorkerThread} subclasses that extend base
 548      * functionality or initialize threads with different contexts.
 549      */
 550     public static interface ForkJoinWorkerThreadFactory {
 551         /**
 552          * Returns a new worker thread operating in the given pool.
 553          *
 554          * @param pool the pool this thread works in
 555          * @return the new worker thread
 556          * @throws NullPointerException if the pool is null
 557          */
 558         public ForkJoinWorkerThread newThread(ForkJoinPool pool);
 559     }
 560 
 561     /**
 562      * Default {@link ForkJoinWorkerThreadFactory} implementation: each
 563      * call constructs a new {@code ForkJoinWorkerThread} for the pool.
 564      */
 565     static final class DefaultForkJoinWorkerThreadFactory
 566         implements ForkJoinWorkerThreadFactory {
             // Passes the given pool straight to the ForkJoinWorkerThread constructor.
 567         public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
 568             return new ForkJoinWorkerThread(pool);
 569         }
 570     }
 571 
 572     /**




















 573      * Class for artificial tasks that are used to replace the target
 574      * of local joins if they are removed from an interior queue slot
 575      * in WorkQueue.tryRemoveAndExec. We don't need the proxy to
 576      * actually do anything beyond having a unique identity.
 577      */
 578     static final class EmptyTask extends ForkJoinTask<Void> {
 579         private static final long serialVersionUID = -7721805057305804111L;
 580         EmptyTask() { status = ForkJoinTask.NORMAL; } // force done: status is NORMAL from construction
 581         public final Void getRawResult() { return null; } // never carries a result
 582         public final void setRawResult(Void x) {} // argument is ignored
 583         public final boolean exec() { return true; } // no work to perform
 584     }
 585 
 586     /**
 587      * Queues supporting work-stealing as well as external task
 588      * submission. See above for main rationale and algorithms.
 589      * Implementation relies heavily on "Unsafe" intrinsics
 590      * and selective use of "volatile":
 591      *
 592      * Field "base" is the index (mod array.length) of the least valid


 706          * Provides a more accurate estimate of whether this queue has
 707          * any tasks than does queueSize, by checking whether a
 708          * near-empty queue has at least one unclaimed task.
 709          */
 710         final boolean isEmpty() {
 711             ForkJoinTask<?>[] a; int m, s;
 712             int n = base - (s = top);       // base minus top, as read here
 713             return (n >= 0 ||               // base at or past top: report empty
 714                     (n == -1 &&             // top is exactly one past base, so also empty if:
 715                      ((a = array) == null || //  - there is no array,
 716                       (m = a.length - 1) < 0 || //  - the array has length zero, or
 717                       U.getObject           //  - the slot just below top holds null
 718                       (a, (long)((m & (s - 1)) << ASHIFT) + ABASE) == null)));
 719         }
 720 
 721         /**
 722          * Pushes a task. Call only by owner in unshared queues.  (The
 723          * shared-queue version is embedded in method externalPush.)
              * Does nothing if this queue currently has no array. Otherwise
              * the task is stored in the slot selected by top, top is
              * advanced, and then either the pool is signalled (when at most
              * two tasks separate base and top) or growArray is called (when
              * that distance has reached array length minus one).
 724          *
 725          * @param task the task. Caller must ensure non-null.
 726          * @throws RejectedExecutionException if array cannot be resized
 727          */
 728         final void push(ForkJoinTask<?> task) {
 729             ForkJoinTask<?>[] a; ForkJoinPool p;
 730             int s = top, m, n;
 731             if ((a = array) != null) {    // ignore if queue removed
                     // offset of slot (s & m), where m is array length - 1
 732                 int j = (((m = a.length - 1) & s) << ASHIFT) + ABASE;
 733                 U.putOrderedObject(a, j, task);
                     // advance top; n is the resulting top - base
 734                 if ((n = (top = s + 1) - base) <= 2) {
 735                     if ((p = pool) != null)
 736                         p.signalWork(this);
 737                 }
 738                 else if (n >= m)          // distance reached length - 1
 739                     growArray();
 740             }
 741         }
 742 
 743        /**
 744          * Initializes or doubles the capacity of array. Call either
 745          * by owner or with lock held -- it is OK for base, but not
 746          * top, to move while resizings are in progress.


 905                 if (U.compareAndSwapObject(a, j, t, null)) {
 906                     top = s;
 907                     t.doExec();
 908                 }
 909             }
 910         }
 911 
 912         /**
 913          * Polls and runs tasks until empty: repeatedly calls poll() and
              * invokes doExec() on each task it returns, stopping as soon as
              * poll() returns null.
 914          */
 915         private void pollAndExecAll() {
 916             for (ForkJoinTask<?> t; (t = poll()) != null;)
 917                 t.doExec();
 918         }
 919 
 920         /**
 921          * If present, removes from queue and executes the given task,
 922          * or any other cancelled task. Returns (true) on any CAS
 923          * or consistency check failure so caller can retry.
 924          *
 925          * @return false if no progress can be made, else true
 926          */
 927         final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
 928             boolean stat = true, removed = false, empty = true;
 929             ForkJoinTask<?>[] a; int m, s, b, n;
 930             if ((a = array) != null && (m = a.length - 1) >= 0 &&
 931                 (n = (s = top) - (b = base)) > 0) {
 932                 for (ForkJoinTask<?> t;;) {           // traverse from s to b
 933                     int j = ((--s & m) << ASHIFT) + ABASE;
 934                     t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
 935                     if (t == null)                    // inconsistent length
 936                         break;
 937                     else if (t == task) {
 938                         if (s + 1 == top) {           // pop
 939                             if (!U.compareAndSwapObject(a, j, task, null))
 940                                 break;
 941                             top = s;
 942                             removed = true;
 943                         }
 944                         else if (base == b)           // replace with proxy
 945                             removed = U.compareAndSwapObject(a, j, task,


 950                         empty = false;
 951                     else if (s + 1 == top) {          // pop and throw away
 952                         if (U.compareAndSwapObject(a, j, t, null))
 953                             top = s;
 954                         break;
 955                     }
 956                     if (--n == 0) {
 957                         if (!empty && base == b)
 958                             stat = false;
 959                         break;
 960                     }
 961                 }
 962             }
 963             if (removed)
 964                 task.doExec();
 965             return stat;
 966         }
 967 
 968         /**
 969          * Polls for and executes the given task or any other task in
 970          * its CountedCompleter computation.
 971          */
 972         final boolean pollAndExecCC(ForkJoinTask<?> root) {
 973             ForkJoinTask<?>[] a; int b; Object o;
 974             outer: while ((b = base) - top < 0 && (a = array) != null) {
 975                 long j = (((a.length - 1) & b) << ASHIFT) + ABASE;
 976                 if ((o = U.getObject(a, j)) == null ||
 977                     !(o instanceof CountedCompleter))
 978                     break;
 979                 for (CountedCompleter<?> t = (CountedCompleter<?>)o, r = t;;) {
 980                     if (r == root) {
 981                         if (base == b &&
 982                             U.compareAndSwapObject(a, j, t, null)) {
 983                             base = b + 1;
 984                             t.doExec();
 985                             return true;
 986                         }
 987                         else
 988                             break; // restart
 989                     }
 990                     if ((r = r.completer) == null)


1024         }
1025 
1026         /**
1027          * Returns true if owned and not known to be blocked.
              * Concretely, all of the following must hold when read:
              * eventCount is non-negative, an owner thread is recorded, and
              * that thread's state is none of BLOCKED, WAITING, or
              * TIMED_WAITING.
              *
              * @return true if every condition above holds, else false
1028          */
1029         final boolean isApparentlyUnblocked() {
1030             Thread wt; Thread.State s;
1031             return (eventCount >= 0 &&
1032                     (wt = owner) != null &&     // false for a queue with no owner thread
1033                     (s = wt.getState()) != Thread.State.BLOCKED &&
1034                     s != Thread.State.WAITING &&
1035                     s != Thread.State.TIMED_WAITING);
1036         }
1037 
1038         // Unsafe mechanics: constants computed once in the static block below
1039         private static final sun.misc.Unsafe U;
1040         private static final long QLOCK;   // field offset of WorkQueue.qlock
1041         private static final int ABASE;    // base offset of ForkJoinTask[] elements
1042         private static final int ASHIFT;   // log2 of the ForkJoinTask[] index scale
1043         static {

1044             try {
1045                 U = sun.misc.Unsafe.getUnsafe();
1046                 Class<?> k = WorkQueue.class;
1047                 Class<?> ak = ForkJoinTask[].class;
1048                 QLOCK = U.objectFieldOffset
1049                     (k.getDeclaredField("qlock"));
1050                 ABASE = U.arrayBaseOffset(ak);
1051                 int scale = U.arrayIndexScale(ak);
                     // element offsets are computed by shifting, so the scale
                     // must be a power of two
1052                 if ((scale & (scale - 1)) != 0)
1053                     throw new Error("data type scale not a power of two");
1054                 ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
1055             } catch (Exception e) {
                     // any failure to set up the constants is rethrown as an Error
1056                 throw new Error(e);
1057             }



1058         }
1059     }
1060 
1061     // static fields (initialized in static initializer below)
1062 
1063     /**
1064      * Creates a new ForkJoinWorkerThread. This factory is used unless
1065      * overridden in ForkJoinPool constructors.
1066      */
1067     public static final ForkJoinWorkerThreadFactory
1068         defaultForkJoinWorkerThreadFactory;
1069 
1070     /**









1071      * Permission required for callers of methods that may start or
1072      * kill threads.
1073      */
1074     private static final RuntimePermission modifyThreadPermission;
1075 
1076     /**
1077      * Common (static) pool. Non-null for public use unless a static
1078      * construction exception, but internal usages null-check on use
1079      * to paranoically avoid potential initialization circularities
1080      * as well as to simplify generated code.
1081      */
1082     static final ForkJoinPool common;
1083 
1084     /**
1085      * Common pool parallelism. To allow simpler use and management
1086      * when common pool threads are disabled, we allow the underlying
1087      * common.config field to be zero, but in that case still report
1088      * parallelism as 1 to reflect resulting caller-runs mechanics.
1089      */
1090     static final int commonParallelism;
1091 
1092     /**
1093      * Sequence number for creating workerNamePrefix.
1094      */
1095     private static int poolNumberSequence;
1096 
1097     /**
1098      * Returns the next sequence number. We don't expect this to
1099      * ever contend, so use simple builtin sync.
          *
          * @return the value of poolNumberSequence after incrementing it
1100      */
1101     private static final synchronized int nextPoolId() {
1102         return ++poolNumberSequence;
1103     }
1104 
1105     // static constants
1106 
1107     /**
1108      * Initial timeout value (in nanoseconds) for the thread
1109      * triggering quiescence to park waiting for new work. On timeout,
1110      * the thread will instead try to shrink the number of
1111      * workers. The value should be large enough to avoid overly
1112      * aggressive shrinkage during most transient stalls (long GCs
1113      * etc).
1114      */
1115     private static final long IDLE_TIMEOUT      = 2000L * 1000L * 1000L; // 2sec
1116 
1117     /**
1118      * Timeout value when there are more threads than parallelism level
1119      */


1230 
1231     // Instance fields
1232 
1233     /*
1234      * Field layout of this class tends to matter more than one would
1235      * like. Runtime layout order is only loosely related to
1236      * declaration order and may differ across JVMs, but the following
1237      * empirically works OK on current JVMs.
1238      */
1239 
1240     // Heuristic padding to ameliorate unfortunate memory placements
1241     volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06;
1242 
1243     volatile long stealCount;                  // collects worker counts
1244     volatile long ctl;                         // main pool control
1245     volatile int plock;                        // shutdown status and seqLock
1246     volatile int indexSeed;                    // worker/submitter index seed
1247     final int config;                          // mode and parallelism level
1248     WorkQueue[] workQueues;                    // main registry
1249     final ForkJoinWorkerThreadFactory factory;
1250     final UncaughtExceptionHandler ueh;        // per-worker UEH
1251     final String workerNamePrefix;             // to create worker name string
1252 
1253     volatile Object pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17;
1254     volatile Object pad18, pad19, pad1a, pad1b;
1255 
1256     /**
1257      * Acquires the plock lock to protect worker array and related
1258      * updates. This method is called only if an initial CAS on plock
1259      * fails. This acts as a spinlock for normal cases, but falls back
1260      * to builtin monitor to block when (rarely) needed. This would be
1261      * a terrible idea for a highly contended lock, but works fine as
1262      * a more conservative alternative to a pure spinlock.
          *
          * @return the plock value installed by the successful CAS, that
          *         is, the value observed at that moment plus PL_LOCK
1263      */
1264     private int acquirePlock() {
1265         int spins = PL_SPINS, ps, nps;
1266         for (;;) {
1267             if (((ps = plock) & PL_LOCK) == 0 &&   // lock bit clear, and
1268                 U.compareAndSwapInt(this, PLOCK, ps, nps = ps + PL_LOCK))
1269                 return nps;                         // CAS added PL_LOCK










1270             else if (spins >= 0) {                  // still in the spin phase
1271                 if (ThreadLocalRandom.nextSecondarySeed() >= 0)

1272                     --spins;                        // counted only for non-negative seeds
1273             }
                 // spin budget used up: set PL_SIGNAL in plock, then use this
                 // pool's monitor to wait
1274             else if (U.compareAndSwapInt(this, PLOCK, ps, ps | PL_SIGNAL)) {
1275                 synchronized (this) {
1276                     if ((plock & PL_SIGNAL) != 0) {     // still flagged: wait for notifyAll
1277                         try {
1278                             wait();
1279                         } catch (InterruptedException ie) {
1280                             try {
1281                                 Thread.currentThread().interrupt(); // restore interrupt status
1282                             } catch (SecurityException ignore) {
1283                             }
1284                         }
1285                     }
1286                     else                                // PL_SIGNAL no longer set in plock
1287                         notifyAll();
1288                 }
1289             }
1290         }
1291     }
1292 
1293     /**
1294      * Unlocks and signals any thread waiting for plock. Called only
1295      * when CAS of seq value for unlock fails.
          *
          * @param ps the value to store into plock
1296      */
1297     private void releasePlock(int ps) {
1298         plock = ps;                              // unconditional volatile write
1299         synchronized (this) { notifyAll(); }     // wake threads waiting on this pool's monitor
1300     }
1301 
1302     /**

































1303      * Tries to create and start one worker if fewer than target
1304      * parallelism level exist. Adjusts counts etc on failure.
1305      */
1306     private void tryAddWorker() {
1307         long c; int u;
1308         while ((u = (int)((c = ctl) >>> 32)) < 0 &&
1309                (u & SHORT_SIGN) != 0 && (int)c == 0) {
1310             long nc = (long)(((u + UTC_UNIT) & UTC_MASK) |
1311                              ((u + UAC_UNIT) & UAC_MASK)) << 32;
1312             if (U.compareAndSwapLong(this, CTL, c, nc)) {
1313                 ForkJoinWorkerThreadFactory fac;
1314                 Throwable ex = null;
1315                 ForkJoinWorkerThread wt = null;
1316                 try {
1317                     if ((fac = factory) != null &&
1318                         (wt = fac.newThread(this)) != null) {
1319                         wt.start();
1320                         break;
1321                     }
1322                 } catch (Throwable e) {


1324                 }
1325                 deregisterWorker(wt, ex);
1326                 break;
1327             }
1328         }
1329     }
1330 
1331     //  Registering and deregistering workers
1332 
1333     /**
1334      * Callback from ForkJoinWorkerThread to establish and record its
1335      * WorkQueue. To avoid scanning bias due to packing entries in
1336      * front of the workQueues array, we treat the array as a simple
1337      * power-of-two hash table using per-thread seed as hash,
1338      * expanding as needed.
1339      *
1340      * @param wt the worker thread
1341      * @return the worker's queue
1342      */
1343     final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
1344         UncaughtExceptionHandler handler; WorkQueue[] ws; int s, ps;
1345         wt.setDaemon(true);
1346         if ((handler = ueh) != null)
1347             wt.setUncaughtExceptionHandler(handler);
1348         do {} while (!U.compareAndSwapInt(this, INDEXSEED, s = indexSeed,
1349                                           s += SEED_INCREMENT) ||
1350                      s == 0); // skip 0
1351         WorkQueue w = new WorkQueue(this, wt, config >>> 16, s);
1352         if (((ps = plock) & PL_LOCK) != 0 ||
1353             !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1354             ps = acquirePlock();
1355         int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
1356         try {
1357             if ((ws = workQueues) != null) {    // skip if shutting down
1358                 int n = ws.length, m = n - 1;
1359                 int r = (s << 1) | 1;           // use odd-numbered indices
1360                 if (ws[r &= m] != null) {       // collision
1361                     int probes = 0;             // step by approx half size
1362                     int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
1363                     while (ws[r = (r + step) & m] != null) {
1364                         if (++probes >= n) {


1368                         }
1369                     }
1370                 }
1371                 w.eventCount = w.poolIndex = r; // volatile write orders
1372                 ws[r] = w;
1373             }
1374         } finally {
1375             if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1376                 releasePlock(nps);
1377         }
1378         wt.setName(workerNamePrefix.concat(Integer.toString(w.poolIndex)));
1379         return w;
1380     }
1381 
1382     /**
1383      * Final callback from terminating worker, as well as upon failure
1384      * to construct or start a worker.  Removes record of worker from
1385      * array, and adjusts counts. If pool is shutting down, tries to
1386      * complete termination.
          *
          * <p>The steps, in order: (1) if the thread has a queue, mark its
          * qlock -1, add its nsteals to stealCount, and clear its slot in
          * workQueues while holding plock; (2) subtract one AC_UNIT and one
          * TC_UNIT from ctl; (3) unless tryTerminate reports true, cancel
          * the queue's remaining tasks and try to wake a waiting worker or
          * add a new one; (4) either help expunge stale exceptions or pass
          * {@code ex} to ForkJoinTask.rethrow.
1387      *
1388      * @param wt the worker thread, or null if construction failed
1389      * @param ex the exception causing failure, or null if none
1390      */
1391     final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
1392         WorkQueue w = null;
1393         if (wt != null && (w = wt.workQueue) != null) {
1394             int ps;
1395             w.qlock = -1;                // ensure set
1396             long ns = w.nsteals, sc;     // collect steal count
1397             do {} while (!U.compareAndSwapLong(this, STEALCOUNT,
1398                                                sc = stealCount, sc + ns));
                 // take plock: inline CAS first, acquirePlock() if that fails
1399             if (((ps = plock) & PL_LOCK) != 0 ||
1400                 !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1401                 ps = acquirePlock();
                 // value to install on unlock: SHUTDOWN bit kept as is, the
                 // remaining bits advanced by PL_LOCK
1402             int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
1403             try {
1404                 int idx = w.poolIndex;
1405                 WorkQueue[] ws = workQueues;
                     // clear the slot only if it still refers to this queue
1406                 if (ws != null && idx >= 0 && idx < ws.length && ws[idx] == w)
1407                     ws[idx] = null;
1408             } finally {
1409                 if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1410                     releasePlock(nps);
1411             }
1412         }
1413 
1414         long c;                          // adjust ctl counts
1415         do {} while (!U.compareAndSwapLong
1416                      (this, CTL, c = ctl, (((c - AC_UNIT) & AC_MASK) |
1417                                            ((c - TC_UNIT) & TC_MASK) |
1418                                            (c & ~(AC_MASK|TC_MASK)))));
1419 
1420         if (!tryTerminate(false, false) && w != null && w.array != null) {
1421             w.cancelAll();               // cancel remaining tasks
1422             WorkQueue[] ws; WorkQueue v; Thread p; int u, i, e;
                 // u is the upper 32 bits of ctl, e the lower 32 bits; the loop
                 // runs only while u is negative and e is non-negative
1423             while ((u = (int)((c = ctl) >>> 32)) < 0 && (e = (int)c) >= 0) {
1424                 if (e > 0) {             // activate or create replacement
1425                     if ((ws = workQueues) == null ||
1426                         (i = e & SMASK) >= ws.length ||
1427                         (v = ws[i]) == null)
1428                         break;
1429                     long nc = (((long)(v.nextWait & E_MASK)) |
1430                                ((long)(u + UAC_UNIT) << 32));
                         // give up if v's eventCount no longer matches e with INT_SIGN set
1431                     if (v.eventCount != (e | INT_SIGN))
1432                         break;
1433                     if (U.compareAndSwapLong(this, CTL, c, nc)) {
1434                         v.eventCount = (e + E_SEQ) & E_MASK;
1435                         if ((p = v.parker) != null)
1436                             U.unpark(p);
1437                         break;
1438                     }
1439                 }
1440                 else {                   // e == 0: no queue index recorded in ctl
1441                     if ((short)u < 0)
1442                         tryAddWorker();
1443                     break;
1444                 }
1445             }
1446         }
1447         if (ex == null)                     // help clean refs on way out
1448             ForkJoinTask.helpExpungeStaleExceptions();
1449         else                                // rethrow
1450             ForkJoinTask.rethrow(ex);
1451     }
1452 
1453     // Submissions
1454 
1455     /**
1456      * Unless shutting down, adds the given task to a submission queue
1457      * at submitter's current queue index (modulo submission
1458      * range). Only the most common path is directly handled in this
1459      * method. All others are relayed to fullExternalPush.
          *
          * <p>The direct path requires all of: a non-zero ThreadLocalRandom
          * probe, a positive plock, a non-empty workQueues array, an
          * existing queue at the probe-selected index, a successful CAS of
          * that queue's qlock from 0 to 1, and an array with room for one
          * more task.
1460      *
1461      * @param task the task. Caller must ensure non-null.
1462      */
1463     final void externalPush(ForkJoinTask<?> task) {
1464         WorkQueue[] ws; WorkQueue q; int z, m; ForkJoinTask<?>[] a;
1465         if ((z = ThreadLocalRandom.getProbe()) != 0 && plock > 0 &&
1466             (ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
1467             (q = ws[m & z & SQMASK]) != null &&
1468             U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock
1469             int b = q.base, s = q.top, n, an;
                 // n is top + 1 - base; proceed only if the array is longer than n
1470             if ((a = q.array) != null && (an = a.length) > (n = s + 1 - b)) {
1471                 int j = (((an - 1) & s) << ASHIFT) + ABASE;
1472                 U.putOrderedObject(a, j, task);
1473                 q.top = s + 1;                     // push on to deque
1474                 q.qlock = 0;                       // unlock
1475                 if (n <= 2)
1476                     signalWork(q);
1477                 return;
1478             }
1479             q.qlock = 0;                           // unlock; no room or no array
1480         }
1481         fullExternalPush(task);                    // every other case
1482     }
1483 
1484     /**
1485      * Full version of externalPush. This method is called, among
1486      * other times, upon the first submission of the first task to the
1487      * pool, so must perform secondary initialization.  It also
1488      * detects first submission by an external thread by looking up
1489      * its ThreadLocal, and creates a new shared queue if the one at
1490      * index is empty or contended. The plock lock body must be
1491      * exception-free (so no try/finally) so we optimistically
1492      * allocate new queues outside the lock and throw them away if
1493      * (very rarely) not needed.
1494      *
1495      * Secondary initialization occurs when plock is zero, to create
1496      * workQueue array and set plock to a valid value.  This lock body
1497      * must also be exception-free. Because the plock seq value can
1498      * eventually wrap around zero, this method harmlessly fails to
1499      * reinitialize if workQueues exists, while still advancing plock.
          *
          * @param task the task to submit
          * @throws RejectedExecutionException if plock is observed negative
1500      */
1501     private void fullExternalPush(ForkJoinTask<?> task) {
1502         int r;
1503         if ((r = ThreadLocalRandom.getProbe()) == 0) { // zero probe: initialize, then re-read
1504             ThreadLocalRandom.localInit();
1505             r = ThreadLocalRandom.getProbe();



1506         }
1507         for (;;) {                              // each pass takes exactly one branch below
1508             WorkQueue[] ws; WorkQueue q; int ps, m, k;
1509             boolean move = false;               // set when a new probe value should be tried
1510             if ((ps = plock) < 0)               // negative plock: reject the submission



1511                 throw new RejectedExecutionException();
1512             else if (ps == 0 || (ws = workQueues) == null ||
1513                      (m = ws.length - 1) < 0) { // initialize workQueues
1514                 int p = config & SMASK;         // find power of two table size
1515                 int n = (p > 1) ? p - 1 : 1;    // ensure at least 2 slots
1516                 n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4; // smear highest set bit rightward
1517                 n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1; // then double the resulting power of two
1518                 WorkQueue[] nws = ((ws = workQueues) == null || ws.length == 0 ?
1519                                    new WorkQueue[n] : null); // allocated before taking the lock
1520                 if (((ps = plock) & PL_LOCK) != 0 ||        // acquire plock: CAS in PL_LOCK,
1521                     !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1522                     ps = acquirePlock();                    // else take the acquirePlock() path
1523                 if (((ws = workQueues) == null || ws.length == 0) && nws != null)
1524                     workQueues = nws;           // install only if still absent
1525                 int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN); // release value: advance by PL_LOCK, keep SHUTDOWN bit
1526                 if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1527                     releasePlock(nps);          // CAS failed: release via releasePlock()
1528             }
1529             else if ((q = ws[k = r & m & SQMASK]) != null) { // a queue exists at slot k
1530                 if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock the queue
1531                     ForkJoinTask<?>[] a = q.array;
1532                     int s = q.top;
1533                     boolean submitted = false;
1534                     try {                      // locked version of push
1535                         if ((a != null && a.length > s + 1 - q.base) ||
1536                             (a = q.growArray()) != null) {   // must presize
1537                             int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
1538                             U.putOrderedObject(a, j, task);
1539                             q.top = s + 1;
1540                             submitted = true;
1541                         }
1542                     } finally {
1543                         q.qlock = 0;  // unlock
1544                     }
1545                     if (submitted) {
1546                         signalWork(q);
1547                         return;
1548                     }
1549                 }
1550                 move = true; // move on failure
1551             }
1552             else if (((ps = plock) & PL_LOCK) == 0) { // create new queue
1553                 q = new WorkQueue(this, null, SHARED_QUEUE, r); // allocated before taking the lock
1554                 if (((ps = plock) & PL_LOCK) != 0 ||            // acquire plock (same protocol as above)
1555                     !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1556                     ps = acquirePlock();
1557                 if ((ws = workQueues) != null && k < ws.length && ws[k] == null)
1558                     ws[k] = q;                  // install only if the slot is still empty
1559                 int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN); // release value, keeping SHUTDOWN bit
1560                 if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1561                     releasePlock(nps);
1562             }                                   // the next pass retries the push at slot k
1563             else
1564                 move = true; // move if busy
1565             if (move)
1566                 r = ThreadLocalRandom.advanceProbe(r); // retry with a new probe value
1567         }
1568     }
1569 
1570     // Maintaining ctl counts
1571 
1572     /**
1573      * Increments active count; mainly called upon return from blocking.
          * Adds AC_UNIT to ctl with a CAS, retrying until the CAS succeeds.
1574      */
1575     final void incrementActiveCount() {
1576         long c;
1577         do {} while (!U.compareAndSwapLong(this, CTL, c = ctl, c + AC_UNIT)); // re-read ctl on each retry
1578     }
1579 
1580     /**
1581      * Tries to create or activate a worker if too few are active.
1582      *
1583      * @param q the (non-null) queue holding tasks to be signalled
1584      */
1585     final void signalWork(WorkQueue q) {
1586         int hint = q.poolIndex;


1637      * completing the sweep. If the worker is not inactivated, it
1638      * takes and returns a task from this queue. Otherwise, if not
1639      * activated, it signals workers (that may include itself) and
1640      * returns so caller can retry. Also returns for retry if the
1641      * worker array may have changed during an empty scan.  On failure
1642      * to find a task, we take one of the following actions, after
1643      * which the caller will retry calling this method unless
1644      * terminated.
1645      *
1646      * * If pool is terminating, terminate the worker.
1647      *
1648      * * If not already enqueued, try to inactivate and enqueue the
1649      * worker on wait queue. Or, if inactivating has caused the pool
1650      * to be quiescent, relay to idleAwaitWork to possibly shrink
1651      * pool.
1652      *
1653      * * If already enqueued and none of the above apply, possibly
1654      * park awaiting signal, else lingering to help scan and signal.
1655      *
1656      * * If a non-empty queue discovered or left as a hint,
1657      * help wake up other workers before return.
1658      *
1659      * @param w the worker (via its WorkQueue)
1660      * @return a task or null if none found
1661      */
1662     private final ForkJoinTask<?> scan(WorkQueue w) {
1663         WorkQueue[] ws; int m;
1664         int ps = plock;                          // read plock before ws
1665         if (w != null && (ws = workQueues) != null && (m = ws.length - 1) >= 0) {
1666             int ec = w.eventCount;               // ec is negative if inactive
1667             int r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5;
1668             w.hint = -1;                         // update seed and clear hint
1669             int j = ((m + m + 1) | MIN_SCAN) & MAX_SCAN;
1670             do {
1671                 WorkQueue q; ForkJoinTask<?>[] a; int b;
1672                 if ((q = ws[(r + j) & m]) != null && (b = q.base) - q.top < 0 &&
1673                     (a = q.array) != null) {     // probably nonempty
1674                     int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
1675                     ForkJoinTask<?> t = (ForkJoinTask<?>)
1676                         U.getObjectVolatile(a, i);
1677                     if (q.base == b && ec >= 0 && t != null &&


1692                 if (U.compareAndSwapLong(this, STEALCOUNT,
1693                                          sc = stealCount, sc + ns))
1694                     w.nsteals = 0;               // collect steals and rescan
1695             }
1696             else if (plock != ps)                // consistency check
1697                 ;                                // skip
1698             else if ((e = (int)(c = ctl)) < 0)
1699                 w.qlock = -1;                    // pool is terminating
1700             else {
1701                 if ((h = w.hint) < 0) {
1702                     if (ec >= 0) {               // try to enqueue/inactivate
1703                         long nc = (((long)ec |
1704                                     ((c - AC_UNIT) & (AC_MASK|TC_MASK))));
1705                         w.nextWait = e;          // link and mark inactive
1706                         w.eventCount = ec | INT_SIGN;
1707                         if (ctl != c || !U.compareAndSwapLong(this, CTL, c, nc))
1708                             w.eventCount = ec;   // unmark on CAS failure
1709                         else if ((int)(c >> AC_SHIFT) == 1 - (config & SMASK))
1710                             idleAwaitWork(w, nc, c);
1711                     }
1712                     else if (w.eventCount < 0 && ctl == c) {

1713                         Thread wt = Thread.currentThread();
1714                         Thread.interrupted();    // clear status
1715                         U.putObject(wt, PARKBLOCKER, this);
1716                         w.parker = wt;           // emulate LockSupport.park
1717                         if (w.eventCount < 0)    // recheck
1718                             U.park(false, 0L);   // block
1719                         w.parker = null;
1720                         U.putObject(wt, PARKBLOCKER, null);
1721                     }
1722                 }
1723                 if ((h >= 0 || (h = w.hint) >= 0) &&
1724                     (ws = workQueues) != null && h < ws.length &&
1725                     (q = ws[h]) != null) {      // signal others before retry
1726                     WorkQueue v; Thread p; int u, i, s;
1727                     for (int n = (config & SMASK) - 1;;) {
1728                         int idleCount = (w.eventCount < 0) ? 0 : -1;
1729                         if (((s = idleCount - q.base + q.top) <= n &&
1730                              (n = s) <= 0) ||
1731                             (u = (int)((c = ctl) >>> 32)) >= 0 ||
1732                             (e = (int)c) <= 0 || m < (i = e & SMASK) ||
1733                             (v = ws[i]) == null)
1734                             break;
1735                         long nc = (((long)(v.nextWait & E_MASK)) |
1736                                    ((long)(u + UAC_UNIT) << 32));
1737                         if (v.eventCount != (e | INT_SIGN) ||
1738                             !U.compareAndSwapLong(this, CTL, c, nc))
1739                             break;
1740                         v.hint = h;
1741                         v.eventCount = (e + E_SEQ) & E_MASK;
1742                         if ((p = v.parker) != null)
1743                             U.unpark(p);
1744                         if (--n <= 0)
1745                             break;
1746                     }
1747                 }
1748             }
1749         }
1750         return null;
1751     }
1752 
1753     /**
1754      * If inactivating worker w has caused the pool to become
1755      * quiescent, checks for pool termination, and, so long as this is
1756      * not the only worker, waits for event for up to a given
1757      * duration.  On timeout, if ctl has not changed, terminates the
1758      * worker, which will in turn wake up another worker to possibly
1759      * repeat this process.
1760      *
1761      * @param w the calling worker
1762      * @param currentCtl the ctl value triggering possible quiescence
1763      * @param prevCtl the ctl value to restore if thread is terminated
1764      */
1765     private void idleAwaitWork(WorkQueue w, long currentCtl, long prevCtl) {
1766         if (w != null && w.eventCount < 0 &&            // w must still be marked inactive
1767             !tryTerminate(false, false) && (int)prevCtl != 0 && // not terminating; low 32 bits of prevCtl nonzero
1768             ctl == currentCtl) {                        // and ctl unchanged since the caller's CAS
1769             int dc = -(short)(currentCtl >>> TC_SHIFT); // negated 16-bit field of ctl at TC_SHIFT
1770             long parkTime = dc < 0 ? FAST_IDLE_TIMEOUT: (dc + 1) * IDLE_TIMEOUT; // timeout scales with dc when dc >= 0
1771             long deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
1772             Thread wt = Thread.currentThread();
1773             while (ctl == currentCtl) {                 // loop ends on any ctl change
1774                 Thread.interrupted();  // timed variant of version in scan()
1775                 U.putObject(wt, PARKBLOCKER, this);
1776                 w.parker = wt;
1777                 if (ctl == currentCtl)                  // recheck after publishing parker
1778                     U.park(false, parkTime);            // relative timed park
1779                 w.parker = null;
1780                 U.putObject(wt, PARKBLOCKER, null);
1781                 if (ctl != currentCtl)                  // ctl changed while parked: just return
1782                     break;
1783                 if (deadline - System.nanoTime() <= 0L &&   // timed out and ctl still unchanged:
1784                     U.compareAndSwapLong(this, CTL, currentCtl, prevCtl)) { // restore prevCtl
1785                     w.eventCount = (w.eventCount + E_SEQ) | E_MASK;
1786                     w.hint = -1;
1787                     w.qlock = -1;   // shrink
1788                     break;
1789                 }
1790             }
1791         }
1792     }
1793 
1794     /**
1795      * Scans through queues looking for work while joining a task; if
1796      * any present, signals. May return early if more signalling is
1797      * detectably unneeded.
1798      *
1799      * @param task return early if done
1800      * @param origin an index to start scan
1801      */
1802     private void helpSignal(ForkJoinTask<?> task, int origin) {
1803         WorkQueue[] ws; WorkQueue w; Thread p; long c; int m, u, e, i, s;
1804         if (task != null && task.status >= 0 &&
1805             (u = (int)(ctl >>> 32)) < 0 && (u >> UAC_SHIFT) < 0 &&
1806             (ws = workQueues) != null && (m = ws.length - 1) >= 0) {


1908                             else {
1909                                 subtask = next;
1910                                 j = v;
1911                                 break;
1912                             }
1913                         }
1914                     }
1915                 }
1916             }
1917         }
1918         return stat;
1919     }
1920 
1921     /**
1922      * Analog of tryHelpStealer for CountedCompleters. Tries to steal
1923      * and run tasks within the target's computation.
1924      *
1925      * @param task the task to join
1926      * @param mode if shared, exit upon completing any task
1927      * if all workers are active
          * @return the task's (negative) status if it is observed to be
          * done at the top of a loop pass, otherwise 0
1928      */
1929     private int helpComplete(ForkJoinTask<?> task, int mode) {
1930         WorkQueue[] ws; WorkQueue q; int m, n, s, u;
1931         if (task != null && (ws = workQueues) != null &&
1932             (m = ws.length - 1) >= 0) {
1933             for (int j = 1, origin = j;;) {     // start at slot 1 and advance by 2 (masked by m)
1934                 if ((s = task.status) < 0)      // negative status: return it
1935                     return s;
1936                 if ((q = ws[j & m]) != null && q.pollAndExecCC(task)) { // presumably ran a task from q on behalf of task -- confirm in WorkQueue.pollAndExecCC
1937                     origin = j;                 // success: restart the sweep from this slot
1938                     if (mode == SHARED_QUEUE && // shared mode: stop based on upper ctl word (see @param mode)
1939                         ((u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0))
1940                         break;
1941                 }
1942                 else if ((j = (j + 2) & m) == origin) // came back to origin without success
1943                     break;
1944             }
1945         }
1946         return 0;
1947     }


2059             ForkJoinTask<?> prevJoin = joiner.currentJoin;
2060             joiner.currentJoin = task;
2061             do {} while ((s = task.status) >= 0 && !joiner.isEmpty() &&
2062                          joiner.tryRemoveAndExec(task));
2063             if (s >= 0 && (s = task.status) >= 0) {
2064                 helpSignal(task, joiner.poolIndex);
2065                 if ((s = task.status) >= 0 &&
2066                     (task instanceof CountedCompleter))
2067                     s = helpComplete(task, LIFO_QUEUE);
2068             }
2069             if (s >= 0 && joiner.isEmpty()) {
2070                 do {} while (task.status >= 0 &&
2071                              tryHelpStealer(joiner, task) > 0);
2072             }
2073             joiner.currentJoin = prevJoin;
2074         }
2075     }
2076 
2077     /**
2078      * Returns a (probably) non-empty steal queue, if one is found
2079      * during a scan, else null.  This method must be retried by
2080      * caller if, by the time it tries to use the queue, it is empty.
          *
2081      * @param r a (random) seed for scanning
          * @return a queue whose base was observed behind its top, or
          * null if none was found and plock did not change during the scan
2082      */
2083     private WorkQueue findNonEmptyStealQueue(int r) {
2084         for (;;) {
2085             int ps = plock, m; WorkQueue[] ws; WorkQueue q; // snapshot plock before reading workQueues
2086             if ((ws = workQueues) != null && (m = ws.length - 1) >= 0) {
2087                 for (int j = (m + 1) << 2; j >= 0; --j) {   // 4 * table length + 1 probes
2088                     if ((q = ws[(((r + j) << 1) | 1) & m]) != null && // low index bit forced to 1 before masking
2089                         q.base - q.top < 0)                 // base behind top: appears non-empty



2090                         return q;
2091                 }
2092             }
2093             if (plock == ps)                    // plock unchanged during the scan: report none found
2094                 return null;
                                                 // otherwise plock changed: rescan
2095         }
2096     }


2097 
2098     /**
2099      * Runs tasks until {@code isQuiescent()}. We piggyback on
2100      * active count ctl maintenance, but rather than blocking
2101      * when tasks cannot be found, we rescan until all others cannot
2102      * find tasks either.
          *
          * @param w the queue of the calling worker; its local tasks are
          * run first on every pass
2103      */
2104     final void helpQuiescePool(WorkQueue w) {
2105         for (boolean active = true;;) {     // active: this caller is currently counted in ctl's active count
2106             long c; WorkQueue q; ForkJoinTask<?> t; int b;
2107             while ((t = w.nextLocalTask()) != null) { // drain w's own tasks first
2108                 if (w.base - w.top < 0)         // more remain: signal before running this one
2109                     signalWork(w);
2110                 t.doExec();
2111             }
2112             if ((q = findNonEmptyStealQueue(w.nextSeed())) != null) {
2113                 if (!active) {      // re-establish active count

2114                     active = true;
2115                     do {} while (!U.compareAndSwapLong
2116                                  (this, CTL, c = ctl, c + AC_UNIT));
2117                 }
2118                 if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) { // recheck non-empty, then poll at base
2119                     if (q.base - q.top < 0)     // queue still has tasks: signal
2120                         signalWork(q);
2121                     w.runSubtask(t);
2122                 }
2123             }
2124             else if (active) {       // decrement active count without queuing
2125                 long nc = (c = ctl) - AC_UNIT;
2126                 if ((int)(nc >> AC_SHIFT) + (config & SMASK) == 0) // decremented count would equal -(config & SMASK)
2127                     return;          // bypass decrement-then-increment
2128                 if (U.compareAndSwapLong(this, CTL, c, nc))
2129                     active = false;             // on CAS failure, stay active and retry next pass


2130             }
2131             else if ((int)((c = ctl) >> AC_SHIFT) + (config & SMASK) == 0 && // count equals -(config & SMASK):
2132                      U.compareAndSwapLong(this, CTL, c, c + AC_UNIT))        // restore own count and finish
2133                 return;



2134         }
2135     }


2136 
2137     /**
2138      * Gets and removes a local or stolen task for the given worker.
2139      *
          * @param w the worker's queue; its local tasks are tried first
2140      * @return a task, if available; null once findNonEmptyStealQueue
          * reports no non-empty queue
2141      */
2142     final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
2143         for (ForkJoinTask<?> t;;) {
2144             WorkQueue q; int b;
2145             if ((t = w.nextLocalTask()) != null)    // prefer a local task
2146                 return t;
2147             if ((q = findNonEmptyStealQueue(w.nextSeed())) == null)
2148                 return null;                        // nothing found anywhere
2149             if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) { // recheck non-empty, then poll at base
2150                 if (q.base - q.top < 0)             // queue still has tasks: signal
2151                     signalWork(q);
2152                 return t;
2153             }
                                                     // poll failed: loop and try again
2154         }
2155     }
2156 
2157     /**
2158      * Returns a cheap heuristic guide for task partitioning when
2159      * programmers, frameworks, tools, or languages have little or no
2160      * idea about task granularity.  In essence by offering this
2161      * method, we ask users only about tradeoffs in overhead vs
2162      * expected throughput and its variance, rather than how finely to
2163      * partition tasks.
2164      *
2165      * In a steady state strict (tree-structured) computation, each
2166      * thread makes available for stealing enough tasks for other
2167      * threads to remain active. Inductively, if all threads play by
2168      * the same rules, each thread should make available only a
2169      * constant number of tasks.
2170      *
2171      * The minimum useful constant is just 1. But using a value of 1
2172      * would require immediate replenishment upon each steal to
2173      * maintain enough tasks, which is infeasible.  Further,
2174      * partitionings/granularities of offered tasks should minimize
2175      * steal rates, which in general means that threads nearer the top
2176      * of computation tree should generate more than those nearer the
2177      * bottom. In perfect steady state, each thread is at
2178      * approximately the same level of computation tree. However,
2179      * producing extra tasks amortizes the uncertainty of progress and
2180      * diffusion assumptions.
2181      *
2182      * So, users will want to use values larger (but not much larger)
2183      * than 1 to both smooth over transient shortages and hedge
2184      * against uneven progress; as traded off against the cost of
2185      * extra task overhead. We leave the user to pick a threshold
2186      * value to compare with the results of this call to guide
2187      * decisions, but recommend values such as 3.
2188      *
2189      * When all threads are active, it is on average OK to estimate
2190      * surplus strictly locally. In steady-state, if one thread is
2191      * maintaining say 2 surplus tasks, then so are others. So we can
2192      * just use estimated queue length.  However, this strategy alone
2193      * leads to serious mis-estimates in some non-steady-state
2194      * conditions (ramp-up, ramp-down, other stalls). We can detect
2195      * many of these by further considering the number of "idle"
2196      * threads, that are known to have zero queued tasks, so
2197      * compensate by a factor of (#idle/#active) threads.
2198      *
2199      * Note: The approximation of #busy workers as #active workers is
2200      * not very good under current signalling scheme, and should be
2201      * improved.
2202      */


2215         return 0;
2216     }
2217 
2218     //  Termination
2219 
2220     /**
2221      * Possibly initiates and/or completes termination.  The caller
2222      * triggering termination runs three passes through workQueues:
2223      * (0) Setting termination status, followed by wakeups of queued
2224      * workers; (1) cancelling all tasks; (2) interrupting lagging
2225      * threads (likely in external tasks, but possibly also blocked in
2226      * joins).  Each pass repeats previous steps because of potential
2227      * lagging thread creation.
          *
          * Always returns false for the common pool. After the three
          * passes the enclosing loop runs again and returns true through
          * the STOP_BIT branch.
2228      *
2229      * @param now if true, unconditionally terminate, else only
2230      * if no work and no active workers
2231      * @param enable if true, enable shutdown when next possible
2232      * @return true if now terminating or terminated
2233      */
2234     private boolean tryTerminate(boolean now, boolean enable) {
2235         int ps;
2236         if (this == common)                    // cannot shut down
2237             return false;
2238         if ((ps = plock) >= 0) {                   // enable by setting plock
2239             if (!enable)
2240                 return false;
2241             if ((ps & PL_LOCK) != 0 ||             // acquire plock: CAS in PL_LOCK,
2242                 !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
2243                 ps = acquirePlock();               // else take the acquirePlock() path
2244             int nps = ((ps + PL_LOCK) & ~SHUTDOWN) | SHUTDOWN; // release value with SHUTDOWN bit set
2245             if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
2246                 releasePlock(nps);
2247         }
2248         for (long c;;) {
2249             if (((c = ctl) & STOP_BIT) != 0) {     // already terminating
2250                 if ((short)(c >>> TC_SHIFT) == -(config & SMASK)) {
2251                     synchronized (this) {
2252                         notifyAll();               // signal when 0 workers
2253                     }
2254                 }
2255                 return true;
2256             }










2257             if (!now) {                            // check if idle & no tasks
2258                 WorkQueue[] ws; WorkQueue w;
2259                 if ((int)(c >> AC_SHIFT) != -(config & SMASK)) // active count field not at its idle value
2260                     return false;
2261                 if ((ws = workQueues) != null) {
2262                     for (int i = 0; i < ws.length; ++i) {
2263                         if ((w = ws[i]) != null) {
2264                             if (!w.isEmpty()) {    // signal unprocessed tasks
2265                                 signalWork(w);
2266                                 return false;
2267                             }
2268                             if ((i & 1) != 0 && w.eventCount >= 0) // odd slot with non-negative eventCount
2269                                 return false;      // unqueued inactive worker
2270                         }
2271                     }
2272                 }
2273             }
2274             if (U.compareAndSwapLong(this, CTL, c, c | STOP_BIT)) { // this caller triggers termination
2275                 for (int pass = 0; pass < 3; ++pass) {
2276                     WorkQueue[] ws; WorkQueue w; Thread wt;
2277                     if ((ws = workQueues) != null) {

2278                         int n = ws.length;
2279                         for (int i = 0; i < n; ++i) {
2280                             if ((w = ws[i]) != null) {
2281                                 w.qlock = -1;      // every pass: set qlock to -1
2282                                 if (pass > 0) {    // passes 1 and 2: cancel queued tasks
2283                                     w.cancelAll();
2284                                     if (pass > 1 && (wt = w.owner) != null) { // pass 2: interrupt owner threads
2285                                         if (!wt.isInterrupted()) {
2286                                             try {
2287                                                 wt.interrupt();
2288                                             } catch (Throwable ignore) { // any failure to interrupt is ignored
2289                                             }
2290                                         }
2291                                         U.unpark(wt);
2292                                     }
2293                                 }
2294                             }
2295                         }
2296                         // Wake up workers parked on event queue
2297                         int i, e; long cc; Thread p;
2298                         while ((e = (int)(cc = ctl) & E_MASK) != 0 &&
2299                                (i = e & SMASK) < n && i >= 0 &&
2300                                (w = ws[i]) != null) {
2301                             long nc = ((long)(w.nextWait & E_MASK) |  // low word: w.nextWait masked by E_MASK
2302                                        ((cc + AC_UNIT) & AC_MASK) |   // active count field plus one unit
2303                                        (cc & (TC_MASK|STOP_BIT)));    // keep total count field and STOP_BIT
2304                             if (w.eventCount == (e | INT_SIGN) &&
2305                                 U.compareAndSwapLong(this, CTL, cc, nc)) {
2306                                 w.eventCount = (e + E_SEQ) & E_MASK;
2307                                 w.qlock = -1;
2308                                 if ((p = w.parker) != null)
2309                                     U.unpark(p);
2310                             }
2311                         }
2312                     }
2313                 }
2314             }
2315         }
2316     }
2317 
2318     // external operations on common pool
2319 
2320     /**
2321      * Returns common pool queue for a thread that has submitted at
2322      * least one task.
          *
          * @return the entry of the common pool's workQueues at the slot
          * selected by the caller's probe (which may itself be null), or
          * null if the probe is zero, there is no common pool, or its
          * workQueues array is null or empty
2323      */
2324     static WorkQueue commonSubmitterQueue() {
2325         ForkJoinPool p; WorkQueue[] ws; int m, z;
2326         return ((z = ThreadLocalRandom.getProbe()) != 0 &&
2327                 (p = common) != null &&
2328                 (ws = p.workQueues) != null &&
2329                 (m = ws.length - 1) >= 0) ?
2330             ws[m & z & SQMASK] : null;          // same slot computation as externalPush
2331     }
2332 
2333     /**
2334      * Tries to pop the given task from submitter's queue in common pool.
          *
          * @param t the task; the attempt succeeds only if it is found
          * in the slot just below the queue's top
          * @return true if the task was removed from the queue
2335      */
2336     static boolean tryExternalUnpush(ForkJoinTask<?> t) {
2337         ForkJoinPool p; WorkQueue[] ws; WorkQueue q;
2338         ForkJoinTask<?>[] a;  int m, s, z;
2339         if (t != null &&
2340             (z = ThreadLocalRandom.getProbe()) != 0 &&
2341             (p = common) != null &&
2342             (ws = p.workQueues) != null &&
2343             (m = ws.length - 1) >= 0 &&
2344             (q = ws[m & z & SQMASK]) != null && // queue at the caller's probed slot
2345             (s = q.top) != q.base &&            // top differs from base
2346             (a = q.array) != null) {
2347             long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE; // offset for index (s - 1) masked by length - 1
2348             if (U.getObject(a, j) == t &&       // unlocked pre-check that t is in that slot
2349                 U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock (CAS qlock 0 -> 1)
2350                 if (q.array == a && q.top == s && // recheck
2351                     U.compareAndSwapObject(a, j, t, null)) { // clear the slot
2352                     q.top = s - 1;
2353                     q.qlock = 0;                // unlock
2354                     return true;
2355                 }
2356                 q.qlock = 0;                    // unlock; recheck or slot CAS failed
2357             }
2358         }
2359         return false;
2360     }
2361 
2362     /**
2363      * Tries to pop and run local tasks within the same computation
2364      * as the given root. On failure, tries to help complete from


2376                         (o instanceof CountedCompleter)) {
2377                         CountedCompleter<?> t = (CountedCompleter<?>)o, r = t;
2378                         do {
2379                             if (r == root) {
2380                                 if (U.compareAndSwapInt(q, QLOCK, 0, 1)) {
2381                                     if (q.array == a && q.top == s &&
2382                                         U.compareAndSwapObject(a, j, t, null)) {
2383                                         q.top = s - 1;
2384                                         task = t;
2385                                     }
2386                                     q.qlock = 0;
2387                                 }
2388                                 break;
2389                             }
2390                         } while ((r = r.completer) != null);
2391                     }
2392                 }
2393                 if (task != null)
2394                     task.doExec();
2395                 if (root.status < 0 ||
2396                     (config != 0 &&
2397                      ((u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0)))
2398                     break;
2399                if (task == null) {
2400                     helpSignal(root, q.poolIndex);
2401                     if (root.status >= 0)
2402                         helpComplete(root, SHARED_QUEUE);
2403                     break;
2404                 }
2405             }
2406         }
2407     }
2408 
2409     /**
2410      * Tries to help execute or signal availability of the given task
2411      * from submitter's queue in common pool.
          *
          * If the task sits in the slot just below the top of the caller's
          * queue, it is removed and run here. If the task is still not done
          * afterwards, relays to externalHelpComplete for a
          * CountedCompleter, else to helpSignal.
          *
          * @param t the task; nothing is done if null
2412      */
2413     static void externalHelpJoin(ForkJoinTask<?> t) {
2414         // Some hard-to-avoid overlap with tryExternalUnpush
2415         ForkJoinPool p; WorkQueue[] ws; WorkQueue q, w;
2416         ForkJoinTask<?>[] a;  int m, s, n, z;
2417         if (t != null &&
2418             (z = ThreadLocalRandom.getProbe()) != 0 &&
2419             (p = common) != null &&
2420             (ws = p.workQueues) != null &&
2421             (m = ws.length - 1) >= 0 &&
2422             (q = ws[m & z & SQMASK]) != null && // queue at the caller's probed slot
2423             (a = q.array) != null) {
2424             int am = a.length - 1;
2425             if ((s = q.top) != q.base) {        // top differs from base
2426                 long j = ((am & (s - 1)) << ASHIFT) + ABASE; // offset for index (s - 1) masked by am
2427                 if (U.getObject(a, j) == t &&   // unlocked pre-check that t is in that slot
2428                     U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock (CAS qlock 0 -> 1)
2429                     if (q.array == a && q.top == s &&      // recheck under lock
2430                         U.compareAndSwapObject(a, j, t, null)) {
2431                         q.top = s - 1;
2432                         q.qlock = 0;            // unlock before running the task
2433                         t.doExec();
2434                     }
2435                     else
2436                         q.qlock = 0;            // unlock; recheck or slot CAS failed
2437                 }
2438             }
2439             if (t.status >= 0) {                // non-negative status: task not yet done
2440                 if (t instanceof CountedCompleter)
2441                     p.externalHelpComplete(q, t);
2442                 else
2443                     p.helpSignal(t, q.poolIndex);
2444             }
2445         }
2446     }
2447 












2448     // Exported methods
2449 
2450     // Constructors
2451 
2452     /**
2453      * Creates a {@code ForkJoinPool} with parallelism equal to {@link
2454      * java.lang.Runtime#availableProcessors}, using the {@linkplain
2455      * #defaultForkJoinWorkerThreadFactory default thread factory},
2456      * no UncaughtExceptionHandler, and non-async LIFO processing mode.
          * If the number of available processors exceeds the implementation
          * limit ({@code MAX_CAP}), that limit is used instead.
2457      *
2458      * @throws SecurityException if a security manager exists and
2459      *         the caller is not permitted to modify threads
2460      *         because it does not hold {@link
2461      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2462      */
2463     public ForkJoinPool() {
             // Cap the default at MAX_CAP before delegating to the four-argument constructor.
2464         this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
2465              defaultForkJoinWorkerThreadFactory, null, false);
2466     }
2467 
2468     /**
2469      * Creates a {@code ForkJoinPool} with the indicated parallelism
2470      * level, the {@linkplain
2471      * #defaultForkJoinWorkerThreadFactory default thread factory},
2472      * no UncaughtExceptionHandler, and non-async LIFO processing mode.
2473      *
2474      * @param parallelism the parallelism level
2475      * @throws IllegalArgumentException if parallelism less than or
2476      *         equal to zero, or greater than implementation limit
2477      * @throws SecurityException if a security manager exists and
2478      *         the caller is not permitted to modify threads
2479      *         because it does not hold {@link
2480      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2481      */
2482     public ForkJoinPool(int parallelism) {
             // Validation and the permission check happen in the four-argument constructor.
2483         this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
2484     }


2492      * use {@link #defaultForkJoinWorkerThreadFactory}.
2493      * @param handler the handler for internal worker threads that
2494      * terminate due to unrecoverable errors encountered while executing
2495      * tasks. For default value, use {@code null}.
2496      * @param asyncMode if true,
2497      * establishes local first-in-first-out scheduling mode for forked
2498      * tasks that are never joined. This mode may be more appropriate
2499      * than default locally stack-based mode in applications in which
2500      * worker threads only process event-style asynchronous tasks.
2501      * For default value, use {@code false}.
2502      * @throws IllegalArgumentException if parallelism less than or
2503      *         equal to zero, or greater than implementation limit
2504      * @throws NullPointerException if the factory is null
2505      * @throws SecurityException if a security manager exists and
2506      *         the caller is not permitted to modify threads
2507      *         because it does not hold {@link
2508      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2509      */
2510     public ForkJoinPool(int parallelism,
2511                         ForkJoinWorkerThreadFactory factory,
2512                         UncaughtExceptionHandler handler,
2513                         boolean asyncMode) {
             // The arguments are validated while the this(...) argument list is
             // evaluated: checkParallelism and checkFactory throw on bad input
             // before the private constructor runs. Worker names get a
             // per-pool prefix built from nextPoolId().
2514         this(checkParallelism(parallelism),
2515              checkFactory(factory),
2516              handler,
2517              asyncMode,
2518              "ForkJoinPool-" + nextPoolId() + "-worker-");
             // The permission check runs after the delegated constructor returns.
2519         checkPermission();
2520     }
2521 
2522     private static int checkParallelism(int parallelism) {
             // Accept only values in the range 1..MAX_CAP; anything else is rejected.
2523         if (parallelism > 0 && parallelism <= MAX_CAP)
2524             return parallelism;
2525         throw new IllegalArgumentException();
2526     }
2527 
2528     private static ForkJoinWorkerThreadFactory checkFactory
2529         (ForkJoinWorkerThreadFactory factory) {
             // Pass a non-null factory through unchanged; reject null.
2530         if (factory != null)
2531             return factory;
2532         throw new NullPointerException();
2533     }
2534 
2535     /**
2536      * Creates a {@code ForkJoinPool} with the given parameters, without
2537      * any security checks or parameter validation.  Invoked directly by
2538      * makeCommonPool.
2539      */
2540     private ForkJoinPool(int parallelism,
2541                          ForkJoinWorkerThreadFactory factory,
2542                          UncaughtExceptionHandler handler,
2543                          boolean asyncMode,
2544                          String workerNamePrefix) {
2545         this.workerNamePrefix = workerNamePrefix;
2546         this.factory = factory;
2547         this.ueh = handler;
2548         this.config = parallelism | (asyncMode ? (FIFO_QUEUE << 16) : 0);
2549         long np = (long)(-parallelism); // offset ctl counts
2550         this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
2551     }
2552 
2553     /**
2554      * Returns the common pool instance. This pool is statically
2555      * constructed; its run state is unaffected by attempts to {@link
2556      * #shutdown} or {@link #shutdownNow}. However this pool and any
2557      * ongoing processing are automatically terminated upon program
2558      * {@link System#exit}.  Any program that relies on asynchronous
2559      * task processing to complete before program termination should
2560      * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence}
2561      * before exit.
2562      *
2563      * @return the common pool instance
2564      * @since 1.8
2565      */
2566     public static ForkJoinPool commonPool() {
2567         // assert common != null : "static init error";
2568         return common;
2569     }
2570 
2571     // Execution methods
2572 
2573     /**
2574      * Performs the given task, returning its result upon completion.
2575      * If the computation encounters an unchecked Exception or Error,
2576      * it is rethrown as the outcome of this invocation.  Rethrown
2577      * exceptions behave in the same way as regular exceptions, but,
2578      * when possible, contain stack traces (as displayed for example
2579      * using {@code ex.printStackTrace()}) of both the current thread
2580      * as well as the thread actually encountering the exception;
2581      * minimally only the latter.
2582      *
2583      * @param task the task
2584      * @return the task's result
2585      * @throws NullPointerException if the task is null
2586      * @throws RejectedExecutionException if the task cannot be
2587      *         scheduled for execution
2588      */


2604     public void execute(ForkJoinTask<?> task) {
             // Reject null explicitly before handing the task to externalPush.
2605         if (task == null)
2606             throw new NullPointerException();
2607         externalPush(task);
2608     }
2609 
2610     // AbstractExecutorService methods
2611 
2612     /**
2613      * @throws NullPointerException if the task is null
2614      * @throws RejectedExecutionException if the task cannot be
2615      *         scheduled for execution
2616      */
2617     public void execute(Runnable task) {
2618         if (task == null)
2619             throw new NullPointerException();
2620         ForkJoinTask<?> job;
2621         if (task instanceof ForkJoinTask<?>) // avoid re-wrap
2622             job = (ForkJoinTask<?>) task;
2623         else
2624             job = new ForkJoinTask.RunnableExecuteAction(task);
2625         externalPush(job);
2626     }
2627 
2628     /**
2629      * Submits a ForkJoinTask for execution.
2630      *
2631      * @param task the task to submit
2632      * @return the task
2633      * @throws NullPointerException if the task is null
2634      * @throws RejectedExecutionException if the task cannot be
2635      *         scheduled for execution
2636      */
2637     public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
2638         if (task == null)
2639             throw new NullPointerException();
2640         externalPush(task);
             // Hand back the same task instance that was enqueued.
2641         return task;
2642     }
2643 
2644     /**


2671     public ForkJoinTask<?> submit(Runnable task) {
2672         if (task == null)
2673             throw new NullPointerException();
2674         ForkJoinTask<?> job;
2675         if (task instanceof ForkJoinTask<?>) // avoid re-wrap
2676             job = (ForkJoinTask<?>) task;
2677         else
2678             job = new ForkJoinTask.AdaptedRunnableAction(task);
2679         externalPush(job);
2680         return job;
2681     }
2682 
2683     /**
          * Wraps each callable in a {@code ForkJoinTask} adapter, pushes it
          * to this pool, and then joins every task before returning the list
          * of futures. If this method exits before all tasks were joined,
          * every task created so far is cancelled (without interruption).
          *
2684      * @throws NullPointerException       {@inheritDoc}
2685      * @throws RejectedExecutionException {@inheritDoc}
2686      */
2687     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
2688         // Earlier versions of this class built a single task that ran
2689         // ForkJoinTask.invokeAll; pushing each task externally is now
2690         // at least as efficient.
2691         ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
2692         boolean completed = false;
2693         try {
2694             for (Callable<T> callable : tasks) {
2695                 ForkJoinTask<T> adapted = new ForkJoinTask.AdaptedCallable<T>(callable);
2696                 futures.add(adapted);
2697                 externalPush(adapted);
2698             }
2699             for (Future<T> pending : futures)
2700                 ((ForkJoinTask<?>) pending).quietlyJoin();
2701             completed = true;
2702             return futures;
2703         } finally {
2704             // Abnormal exit: cancel whatever was created so far.
2705             if (!completed)
2706                 for (Future<T> pending : futures)
2707                     pending.cancel(false);
2708         }
2709     }
2710 
2711     /**
2712      * Reports which factory this pool uses to construct new workers.
2713      *
2714      * @return the factory used for constructing new workers
2715      */
2716     public ForkJoinWorkerThreadFactory getFactory() {
2717         return this.factory;
2718     }
2719 
2720     /**
2721      * Reports the handler that is applied to internal worker threads
2722      * which terminate because of unrecoverable errors raised while
2723      * executing tasks.
2724      *
2725      * @return the handler, or {@code null} if none
2726      */
2727     public UncaughtExceptionHandler getUncaughtExceptionHandler() {
2728         return this.ueh;
2729     }
2729 
2730     /**
2731      * Reports the parallelism level this pool aims for. A stored value
2732      * that is not positive is reported as 1.
2733      *
2734      * @return the targeted parallelism level of this pool
2735      */
2736     public int getParallelism() {
2737         // The parallelism occupies the low (SMASK) bits of config.
2738         int configured = config & SMASK;
             if (configured > 0)
                 return configured;
             return 1;
         }
2739 
2740     /**
2741      * Reports the parallelism level the common pool aims for.
2742      *
2743      * @return the targeted parallelism level of the common pool
2744      * @since 1.8
2745      */
2746     public static int getCommonPoolParallelism() {
2747         return ForkJoinPool.commonParallelism;
2748     }
2749 
2750     /**
2751      * Reports how many worker threads have started and not yet
2752      * terminated.  This can differ from {@link #getParallelism}: extra
2753      * threads may be created to maintain parallelism while others are
2754      * cooperatively blocked.
2755      *
2756      * @return the number of worker threads
2757      */
2758     public int getPoolSize() {
2759         int parallelism = config & SMASK;
             // ctl keeps the total count as a signed 16-bit offset from parallelism.
2760         short totalOffset = (short)(ctl >>> TC_SHIFT);
             return parallelism + totalOffset;
         }
2761 
2762     /**
2763      * Returns {@code true} if this pool uses local first-in-first-out
2764      * scheduling mode for forked tasks that are never joined.
2765      *
2766      * @return {@code true} if this pool uses async mode
2767      */


2985         if ((c & STOP_BIT) != 0)
2986             level = (tc == 0) ? "Terminated" : "Terminating";
2987         else
2988             level = plock < 0 ? "Shutting down" : "Running";
2989         return super.toString() +
2990             "[" + level +
2991             ", parallelism = " + pc +
2992             ", size = " + tc +
2993             ", active = " + ac +
2994             ", running = " + rc +
2995             ", steals = " + st +
2996             ", tasks = " + qt +
2997             ", submissions = " + qs +
2998             "]";
2999     }
3000 
3001     /**
3002      * Possibly initiates an orderly shutdown in which previously
3003      * submitted tasks are executed, but no new tasks will be
3004      * accepted. Invocation has no effect on execution state if this
3005      * is the {@link #commonPool()}, and no additional effect if
3006      * already shut down.  Tasks that are in the process of being
3007      * submitted concurrently during the course of this method may or
3008      * may not be rejected.
3009      *
3010      * @throws SecurityException if a security manager exists and
3011      *         the caller is not permitted to modify threads
3012      *         because it does not hold {@link
3013      *         java.lang.RuntimePermission}{@code ("modifyThread")}
3014      */
3015     public void shutdown() {
3016         checkPermission();
             // NOTE(review): the first argument is false here and true in
             // shutdownNow, so it presumably selects immediate termination;
             // confirm against tryTerminate.
3017         tryTerminate(false, true);
3018     }
3019 
3020     /**
3021      * Possibly attempts to cancel and/or stop all tasks, and reject
3022      * all subsequently submitted tasks.  Invocation has no effect on
3023      * execution state if this is the {@link #commonPool()}, and no
3024      * additional effect if already shut down. Otherwise, tasks that
3025      * are in the process of being submitted or executed concurrently
3026      * during the course of this method may or may not be
3027      * rejected. This method cancels both existing and unexecuted
3028      * tasks, in order to permit termination in the presence of task
3029      * dependencies. So the method always returns an empty list
3030      * (unlike the case for some other Executors).
3031      *
3032      * @return an empty list
3033      * @throws SecurityException if a security manager exists and
3034      *         the caller is not permitted to modify threads
3035      *         because it does not hold {@link
3036      *         java.lang.RuntimePermission}{@code ("modifyThread")}
3037      */
3038     public List<Runnable> shutdownNow() {
3039         checkPermission();
             // NOTE(review): the first argument is true here and false in
             // shutdown, so it presumably selects immediate termination;
             // confirm against tryTerminate.
3040         tryTerminate(true, true);
             // No list of unexecuted tasks is collected; the result is always empty.
3041         return Collections.emptyList();
3042     }
3043 


3066      * @return {@code true} if terminating but not yet terminated
3067      */
3068     public boolean isTerminating() {
3069         long c = ctl;
             // Without the stop bit the pool is not terminating at all.
3070         if ((c & STOP_BIT) == 0L)
3071             return false;
             // Still terminating while the total-count field has not yet
             // reached minus the configured parallelism.
             short totalOffset = (short)(c >>> TC_SHIFT);
             return totalOffset != -(config & SMASK);
3072     }
3073 
3074     /**
3075      * Reports whether this pool has been shut down.
3076      *
3077      * @return {@code true} if this pool has been shut down
3078      */
3079     public boolean isShutdown() {
             // A negative plock value marks the pool as shut down.
3080         return this.plock < 0;
3081     }
3082 
3083     /**
3084      * Blocks until all tasks have completed execution after a
3085      * shutdown request, or the timeout occurs, or the current thread
3086      * is interrupted, whichever happens first. Because the {@link
3087      * #commonPool()} never terminates until program shutdown, when
3088      * applied to the common pool, this method is equivalent to {@link
3089      * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}.
          *
          * <p>The remaining time is tracked in nanoseconds. Each wait lasts
          * at most the remaining time, rounded down to milliseconds, except
          * that a remainder below one millisecond is waited for as one
          * millisecond.
3090      *
3091      * @param timeout the maximum time to wait
3092      * @param unit the time unit of the timeout argument
3093      * @return {@code true} if this executor terminated and
3094      *         {@code false} if the timeout elapsed before termination
3095      * @throws InterruptedException if interrupted while waiting
3096      */
3097     public boolean awaitTermination(long timeout, TimeUnit unit)
3098         throws InterruptedException {
3099         if (Thread.interrupted())
3100             throw new InterruptedException();
3101         if (this == common) {
3102             awaitQuiescence(timeout, unit);
3103             return false;
3104         }
3105         long nanos = unit.toNanos(timeout);
3106         if (isTerminated())
3107             return true;
3108         long startTime = System.nanoTime();
3109         synchronized (this) {
3110             for (long waitTime = nanos;;) {
                     // Decide the result explicitly: true only when terminated,
                     // false once the time budget is used up.
3111                 if (isTerminated())
3112                     return true;
3113                 if (waitTime <= 0L)
3114                     return false;
                     // waitTime is in nanoseconds regardless of the caller's unit.
3115                 long millis = TimeUnit.NANOSECONDS.toMillis(waitTime);
                     // wait(0) would block indefinitely, so wait at least 1 ms.
                     // Presumably the terminating thread notifies this monitor
                     // (TODO confirm in tryTerminate); the timed wait bounds the
                     // delay either way.
3116                 wait(millis > 0L ? millis : 1L);
3117                 waitTime = nanos - (System.nanoTime() - startTime);
3118             }
3119         }
3121     }
3122 
3123     /**
3124      * If called by a ForkJoinTask operating in this pool, equivalent
3125      * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise,
3126      * waits and/or attempts to assist performing tasks until this
3127      * pool {@link #isQuiescent} or the indicated timeout elapses.
          *
          * <p>When the caller is a worker thread of this pool the timeout
          * is not consulted and the result is always {@code true}. Other
          * callers never block: they run tasks taken from this pool's queues
          * and yield between unsuccessful scans.
3128      *
3129      * @param timeout the maximum time to wait
3130      * @param unit the time unit of the timeout argument
3131      * @return {@code true} if quiescent; {@code false} if the
3132      * timeout elapsed.
3133      */
3134     public boolean awaitQuiescence(long timeout, TimeUnit unit) {
3135         long nanos = unit.toNanos(timeout);
3136         ForkJoinWorkerThread wt;
3137         Thread thread = Thread.currentThread();
             // A worker of this pool helps quiesce through its own queue.
3138         if ((thread instanceof ForkJoinWorkerThread) &&
3139             (wt = (ForkJoinWorkerThread)thread).pool == this) {
3140             helpQuiescePool(wt.workQueue);
3141             return true;
3142         }
3143         long startTime = System.nanoTime();
3144         WorkQueue[] ws;
3145         int r = 0, m;
             // Starts as true so that the first pass skips the timeout check and yield.
3146         boolean found = true;
3147         while (!isQuiescent() && (ws = workQueues) != null &&
3148                (m = ws.length - 1) >= 0) {
                 // The previous scan saw no non-empty queue: check the deadline, then yield.
3149             if (!found) {
3150                 if ((System.nanoTime() - startTime) > nanos)
3151                     return false;
3152                 Thread.yield(); // cannot block
3153             }
3154             found = false;
                 // Probe queue slots round-robin; r carries over between scans.
3155             for (int j = (m + 1) << 2; j >= 0; --j) {
3156                 ForkJoinTask<?> t; WorkQueue q; int b;
                     // base - top < 0 is treated as "queue has at least one task".
3157                 if ((q = ws[r++ & m]) != null && (b = q.base) - q.top < 0) {
3158                     found = true;
3159                     if ((t = q.pollAt(b)) != null) {
                             // More tasks remain in q: signal before running this one.
3160                         if (q.base - q.top < 0)
3161                             signalWork(q);
3162                         t.doExec();
3163                     }
3164                     break;
3165                 }
3166             }
3167         }
             // Reached when the pool is quiescent or has no work-queue array.
3168         return true;
3169     }
3170 
3171     /**
3172      * Keeps waiting and/or helping to run tasks, without a practical
3173      * time limit, until the {@link #commonPool()} {@link #isQuiescent}.
3174      */
3175     static void quiesceCommonPool() {
             // Long.MAX_VALUE nanoseconds serves as "indefinitely".
3176         ForkJoinPool.common.awaitQuiescence(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
3177     }
3178 
3179     /**
3180      * Interface for extending managed parallelism for tasks running
3181      * in {@link ForkJoinPool}s.
3182      *
3183      * <p>A {@code ManagedBlocker} provides two methods.  Method
3184      * {@code isReleasable} must return {@code true} if blocking is
3185      * not necessary. Method {@code block} blocks the current thread
3186      * if necessary (perhaps internally invoking {@code isReleasable}
3187      * before actually blocking). These actions are performed by any
3188      * thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}.
3189      * The unusual methods in this API accommodate synchronizers that
3190      * may, but don't usually, block for long periods. Similarly, they
3191      * allow more efficient internal handling of cases in which
3192      * additional workers may be, but usually are not, needed to
3193      * ensure sufficient parallelism.  Toward this end,
3194      * implementations of method {@code isReleasable} must be amenable
3195      * to repeated invocation.
3196      *
3197      * <p>For example, here is a ManagedBlocker based on a
3198      * ReentrantLock:
3199      *  <pre> {@code
3200      * class ManagedLocker implements ManagedBlocker {
3201      *   final ReentrantLock lock;
3202      *   boolean hasLock = false;
3203      *   ManagedLocker(ReentrantLock lock) { this.lock = lock; }
3204      *   public boolean block() {
3205      *     if (!hasLock)
3206      *       lock.lock();
3207      *     return true;
3208      *   }
3209      *   public boolean isReleasable() {
3210      *     return hasLock || (hasLock = lock.tryLock());


3228      *   }
3229      *   public E getItem() { // call after pool.managedBlock completes
3230      *     return item;
3231      *   }
3232      * }}</pre>
3233      */
3234     public static interface ManagedBlocker {
3235         /**
3236          * Possibly blocks the current thread, for example waiting for
3237          * a lock or condition.
3238          *
3239          * @return {@code true} if no additional blocking is necessary
3240          * (i.e., if {@code isReleasable} would return {@code true})
3241          * @throws InterruptedException if interrupted while waiting
3242          * (the method is not required to do so, but is allowed to)
3243          */
3244         boolean block() throws InterruptedException;
3245 
3246         /**
3247          * Returns {@code true} if blocking is unnecessary.
              * Implementations must tolerate being invoked repeatedly.
              *
3248          * @return {@code true} if blocking is unnecessary
3249          */
3250         boolean isReleasable();
3251     }
3252 
3253     /**
3254      * Blocks in accord with the given blocker.  If the current thread
3255      * is a {@link ForkJoinWorkerThread}, this method possibly
3256      * arranges for a spare thread to be activated if necessary to
3257      * ensure sufficient parallelism while the current thread is blocked.
3258      *
3259      * <p>If the caller is not a {@link ForkJoinTask}, this method is
3260      * behaviorally equivalent to
3261      *  <pre> {@code
3262      * while (!blocker.isReleasable())
3263      *   if (blocker.block())
3264      *     return;
3265      * }</pre>
3266      *
3267      * If the caller is a {@code ForkJoinTask}, then the pool may
3268      * first be expanded to ensure parallelism, and later adjusted.


3313     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
             // Wrap the runnable, together with the supplied value, in a ForkJoinTask adapter.
3314         return new ForkJoinTask.AdaptedRunnable<T>(runnable, value);
3315     }
3316 
3317     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
             // Wrap the callable in a ForkJoinTask adapter.
3318         return new ForkJoinTask.AdaptedCallable<T>(callable);
3319     }
3320 
3321     // Unsafe mechanics
         // The constants below are assigned once in the static initializer that follows.
3322     private static final sun.misc.Unsafe U;
3323     private static final long CTL;         // offset of ForkJoinPool.ctl
3324     private static final long PARKBLOCKER; // offset of Thread.parkBlocker
3325     private static final int ABASE;        // base offset of a ForkJoinTask[] array
3326     private static final int ASHIFT;       // log2 of the ForkJoinTask[] index scale
3327     private static final long STEALCOUNT;  // offset of ForkJoinPool.stealCount
3328     private static final long PLOCK;       // offset of ForkJoinPool.plock
3329     private static final long INDEXSEED;   // offset of ForkJoinPool.indexSeed
3330     private static final long QLOCK;       // offset of WorkQueue.qlock
3331 
3332     static {
3333         // initialize field offsets for CAS etc
3334         try {
3335             U = sun.misc.Unsafe.getUnsafe();
3336             Class<?> k = ForkJoinPool.class;
3337             CTL = U.objectFieldOffset
3338                 (k.getDeclaredField("ctl"));
3339             STEALCOUNT = U.objectFieldOffset
3340                 (k.getDeclaredField("stealCount"));
3341             PLOCK = U.objectFieldOffset
3342                 (k.getDeclaredField("plock"));
3343             INDEXSEED = U.objectFieldOffset
3344                 (k.getDeclaredField("indexSeed"));
3345             Class<?> tk = Thread.class;
3346             PARKBLOCKER = U.objectFieldOffset
3347                 (tk.getDeclaredField("parkBlocker"));
3348             Class<?> wk = WorkQueue.class;
3349             QLOCK = U.objectFieldOffset
3350                 (wk.getDeclaredField("qlock"));
3351             Class<?> ak = ForkJoinTask[].class;
3352             ABASE = U.arrayBaseOffset(ak);
3353             int scale = U.arrayIndexScale(ak);
                 // Element offsets are computed by shifting, so the scale must be a power of two.
3354             if ((scale & (scale - 1)) != 0)
3355                 throw new Error("data type scale not a power of two");
3356             ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
3357         } catch (Exception e) {
3358             throw new Error(e);
3359         }


3360 
             // The factory and the permission object are created before the
             // common pool, whose construction uses the default factory.
3361         defaultForkJoinWorkerThreadFactory =

3362             new DefaultForkJoinWorkerThreadFactory();
3363         modifyThreadPermission = new RuntimePermission("modifyThread");
3364 
             // The common pool is built inside a privileged action.
3365         common = java.security.AccessController.doPrivileged
3366             (new java.security.PrivilegedAction<ForkJoinPool>() {
3367                 public ForkJoinPool run() { return makeCommonPool(); }});
3368         int par = common.config; // report 1 even if threads disabled
3369         commonParallelism = par > 0 ? par : 1;
3370     }
3371 
3372     /**
3373      * Creates and returns the common pool, respecting user settings
3374      * specified via system properties.
          *
          * <p>Three properties are consulted, each prefixed with
          * {@code java.util.concurrent.ForkJoinPool.common.}:
          * {@code parallelism}, {@code threadFactory} and
          * {@code exceptionHandler}. They are applied in that order inside a
          * single try block, so an exception raised while applying one of
          * them also skips the ones after it, which then keep their defaults.
          * A negative parallelism (including the initial -1) is replaced by
          * the number of available processors, and a value above
          * {@code MAX_CAP} is reduced to {@code MAX_CAP}.
3375      */
3376     private static ForkJoinPool makeCommonPool() {
3377         int parallelism = -1;
3378         ForkJoinWorkerThreadFactory factory
3379             = defaultForkJoinWorkerThreadFactory;
3380         UncaughtExceptionHandler handler = null;
3381         try {  // ignore exceptions in accessing/parsing properties
3382             String pp = System.getProperty
3383                 ("java.util.concurrent.ForkJoinPool.common.parallelism");


3384             String fp = System.getProperty
3385                 ("java.util.concurrent.ForkJoinPool.common.threadFactory");
3386             String hp = System.getProperty
3387                 ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
3388             if (pp != null)
3389                 parallelism = Integer.parseInt(pp);
                 // Factory and handler classes are loaded through the system
                 // class loader and instantiated with their no-arg constructors.
3390             if (fp != null)
3391                 factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
3392                            getSystemClassLoader().loadClass(fp).newInstance());
3393             if (hp != null)
3394                 handler = ((UncaughtExceptionHandler)ClassLoader.
3395                            getSystemClassLoader().loadClass(hp).newInstance());


3396         } catch (Exception ignore) {
                 // Deliberately best-effort: values assigned so far are kept.
3397         }
3398 
3399         if (parallelism < 0)
3400             parallelism = Runtime.getRuntime().availableProcessors();
3401         if (parallelism > MAX_CAP)
3402             parallelism = MAX_CAP;
             // Uses the private constructor, which performs no validation or security check.
3403         return new ForkJoinPool(parallelism, factory, handler, false,
3404                                 "ForkJoinPool.commonPool-worker-");



3405     }
3406 
3407 }