18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 *
20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 * or visit www.oracle.com if you need additional information or have any
22 * questions.
23 */
24
25 /*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/publicdomain/zero/1.0/
34 */
35
36 package java.util.concurrent;
37
38 import java.util.ArrayList;
39 import java.util.Arrays;
40 import java.util.Collection;
41 import java.util.Collections;
42 import java.util.List;
43 import java.util.concurrent.AbstractExecutorService;
44 import java.util.concurrent.Callable;
45 import java.util.concurrent.ExecutorService;
46 import java.util.concurrent.Future;
47 import java.util.concurrent.RejectedExecutionException;
48 import java.util.concurrent.RunnableFuture;
49 import java.util.concurrent.TimeUnit;
50
51 /**
52 * An {@link ExecutorService} for running {@link ForkJoinTask}s.
53 * A {@code ForkJoinPool} provides the entry point for submissions
54 * from non-{@code ForkJoinTask} clients, as well as management and
55 * monitoring operations.
56 *
57 * <p>A {@code ForkJoinPool} differs from other kinds of {@link
87 * <p>In addition to execution and lifecycle control methods, this
88 * class provides status check methods (for example
89 * {@link #getStealCount}) that are intended to aid in developing,
90 * tuning, and monitoring fork/join applications. Also, method
91 * {@link #toString} returns indications of pool state in a
92 * convenient form for informal monitoring.
93 *
94 * <p>As is the case with other ExecutorServices, there are three
95 * main task execution methods summarized in the following table.
96 * These are designed to be used primarily by clients not already
97 * engaged in fork/join computations in the current pool. The main
98 * forms of these methods accept instances of {@code ForkJoinTask},
99 * but overloaded forms also allow mixed execution of plain {@code
100 * Runnable}- or {@code Callable}- based activities as well. However,
101 * tasks that are already executing in a pool should normally instead
102 * use the within-computation forms listed in the table unless using
103 * async event-style tasks that are not usually joined, in which case
104 * there is little difference among choice of methods.
105 *
106 * <table BORDER CELLPADDING=3 CELLSPACING=1>
107 * <tr>
108 * <td></td>
109 * <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
110 * <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
111 * </tr>
112 * <tr>
113 * <td> <b>Arrange async execution</b></td>
114 * <td> {@link #execute(ForkJoinTask)}</td>
115 * <td> {@link ForkJoinTask#fork}</td>
116 * </tr>
117 * <tr>
118 * <td> <b>Await and obtain result</b></td>
119 * <td> {@link #invoke(ForkJoinTask)}</td>
120 * <td> {@link ForkJoinTask#invoke}</td>
121 * </tr>
122 * <tr>
123 * <td> <b>Arrange exec and obtain Future</b></td>
124 * <td> {@link #submit(ForkJoinTask)}</td>
125 * <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
126 * </tr>
127 * </table>
128 *
129 * <p>The common pool is by default constructed with default
130 * parameters, but these may be controlled by setting three {@link
131 * System#getProperty system properties} with prefix {@code
132 * java.util.concurrent.ForkJoinPool.common}: {@code parallelism} --
133 * an integer greater than zero, {@code threadFactory} -- the class
134 * name of a {@link ForkJoinWorkerThreadFactory}, and {@code
135 * exceptionHandler} -- the class name of a {@link
136 * java.lang.Thread.UncaughtExceptionHandler
137 * Thread.UncaughtExceptionHandler}. Upon any error in establishing
138 * these settings, default parameters are used.
139 *
140 * <p><b>Implementation notes</b>: This implementation restricts the
141 * maximum number of running threads to 32767. Attempts to create
142 * pools with greater than the maximum number result in
143 * {@code IllegalArgumentException}.
144 *
145 * <p>This implementation rejects submitted tasks (that is, by throwing
146 * {@link RejectedExecutionException}) only when the pool is shut down
147 * or internal resources have been exhausted.
148 *
149 * @since 1.7
150 * @author Doug Lea
151 */
152 public class ForkJoinPool extends AbstractExecutorService {
153
154 /*
155 * Implementation Overview
156 *
157 * This class and its nested classes provide the main
158 * functionality and control for a set of worker threads:
208 * base index, else consider alternative actions, rather than
209 * method poll.)
210 *
211 * This approach also enables support of a user mode in which local
212 * task processing is in FIFO, not LIFO order, simply by using
213 * poll rather than pop. This can be useful in message-passing
214 * frameworks in which tasks are never joined. However neither
215 * mode considers affinities, loads, cache localities, etc, so
216 * rarely provide the best possible performance on a given
217 * machine, but portably provide good throughput by averaging over
218 * these factors. (Further, even if we did try to use such
219 * information, we do not usually have a basis for exploiting it.
220 * For example, some sets of tasks profit from cache affinities,
221 * but others are harmed by cache pollution effects.)
222 *
223 * WorkQueues are also used in a similar way for tasks submitted
224 * to the pool. We cannot mix these tasks in the same queues used
225 * for work-stealing (this would contaminate lifo/fifo
226 * processing). Instead, we randomly associate submission queues
227 * with submitting threads, using a form of hashing. The
228 * ThreadLocal Submitter class contains a value initially used as
229 * a hash code for choosing existing queues, but may be randomly
230 * repositioned upon contention with other submitters. In
231 * essence, submitters act like workers except that they are
232 * restricted to executing local tasks that they submitted (or in
233 * the case of CountedCompleters, others with the same root task).
234 * However, because most shared/external queue operations are more
235 * expensive than internal, and because, at steady state, external
236 * submitters will compete for CPU with workers, ForkJoinTask.join
237 * and related methods disable them from repeatedly helping to
238 * process tasks if all workers are active. Insertion of tasks in
239 * shared mode requires a lock (mainly to protect in the case of
240 * resizing) but we use only a simple spinlock (using bits in
241 * field qlock), because submitters encountering a busy queue move
242 * on to try or create other queues -- they block only when
243 * creating and registering new queues.
244 *
245 * Management
246 * ==========
247 *
248 * The main throughput advantages of work-stealing stem from
249 * decentralized control -- workers mostly take tasks from
250 * themselves or each other. We cannot negate this in the
251 * implementation of other management responsibilities. The main
252 * tactic for avoiding bottlenecks is packing nearly all
253 * essentially atomic control state into two volatile variables
254 * that are by far most often read (not written) as status and
255 * consistency checks.
256 *
257 * Field "ctl" contains 64 bits holding all the information needed
258 * to atomically decide to add, inactivate, enqueue (on an event
259 * queue), dequeue, and/or re-activate workers. To enable this
452 * helping opportunities is challenging to control on JVMs, where
453 * GC and other activities can stall progress of tasks that in
454 * turn stall out many other dependent tasks, without us being
455 * able to determine whether they will ever require compensation.
456 * Even though work-stealing otherwise encounters little
457 * degradation in the presence of more threads than cores,
458 * aggressively adding new threads in such cases entails risk of
459 * unwanted positive feedback control loops in which more threads
460 * cause more dependent stalls (as well as delayed progress of
461 * unblocked threads to the point that we know they are available)
462 * leading to more situations requiring more threads, and so
463 * on. This aspect of control can be seen as an (analytically
464 * intractable) game with an opponent that may choose the worst
465 * (for us) active thread to stall at any time. We take several
466 * precautions to bound losses (and thus bound gains), mainly in
467 * methods tryCompensate and awaitJoin.
468 *
469 * Common Pool
470 * ===========
471 *
472 * The static commonPool always exists after static
473 * initialization. Since it (or any other created pool) need
474 * never be used, we minimize initial construction overhead and
475 * footprint to the setup of about a dozen fields, with no nested
476 * allocation. Most bootstrapping occurs within method
477 * fullExternalPush during the first submission to the pool.
478 *
479 * When external threads submit to the common pool, they can
480 * perform some subtask processing (see externalHelpJoin and
481 * related methods). We do not need to record whether these
482 * submissions are to the common pool -- if not, externalHelpJoin
483 * returns quickly (at the most helping to signal some common pool
484 * workers). These submitters would otherwise be blocked waiting
485 * for completion, so the extra effort (with liberally sprinkled
486 * task status checks) in inapplicable cases amounts to an odd
487 * form of limited spin-wait before blocking in ForkJoinTask.join.
488 *
489 * Style notes
490 * ===========
491 *
492 * There is a lot of representation-level coupling among classes
531 private static void checkPermission() {
532 SecurityManager security = System.getSecurityManager();
533 if (security != null)
534 security.checkPermission(modifyThreadPermission);
535 }
536
537 // Nested classes
538
/**
 * Factory for creating new {@link ForkJoinWorkerThread}s.
 * Define and use a {@code ForkJoinWorkerThreadFactory} when working
 * with {@code ForkJoinWorkerThread} subclasses that extend base
 * functionality or initialize threads with different contexts.
 */
public interface ForkJoinWorkerThreadFactory {
    /**
     * Returns a new worker thread operating in the given pool.
     *
     * @param pool the pool this thread works in
     * @return the new worker thread
     * @throws NullPointerException if the pool is null
     */
    ForkJoinWorkerThread newThread(ForkJoinPool pool);
}
554
555 /**
556 * Default ForkJoinWorkerThreadFactory implementation; creates a
557 * new ForkJoinWorkerThread.
558 */
559 static final class DefaultForkJoinWorkerThreadFactory
560 implements ForkJoinWorkerThreadFactory {
561 public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
562 return new ForkJoinWorkerThread(pool);
563 }
564 }
565
566 /**
567 * Per-thread records for threads that submit to pools. Currently
568 * holds only pseudo-random seed / index that is used to choose
569 * submission queues in method externalPush. In the future, this may
570 * also incorporate a means to implement different task rejection
571 * and resubmission policies.
572 *
573 * Seeds for submitters and workers/workQueues work in basically
574 * the same way but are initialized and updated using slightly
575 * different mechanics. Both are initialized using the same
576 * approach as in class ThreadLocal, where successive values are
577 * unlikely to collide with previous values. Seeds are then
578 * randomly modified upon collisions using xorshifts, which
579 * requires a non-zero seed.
580 */
581 static final class Submitter {
582 int seed;
583 Submitter(int s) { seed = s; }
584 }
585
586 /**
587 * Class for artificial tasks that are used to replace the target
588 * of local joins if they are removed from an interior queue slot
589 * in WorkQueue.tryRemoveAndExec. We don't need the proxy to
590 * actually do anything beyond having a unique identity.
591 */
592 static final class EmptyTask extends ForkJoinTask<Void> {
593 private static final long serialVersionUID = -7721805057305804111L;
594 EmptyTask() { status = ForkJoinTask.NORMAL; } // force done
595 public final Void getRawResult() { return null; }
596 public final void setRawResult(Void x) {}
597 public final boolean exec() { return true; }
598 }
599
600 /**
601 * Queues supporting work-stealing as well as external task
602 * submission. See above for main rationale and algorithms.
603 * Implementation relies heavily on "Unsafe" intrinsics
604 * and selective use of "volatile":
605 *
606 * Field "base" is the index (mod array.length) of the least valid
720 * Provides a more accurate estimate of whether this queue has
721 * any tasks than does queueSize, by checking whether a
722 * near-empty queue has at least one unclaimed task.
723 */
724 final boolean isEmpty() {
725 ForkJoinTask<?>[] a; int m, s;
726 int n = base - (s = top);
727 return (n >= 0 ||
728 (n == -1 &&
729 ((a = array) == null ||
730 (m = a.length - 1) < 0 ||
731 U.getObject
732 (a, (long)((m & (s - 1)) << ASHIFT) + ABASE) == null)));
733 }
734
735 /**
736 * Pushes a task. Call only by owner in unshared queues. (The
737 * shared-queue version is embedded in method externalPush.)
738 *
739 * @param task the task. Caller must ensure non-null.
740 * @throw RejectedExecutionException if array cannot be resized
741 */
742 final void push(ForkJoinTask<?> task) {
743 ForkJoinTask<?>[] a; ForkJoinPool p;
744 int s = top, m, n;
745 if ((a = array) != null) { // ignore if queue removed
746 int j = (((m = a.length - 1) & s) << ASHIFT) + ABASE;
747 U.putOrderedObject(a, j, task);
748 if ((n = (top = s + 1) - base) <= 2) {
749 if ((p = pool) != null)
750 p.signalWork(this);
751 }
752 else if (n >= m)
753 growArray();
754 }
755 }
756
757 /**
758 * Initializes or doubles the capacity of array. Call either
759 * by owner or with lock held -- it is OK for base, but not
760 * top, to move while resizings are in progress.
919 if (U.compareAndSwapObject(a, j, t, null)) {
920 top = s;
921 t.doExec();
922 }
923 }
924 }
925
926 /**
927 * Polls and runs tasks until empty.
928 */
929 private void pollAndExecAll() {
930 for (ForkJoinTask<?> t; (t = poll()) != null;)
931 t.doExec();
932 }
933
934 /**
935 * If present, removes from queue and executes the given task,
936 * or any other cancelled task. Returns (true) on any CAS
937 * or consistency check failure so caller can retry.
938 *
939 * @return false if no progress can be made, else true;
940 */
941 final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
942 boolean stat = true, removed = false, empty = true;
943 ForkJoinTask<?>[] a; int m, s, b, n;
944 if ((a = array) != null && (m = a.length - 1) >= 0 &&
945 (n = (s = top) - (b = base)) > 0) {
946 for (ForkJoinTask<?> t;;) { // traverse from s to b
947 int j = ((--s & m) << ASHIFT) + ABASE;
948 t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
949 if (t == null) // inconsistent length
950 break;
951 else if (t == task) {
952 if (s + 1 == top) { // pop
953 if (!U.compareAndSwapObject(a, j, task, null))
954 break;
955 top = s;
956 removed = true;
957 }
958 else if (base == b) // replace with proxy
959 removed = U.compareAndSwapObject(a, j, task,
964 empty = false;
965 else if (s + 1 == top) { // pop and throw away
966 if (U.compareAndSwapObject(a, j, t, null))
967 top = s;
968 break;
969 }
970 if (--n == 0) {
971 if (!empty && base == b)
972 stat = false;
973 break;
974 }
975 }
976 }
977 if (removed)
978 task.doExec();
979 return stat;
980 }
981
982 /**
983 * Polls for and executes the given task or any other task in
984 * its CountedCompleter computation
985 */
986 final boolean pollAndExecCC(ForkJoinTask<?> root) {
987 ForkJoinTask<?>[] a; int b; Object o;
988 outer: while ((b = base) - top < 0 && (a = array) != null) {
989 long j = (((a.length - 1) & b) << ASHIFT) + ABASE;
990 if ((o = U.getObject(a, j)) == null ||
991 !(o instanceof CountedCompleter))
992 break;
993 for (CountedCompleter<?> t = (CountedCompleter<?>)o, r = t;;) {
994 if (r == root) {
995 if (base == b &&
996 U.compareAndSwapObject(a, j, t, null)) {
997 base = b + 1;
998 t.doExec();
999 return true;
1000 }
1001 else
1002 break; // restart
1003 }
1004 if ((r = r.completer) == null)
1038 }
1039
1040 /**
1041 * Returns true if owned and not known to be blocked.
1042 */
1043 final boolean isApparentlyUnblocked() {
1044 Thread wt; Thread.State s;
1045 return (eventCount >= 0 &&
1046 (wt = owner) != null &&
1047 (s = wt.getState()) != Thread.State.BLOCKED &&
1048 s != Thread.State.WAITING &&
1049 s != Thread.State.TIMED_WAITING);
1050 }
1051
// Unsafe mechanics
private static final sun.misc.Unsafe U;
private static final long QLOCK;   // offset of field "qlock"
private static final int ABASE;    // base offset of ForkJoinTask[] elements
private static final int ASHIFT;   // log2 of the ForkJoinTask[] index scale
static {
    int scale;
    try {
        U = sun.misc.Unsafe.getUnsafe();
        Class<?> queueClass = WorkQueue.class;
        Class<?> arrayClass = ForkJoinTask[].class;
        QLOCK = U.objectFieldOffset(queueClass.getDeclaredField("qlock"));
        ABASE = U.arrayBaseOffset(arrayClass);
        scale = U.arrayIndexScale(arrayClass);
    } catch (Exception e) {
        throw new Error(e);
    }
    // Slot offsets are computed by shifting, so the scale must be a
    // power of two.
    if ((scale & (scale - 1)) != 0)
        throw new Error("data type scale not a power of two");
    ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
}
1074 }
1075
1076 // static fields (initialized in static initializer below)
1077
1078 /**
1079 * Creates a new ForkJoinWorkerThread. This factory is used unless
1080 * overridden in ForkJoinPool constructors.
1081 */
1082 public static final ForkJoinWorkerThreadFactory
1083 defaultForkJoinWorkerThreadFactory;
1084
1085 /**
1086 * Per-thread submission bookkeeping. Shared across all pools
1087 * to reduce ThreadLocal pollution and because random motion
1088 * to avoid contention in one pool is likely to hold for others.
1089 * Lazily initialized on first submission (but null-checked
1090 * in other contexts to avoid unnecessary initialization).
1091 */
1092 static final ThreadLocal<Submitter> submitters;
1093
1094 /**
1095 * Permission required for callers of methods that may start or
1096 * kill threads.
1097 */
1098 private static final RuntimePermission modifyThreadPermission;
1099
1100 /**
1101 * Common (static) pool. Non-null for public use unless a static
1102 * construction exception, but internal usages null-check on use
1103 * to paranoically avoid potential initialization circularities
1104 * as well as to simplify generated code.
1105 */
1106 static final ForkJoinPool commonPool;
1107
1108 /**
1109 * Common pool parallelism. Must equal commonPool.parallelism.
1110 */
1111 static final int commonPoolParallelism;
1112
1113 /**
1114 * Sequence number for creating workerNamePrefix.
1115 */
1116 private static int poolNumberSequence;
1117
1118 /**
1119 * Return the next sequence number. We don't expect this to
1120 * ever contend so use simple builtin sync.
1121 */
1122 private static final synchronized int nextPoolId() {
1123 return ++poolNumberSequence;
1124 }
1125
1126 // static constants
1127
1128 /**
1129 * Initial timeout value (in nanoseconds) for the thread
1130 * triggering quiescence to park waiting for new work. On timeout,
1131 * the thread will instead try to shrink the number of
1132 * workers. The value should be large enough to avoid overly
1133 * aggressive shrinkage during most transient stalls (long GCs
1134 * etc).
1135 */
1136 private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec
1137
1138 /**
1139 * Timeout value when there are more threads than parallelism level
1140 */
1251
// Instance fields

/*
 * Field layout of this class tends to matter more than one would
 * like. Runtime layout order is only loosely related to
 * declaration order and may differ across JVMs, but the following
 * empirically works OK on current JVMs.
 */

// Heuristic padding to ameliorate unfortunate memory placements
volatile long pad00, pad01, pad02, pad03;
volatile long pad04, pad05, pad06;

// collects worker counts
volatile long stealCount;
// main pool control
volatile long ctl;
// shutdown status and seqLock
volatile int plock;
// worker/submitter index seed
volatile int indexSeed;
// mode and parallelism level
final int config;
// main registry
WorkQueue[] workQueues;
final ForkJoinWorkerThreadFactory factory;
// per-worker UEH
final Thread.UncaughtExceptionHandler ueh;
// to create worker name string
final String workerNamePrefix;

volatile Object pad10, pad11, pad12, pad13;
volatile Object pad14, pad15, pad16, pad17;
volatile Object pad18, pad19, pad1a, pad1b;
1276
1277 /*
1278 * Acquires the plock lock to protect worker array and related
1279 * updates. This method is called only if an initial CAS on plock
1280 * fails. This acts as a spinLock for normal cases, but falls back
1281 * to builtin monitor to block when (rarely) needed. This would be
1282 * a terrible idea for a highly contended lock, but works fine as
1283 * a more conservative alternative to a pure spinlock.
1284 */
1285 private int acquirePlock() {
1286 int spins = PL_SPINS, r = 0, ps, nps;
1287 for (;;) {
1288 if (((ps = plock) & PL_LOCK) == 0 &&
1289 U.compareAndSwapInt(this, PLOCK, ps, nps = ps + PL_LOCK))
1290 return nps;
1291 else if (r == 0) { // randomize spins if possible
1292 Thread t = Thread.currentThread(); WorkQueue w; Submitter z;
1293 if ((t instanceof ForkJoinWorkerThread) &&
1294 (w = ((ForkJoinWorkerThread)t).workQueue) != null)
1295 r = w.seed;
1296 else if ((z = submitters.get()) != null)
1297 r = z.seed;
1298 else
1299 r = 1;
1300 }
1301 else if (spins >= 0) {
1302 r ^= r << 1; r ^= r >>> 3; r ^= r << 10; // xorshift
1303 if (r >= 0)
1304 --spins;
1305 }
1306 else if (U.compareAndSwapInt(this, PLOCK, ps, ps | PL_SIGNAL)) {
1307 synchronized (this) {
1308 if ((plock & PL_SIGNAL) != 0) {
1309 try {
1310 wait();
1311 } catch (InterruptedException ie) {
1312 try {
1313 Thread.currentThread().interrupt();
1314 } catch (SecurityException ignore) {
1315 }
1316 }
1317 }
1318 else
1319 notifyAll();
1320 }
1321 }
1322 }
1323 }
1324
1325 /**
1326 * Unlocks and signals any thread waiting for plock. Called only
1327 * when CAS of seq value for unlock fails.
1328 */
1329 private void releasePlock(int ps) {
1330 plock = ps;
1331 synchronized (this) { notifyAll(); }
1332 }
1333
1334 /**
1335 * Performs secondary initialization, called when plock is zero.
1336 * Creates workQueue array and sets plock to a valid value. The
1337 * lock body must be exception-free (so no try/finally) so we
1338 * optimistically allocate new array outside the lock and throw
1339 * away if (very rarely) not needed. (A similar tactic is used in
1340 * fullExternalPush.) Because the plock seq value can eventually
1341 * wrap around zero, this method harmlessly fails to reinitialize
1342 * if workQueues exists, while still advancing plock.
1343 *
1344 * Additionally tries to create the first worker.
1345 */
1346 private void initWorkers() {
1347 WorkQueue[] ws, nws; int ps;
1348 int p = config & SMASK; // find power of two table size
1349 int n = (p > 1) ? p - 1 : 1; // ensure at least 2 slots
1350 n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
1351 n = (n + 1) << 1;
1352 if ((ws = workQueues) == null || ws.length == 0)
1353 nws = new WorkQueue[n];
1354 else
1355 nws = null;
1356 if (((ps = plock) & PL_LOCK) != 0 ||
1357 !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1358 ps = acquirePlock();
1359 if (((ws = workQueues) == null || ws.length == 0) && nws != null)
1360 workQueues = nws;
1361 int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
1362 if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1363 releasePlock(nps);
1364 tryAddWorker();
1365 }
1366
1367 /**
1368 * Tries to create and start one worker if fewer than target
1369 * parallelism level exist. Adjusts counts etc on failure.
1370 */
1371 private void tryAddWorker() {
1372 long c; int u;
1373 while ((u = (int)((c = ctl) >>> 32)) < 0 &&
1374 (u & SHORT_SIGN) != 0 && (int)c == 0) {
1375 long nc = (long)(((u + UTC_UNIT) & UTC_MASK) |
1376 ((u + UAC_UNIT) & UAC_MASK)) << 32;
1377 if (U.compareAndSwapLong(this, CTL, c, nc)) {
1378 ForkJoinWorkerThreadFactory fac;
1379 Throwable ex = null;
1380 ForkJoinWorkerThread wt = null;
1381 try {
1382 if ((fac = factory) != null &&
1383 (wt = fac.newThread(this)) != null) {
1384 wt.start();
1385 break;
1386 }
1387 } catch (Throwable e) {
1389 }
1390 deregisterWorker(wt, ex);
1391 break;
1392 }
1393 }
1394 }
1395
1396 // Registering and deregistering workers
1397
1398 /**
1399 * Callback from ForkJoinWorkerThread to establish and record its
1400 * WorkQueue. To avoid scanning bias due to packing entries in
1401 * front of the workQueues array, we treat the array as a simple
1402 * power-of-two hash table using per-thread seed as hash,
1403 * expanding as needed.
1404 *
1405 * @param wt the worker thread
1406 * @return the worker's queue
1407 */
1408 final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
1409 Thread.UncaughtExceptionHandler handler; WorkQueue[] ws; int s, ps;
1410 wt.setDaemon(true);
1411 if ((handler = ueh) != null)
1412 wt.setUncaughtExceptionHandler(handler);
1413 do {} while (!U.compareAndSwapInt(this, INDEXSEED, s = indexSeed,
1414 s += SEED_INCREMENT) ||
1415 s == 0); // skip 0
1416 WorkQueue w = new WorkQueue(this, wt, config >>> 16, s);
1417 if (((ps = plock) & PL_LOCK) != 0 ||
1418 !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1419 ps = acquirePlock();
1420 int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
1421 try {
1422 if ((ws = workQueues) != null) { // skip if shutting down
1423 int n = ws.length, m = n - 1;
1424 int r = (s << 1) | 1; // use odd-numbered indices
1425 if (ws[r &= m] != null) { // collision
1426 int probes = 0; // step by approx half size
1427 int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
1428 while (ws[r = (r + step) & m] != null) {
1429 if (++probes >= n) {
1433 }
1434 }
1435 }
1436 w.eventCount = w.poolIndex = r; // volatile write orders
1437 ws[r] = w;
1438 }
1439 } finally {
1440 if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1441 releasePlock(nps);
1442 }
1443 wt.setName(workerNamePrefix.concat(Integer.toString(w.poolIndex)));
1444 return w;
1445 }
1446
1447 /**
1448 * Final callback from terminating worker, as well as upon failure
1449 * to construct or start a worker. Removes record of worker from
1450 * array, and adjusts counts. If pool is shutting down, tries to
1451 * complete termination.
1452 *
1453 * @param wt the worker thread or null if construction failed
1454 * @param ex the exception causing failure, or null if none
1455 */
1456 final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
1457 WorkQueue w = null;
1458 if (wt != null && (w = wt.workQueue) != null) {
1459 int ps;
1460 w.qlock = -1; // ensure set
1461 long ns = w.nsteals, sc; // collect steal count
1462 do {} while (!U.compareAndSwapLong(this, STEALCOUNT,
1463 sc = stealCount, sc + ns));
1464 if (((ps = plock) & PL_LOCK) != 0 ||
1465 !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1466 ps = acquirePlock();
1467 int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
1468 try {
1469 int idx = w.poolIndex;
1470 WorkQueue[] ws = workQueues;
1471 if (ws != null && idx >= 0 && idx < ws.length && ws[idx] == w)
1472 ws[idx] = null;
1473 } finally {
1474 if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1475 releasePlock(nps);
1476 }
1477 }
1478
1479 long c; // adjust ctl counts
1480 do {} while (!U.compareAndSwapLong
1481 (this, CTL, c = ctl, (((c - AC_UNIT) & AC_MASK) |
1482 ((c - TC_UNIT) & TC_MASK) |
1483 (c & ~(AC_MASK|TC_MASK)))));
1484
1485 if (!tryTerminate(false, false) && w != null && w.array != null) {
1486 w.cancelAll(); // cancel remaining tasks
1487 WorkQueue[] ws; WorkQueue v; Thread p; int u, i, e;
1488 while ((u = (int)((c = ctl) >>> 32)) < 0 && (e = (int)c) >= 0) {
1489 if (e > 0) { // activate or create replacement
1490 if ((ws = workQueues) == null ||
1491 (i = e & SMASK) >= ws.length ||
1492 (v = ws[i]) != null)
1493 break;
1494 long nc = (((long)(v.nextWait & E_MASK)) |
1495 ((long)(u + UAC_UNIT) << 32));
1496 if (v.eventCount != (e | INT_SIGN))
1497 break;
1498 if (U.compareAndSwapLong(this, CTL, c, nc)) {
1499 v.eventCount = (e + E_SEQ) & E_MASK;
1500 if ((p = v.parker) != null)
1501 U.unpark(p);
1502 break;
1503 }
1504 }
1505 else {
1506 if ((short)u < 0)
1507 tryAddWorker();
1508 break;
1509 }
1510 }
1511 }
1512 if (ex == null) // help clean refs on way out
1513 ForkJoinTask.helpExpungeStaleExceptions();
1514 else // rethrow
1515 ForkJoinTask.rethrow(ex);
1516 }
1517
1518 // Submissions
1519
1520 /**
1521 * Unless shutting down, adds the given task to a submission queue
1522 * at submitter's current queue index (modulo submission
1523 * range). Only the most common path is directly handled in this
1524 * method. All others are relayed to fullExternalPush.
1525 *
1526 * @param task the task. Caller must ensure non-null.
1527 */
1528 final void externalPush(ForkJoinTask<?> task) {
1529 WorkQueue[] ws; WorkQueue q; Submitter z; int m; ForkJoinTask<?>[] a;
1530 if ((z = submitters.get()) != null && plock > 0 &&
1531 (ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
1532 (q = ws[m & z.seed & SQMASK]) != null &&
1533 U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock
1534 int b = q.base, s = q.top, n, an;
1535 if ((a = q.array) != null && (an = a.length) > (n = s + 1 - b)) {
1536 int j = (((an - 1) & s) << ASHIFT) + ABASE;
1537 U.putOrderedObject(a, j, task);
1538 q.top = s + 1; // push on to deque
1539 q.qlock = 0;
1540 if (n <= 2)
1541 signalWork(q);
1542 return;
1543 }
1544 q.qlock = 0;
1545 }
1546 fullExternalPush(task);
1547 }
1548
1549 /**
1550 * Full version of externalPush. This method is called, among
1551 * other times, upon the first submission of the first task to the
1552 * pool, so must perform secondary initialization (via
1553 * initWorkers). It also detects first submission by an external
1554 * thread by looking up its ThreadLocal, and creates a new shared
1555 * queue if the one at index if empty or contended. The plock lock
1556 * body must be exception-free (so no try/finally) so we
1557 * optimistically allocate new queues outside the lock and throw
1558 * them away if (very rarely) not needed.
1559 */
1560 private void fullExternalPush(ForkJoinTask<?> task) {
1561 int r = 0; // random index seed
1562 for (Submitter z = submitters.get();;) {
1563 WorkQueue[] ws; WorkQueue q; int ps, m, k;
1564 if (z == null) {
1565 if (U.compareAndSwapInt(this, INDEXSEED, r = indexSeed,
1566 r += SEED_INCREMENT) && r != 0)
1567 submitters.set(z = new Submitter(r));
1568 }
1569 else if (r == 0) { // move to a different index
1570 r = z.seed;
1571 r ^= r << 13; // same xorshift as WorkQueues
1572 r ^= r >>> 17;
1573 z.seed = r ^ (r << 5);
1574 }
1575 else if ((ps = plock) < 0)
1576 throw new RejectedExecutionException();
1577 else if (ps == 0 || (ws = workQueues) == null ||
1578 (m = ws.length - 1) < 0)
1579 initWorkers();
1580 else if ((q = ws[k = r & m & SQMASK]) != null) {
1581 if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
1582 ForkJoinTask<?>[] a = q.array;
1583 int s = q.top;
1584 boolean submitted = false;
1585 try { // locked version of push
1586 if ((a != null && a.length > s + 1 - q.base) ||
1587 (a = q.growArray()) != null) { // must presize
1588 int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
1589 U.putOrderedObject(a, j, task);
1590 q.top = s + 1;
1591 submitted = true;
1592 }
1593 } finally {
1594 q.qlock = 0; // unlock
1595 }
1596 if (submitted) {
1597 signalWork(q);
1598 return;
1599 }
1600 }
1601 r = 0; // move on failure
1602 }
1603 else if (((ps = plock) & PL_LOCK) == 0) { // create new queue
1604 q = new WorkQueue(this, null, SHARED_QUEUE, r);
1605 if (((ps = plock) & PL_LOCK) != 0 ||
1606 !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1607 ps = acquirePlock();
1608 if ((ws = workQueues) != null && k < ws.length && ws[k] == null)
1609 ws[k] = q;
1610 int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
1611 if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1612 releasePlock(nps);
1613 }
1614 else
1615 r = 0; // try elsewhere while lock held
1616 }
1617 }
1618
1619 // Maintaining ctl counts
1620
1621 /**
1622 * Increments active count; mainly called upon return from blocking.
1623 */
1624 final void incrementActiveCount() {
1625 long c;
1626 do {} while (!U.compareAndSwapLong(this, CTL, c = ctl, c + AC_UNIT));
1627 }
1628
1629 /**
1630 * Tries to create or activate a worker if too few are active.
1631 *
1632 * @param q the (non-null) queue holding tasks to be signalled
1633 */
1634 final void signalWork(WorkQueue q) {
1635 int hint = q.poolIndex;
1686 * completing the sweep. If the worker is not inactivated, it
1687 * takes and returns a task from this queue. Otherwise, if not
1688 * activated, it signals workers (that may include itself) and
1689 * returns so caller can retry. Also returns for retry if the
1690 * worker array may have changed during an empty scan. On failure
1691 * to find a task, we take one of the following actions, after
1692 * which the caller will retry calling this method unless
1693 * terminated.
1694 *
1695 * * If pool is terminating, terminate the worker.
1696 *
1697 * * If not already enqueued, try to inactivate and enqueue the
1698 * worker on wait queue. Or, if inactivating has caused the pool
1699 * to be quiescent, relay to idleAwaitWork to possibly shrink
1700 * pool.
1701 *
1702 * * If already enqueued and none of the above apply, possibly
1703 * park awaiting signal, else lingering to help scan and signal.
1704 *
1705 * * If a non-empty queue discovered or left as a hint,
1706 * help wake up other workers before return
1707 *
1708 * @param w the worker (via its WorkQueue)
1709 * @return a task or null if none found
1710 */
1711 private final ForkJoinTask<?> scan(WorkQueue w) {
1712 WorkQueue[] ws; int m;
1713 int ps = plock; // read plock before ws
1714 if (w != null && (ws = workQueues) != null && (m = ws.length - 1) >= 0) {
1715 int ec = w.eventCount; // ec is negative if inactive
1716 int r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5;
1717 w.hint = -1; // update seed and clear hint
1718 int j = ((m + m + 1) | MIN_SCAN) & MAX_SCAN;
1719 do {
1720 WorkQueue q; ForkJoinTask<?>[] a; int b;
1721 if ((q = ws[(r + j) & m]) != null && (b = q.base) - q.top < 0 &&
1722 (a = q.array) != null) { // probably nonempty
1723 int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
1724 ForkJoinTask<?> t = (ForkJoinTask<?>)
1725 U.getObjectVolatile(a, i);
1726 if (q.base == b && ec >= 0 && t != null &&
1741 if (U.compareAndSwapLong(this, STEALCOUNT,
1742 sc = stealCount, sc + ns))
1743 w.nsteals = 0; // collect steals and rescan
1744 }
1745 else if (plock != ps) // consistency check
1746 ; // skip
1747 else if ((e = (int)(c = ctl)) < 0)
1748 w.qlock = -1; // pool is terminating
1749 else {
1750 if ((h = w.hint) < 0) {
1751 if (ec >= 0) { // try to enqueue/inactivate
1752 long nc = (((long)ec |
1753 ((c - AC_UNIT) & (AC_MASK|TC_MASK))));
1754 w.nextWait = e; // link and mark inactive
1755 w.eventCount = ec | INT_SIGN;
1756 if (ctl != c || !U.compareAndSwapLong(this, CTL, c, nc))
1757 w.eventCount = ec; // unmark on CAS failure
1758 else if ((int)(c >> AC_SHIFT) == 1 - (config & SMASK))
1759 idleAwaitWork(w, nc, c);
1760 }
1761 else if (w.eventCount < 0 && !tryTerminate(false, false) &&
1762 ctl == c) { // block
1763 Thread wt = Thread.currentThread();
1764 Thread.interrupted(); // clear status
1765 U.putObject(wt, PARKBLOCKER, this);
1766 w.parker = wt; // emulate LockSupport.park
1767 if (w.eventCount < 0) // recheck
1768 U.park(false, 0L);
1769 w.parker = null;
1770 U.putObject(wt, PARKBLOCKER, null);
1771 }
1772 }
1773 if ((h >= 0 || (h = w.hint) >= 0) &&
1774 (ws = workQueues) != null && h < ws.length &&
1775 (q = ws[h]) != null) { // signal others before retry
1776 WorkQueue v; Thread p; int u, i, s;
1777 for (int n = (config & SMASK) >>> 1;;) {
1778 int idleCount = (w.eventCount < 0) ? 0 : -1;
1779 if (((s = idleCount - q.base + q.top) <= n &&
1780 (n = s) <= 0) ||
1781 (u = (int)((c = ctl) >>> 32)) >= 0 ||
1782 (e = (int)c) <= 0 || m < (i = e & SMASK) ||
1783 (v = ws[i]) == null)
1784 break;
1785 long nc = (((long)(v.nextWait & E_MASK)) |
1786 ((long)(u + UAC_UNIT) << 32));
1787 if (v.eventCount != (e | INT_SIGN) ||
1788 !U.compareAndSwapLong(this, CTL, c, nc))
1789 break;
1790 v.hint = h;
1791 v.eventCount = (e + E_SEQ) & E_MASK;
1792 if ((p = v.parker) != null)
1793 U.unpark(p);
1794 if (--n <= 0)
1795 break;
1796 }
1797 }
1798 }
1799 }
1800 return null;
1801 }
1802
1803 /**
1804 * If inactivating worker w has caused the pool to become
1805 * quiescent, checks for pool termination, and, so long as this is
1806 * not the only worker, waits for event for up to a given
1807 * duration. On timeout, if ctl has not changed, terminates the
1808 * worker, which will in turn wake up another worker to possibly
1809 * repeat this process.
1810 *
1811 * @param w the calling worker
1812 * @param currentCtl the ctl value triggering possible quiescence
1813 * @param prevCtl the ctl value to restore if thread is terminated
1814 */
1815 private void idleAwaitWork(WorkQueue w, long currentCtl, long prevCtl) {
1816 if (w != null && w.eventCount < 0 &&
1817 !tryTerminate(false, false) && (int)prevCtl != 0) {
1818 int dc = -(short)(currentCtl >>> TC_SHIFT);
1819 long parkTime = dc < 0 ? FAST_IDLE_TIMEOUT: (dc + 1) * IDLE_TIMEOUT;
1820 long deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
1821 Thread wt = Thread.currentThread();
1822 while (ctl == currentCtl) {
1823 Thread.interrupted(); // timed variant of version in scan()
1824 U.putObject(wt, PARKBLOCKER, this);
1825 w.parker = wt;
1826 if (ctl == currentCtl)
1827 U.park(false, parkTime);
1828 w.parker = null;
1829 U.putObject(wt, PARKBLOCKER, null);
1830 if (ctl != currentCtl)
1831 break;
1832 if (deadline - System.nanoTime() <= 0L &&
1833 U.compareAndSwapLong(this, CTL, currentCtl, prevCtl)) {
1834 w.eventCount = (w.eventCount + E_SEQ) | E_MASK;
1835 w.qlock = -1; // shrink
1836 break;
1837 }
1838 }
1839 }
1840 }
1841
1842 /**
1843 * Scans through queues looking for work while joining a task; if
1844 * any present, signals. May return early if more signalling is
1845 * detectably unneeded.
1846 *
1847 * @param task return early if done
1848 * @param origin an index to start scan
1849 */
1850 private void helpSignal(ForkJoinTask<?> task, int origin) {
1851 WorkQueue[] ws; WorkQueue w; Thread p; long c; int m, u, e, i, s;
1852 if (task != null && task.status >= 0 &&
1853 (u = (int)(ctl >>> 32)) < 0 && (u >> UAC_SHIFT) < 0 &&
1854 (ws = workQueues) != null && (m = ws.length - 1) >= 0) {
1956 else {
1957 subtask = next;
1958 j = v;
1959 break;
1960 }
1961 }
1962 }
1963 }
1964 }
1965 }
1966 return stat;
1967 }
1968
1969 /**
1970 * Analog of tryHelpStealer for CountedCompleters. Tries to steal
1971 * and run tasks within the target's computation.
1972 *
1973 * @param task the task to join
1974 * @param mode if shared, exit upon completing any task
1975 * if all workers are active
1976 *
1977 */
1978 private int helpComplete(ForkJoinTask<?> task, int mode) {
1979 WorkQueue[] ws; WorkQueue q; int m, n, s, u;
1980 if (task != null && (ws = workQueues) != null &&
1981 (m = ws.length - 1) >= 0) {
1982 for (int j = 1, origin = j;;) {
1983 if ((s = task.status) < 0)
1984 return s;
1985 if ((q = ws[j & m]) != null && q.pollAndExecCC(task)) {
1986 origin = j;
1987 if (mode == SHARED_QUEUE &&
1988 ((u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0))
1989 break;
1990 }
1991 else if ((j = (j + 2) & m) == origin)
1992 break;
1993 }
1994 }
1995 return 0;
1996 }
2108 ForkJoinTask<?> prevJoin = joiner.currentJoin;
2109 joiner.currentJoin = task;
2110 do {} while ((s = task.status) >= 0 && !joiner.isEmpty() &&
2111 joiner.tryRemoveAndExec(task));
2112 if (s >= 0 && (s = task.status) >= 0) {
2113 helpSignal(task, joiner.poolIndex);
2114 if ((s = task.status) >= 0 &&
2115 (task instanceof CountedCompleter))
2116 s = helpComplete(task, LIFO_QUEUE);
2117 }
2118 if (s >= 0 && joiner.isEmpty()) {
2119 do {} while (task.status >= 0 &&
2120 tryHelpStealer(joiner, task) > 0);
2121 }
2122 joiner.currentJoin = prevJoin;
2123 }
2124 }
2125
2126 /**
2127 * Returns a (probably) non-empty steal queue, if one is found
2128 * during a random, then cyclic scan, else null. This method must
2129 * be retried by caller if, by the time it tries to use the queue,
2130 * it is empty.
2131 * @param r a (random) seed for scanning
2132 */
2133 private WorkQueue findNonEmptyStealQueue(int r) {
2134 for (WorkQueue[] ws;;) {
2135 int ps = plock, m, n;
2136 if ((ws = workQueues) == null || (m = ws.length - 1) < 1)
2137 return null;
2138 for (int j = (m + 1) << 2; ;) {
2139 WorkQueue q = ws[(((r + j) << 1) | 1) & m];
2140 if (q != null && (n = q.base - q.top) < 0) {
2141 if (n < -1)
2142 signalWork(q);
2143 return q;
2144 }
2145 else if (--j < 0) {
2146 if (plock == ps)
2147 return null;
2148 break;
2149 }
2150 }
2151 }
2152 }
2153
2154 /**
2155 * Runs tasks until {@code isQuiescent()}. We piggyback on
2156 * active count ctl maintenance, but rather than blocking
2157 * when tasks cannot be found, we rescan until all others cannot
2158 * find tasks either.
2159 */
2160 final void helpQuiescePool(WorkQueue w) {
2161 for (boolean active = true;;) {
2162 ForkJoinTask<?> localTask; // exhaust local queue
2163 while ((localTask = w.nextLocalTask()) != null)
2164 localTask.doExec();
2165 // Similar to loop in scan(), but ignoring submissions
2166 WorkQueue q = findNonEmptyStealQueue(w.nextSeed());
2167 if (q != null) {
2168 ForkJoinTask<?> t; int b;
2169 if (!active) { // re-establish active count
2170 long c;
2171 active = true;
2172 do {} while (!U.compareAndSwapLong
2173 (this, CTL, c = ctl, c + AC_UNIT));
2174 }
2175 if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null)
2176 w.runSubtask(t);
2177 }
2178 else {
2179 long c;
2180 if (active) { // decrement active count without queuing
2181 active = false;
2182 do {} while (!U.compareAndSwapLong
2183 (this, CTL, c = ctl, c -= AC_UNIT));
2184 }
2185 else
2186 c = ctl; // re-increment on exit
2187 if ((int)(c >> AC_SHIFT) + (config & SMASK) == 0) {
2188 do {} while (!U.compareAndSwapLong
2189 (this, CTL, c = ctl, c + AC_UNIT));
2190 break;
2191 }
2192 }
2193 }
2194 }
2195
2196 /**
2197 * Gets and removes a local or stolen task for the given worker.
2198 *
2199 * @return a task, if available
2200 */
2201 final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
2202 for (ForkJoinTask<?> t;;) {
2203 WorkQueue q; int b;
2204 if ((t = w.nextLocalTask()) != null)
2205 return t;
2206 if ((q = findNonEmptyStealQueue(w.nextSeed())) == null)
2207 return null;
2208 if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null)
2209 return t;
2210 }
2211 }
2212
2213 /**
2214 * Returns a cheap heuristic guide for task partitioning when
2215 * programmers, frameworks, tools, or languages have little or no
2216 * idea about task granularity. In essence by offering this
2217 * method, we ask users only about tradeoffs in overhead vs
2218 * expected throughput and its variance, rather than how finely to
2219 * partition tasks.
2220 *
2221 * In a steady state strict (tree-structured) computation, each
2222 * thread makes available for stealing enough tasks for other
2223 * threads to remain active. Inductively, if all threads play by
2224 * the same rules, each thread should make available only a
2225 * constant number of tasks.
2226 *
2227 * The minimum useful constant is just 1. But using a value of 1
2228 * would require immediate replenishment upon each steal to
2229 * maintain enough tasks, which is infeasible. Further,
2230 * partitionings/granularities of offered tasks should minimize
2231 * steal rates, which in general means that threads nearer the top
2232 * of computation tree should generate more than those nearer the
2233 * bottom. In perfect steady state, each thread is at
2234 * approximately the same level of computation tree. However,
2235 * producing extra tasks amortizes the uncertainty of progress and
2236 * diffusion assumptions.
2237 *
2238 * So, users will want to use values larger, but not much larger
2239 * than 1 to both smooth over transient shortages and hedge
2240 * against uneven progress; as traded off against the cost of
2241 * extra task overhead. We leave the user to pick a threshold
2242 * value to compare with the results of this call to guide
2243 * decisions, but recommend values such as 3.
2244 *
2245 * When all threads are active, it is on average OK to estimate
2246 * surplus strictly locally. In steady-state, if one thread is
2247 * maintaining say 2 surplus tasks, then so are others. So we can
2248 * just use estimated queue length. However, this strategy alone
2249 * leads to serious mis-estimates in some non-steady-state
2250 * conditions (ramp-up, ramp-down, other stalls). We can detect
2251 * many of these by further considering the number of "idle"
2252 * threads, that are known to have zero queued tasks, so
2253 * compensate by a factor of (#idle/#active) threads.
2254 *
2255 * Note: The approximation of #busy workers as #active workers is
2256 * not very good under current signalling scheme, and should be
2257 * improved.
2258 */
2271 return 0;
2272 }
2273
2274 // Termination
2275
2276 /**
2277 * Possibly initiates and/or completes termination. The caller
2278 * triggering termination runs three passes through workQueues:
2279 * (0) Setting termination status, followed by wakeups of queued
2280 * workers; (1) cancelling all tasks; (2) interrupting lagging
2281 * threads (likely in external tasks, but possibly also blocked in
2282 * joins). Each pass repeats previous steps because of potential
2283 * lagging thread creation.
2284 *
2285 * @param now if true, unconditionally terminate, else only
2286 * if no work and no active workers
2287 * @param enable if true, enable shutdown when next possible
2288 * @return true if now terminating or terminated
2289 */
2290 private boolean tryTerminate(boolean now, boolean enable) {
2291 if (this == commonPool) // cannot shut down
2292 return false;
2293 for (long c;;) {
2294 if (((c = ctl) & STOP_BIT) != 0) { // already terminating
2295 if ((short)(c >>> TC_SHIFT) == -(config & SMASK)) {
2296 synchronized (this) {
2297 notifyAll(); // signal when 0 workers
2298 }
2299 }
2300 return true;
2301 }
2302 if (plock >= 0) { // not yet enabled
2303 int ps;
2304 if (!enable)
2305 return false;
2306 if (((ps = plock) & PL_LOCK) != 0 ||
2307 !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
2308 ps = acquirePlock();
2309 if (!U.compareAndSwapInt(this, PLOCK, ps, SHUTDOWN))
2310 releasePlock(SHUTDOWN);
2311 }
2312 if (!now) { // check if idle & no tasks
2313 if ((int)(c >> AC_SHIFT) != -(config & SMASK) ||
2314 hasQueuedSubmissions())
2315 return false;
2316 // Check for unqueued inactive workers. One pass suffices.
2317 WorkQueue[] ws = workQueues; WorkQueue w;
2318 if (ws != null) {
2319 for (int i = 1; i < ws.length; i += 2) {
2320 if ((w = ws[i]) != null && w.eventCount >= 0)
2321 return false;
2322 }
2323 }
2324 }
2325 if (U.compareAndSwapLong(this, CTL, c, c | STOP_BIT)) {
2326 for (int pass = 0; pass < 3; ++pass) {
2327 WorkQueue[] ws = workQueues;
2328 if (ws != null) {
2329 WorkQueue w; Thread wt;
2330 int n = ws.length;
2331 for (int i = 0; i < n; ++i) {
2332 if ((w = ws[i]) != null) {
2333 w.qlock = -1;
2334 if (pass > 0) {
2335 w.cancelAll();
2336 if (pass > 1 && (wt = w.owner) != null) {
2337 if (!wt.isInterrupted()) {
2338 try {
2339 wt.interrupt();
2340 } catch (SecurityException ignore) {
2341 }
2342 }
2343 U.unpark(wt);
2344 }
2345 }
2346 }
2347 }
2348 // Wake up workers parked on event queue
2349 int i, e; long cc; Thread p;
2350 while ((e = (int)(cc = ctl) & E_MASK) != 0 &&
2351 (i = e & SMASK) < n &&
2352 (w = ws[i]) != null) {
2353 long nc = ((long)(w.nextWait & E_MASK) |
2354 ((cc + AC_UNIT) & AC_MASK) |
2355 (cc & (TC_MASK|STOP_BIT)));
2356 if (w.eventCount == (e | INT_SIGN) &&
2357 U.compareAndSwapLong(this, CTL, cc, nc)) {
2358 w.eventCount = (e + E_SEQ) & E_MASK;
2359 w.qlock = -1;
2360 if ((p = w.parker) != null)
2361 U.unpark(p);
2362 }
2363 }
2364 }
2365 }
2366 }
2367 }
2368 }
2369
2370 // external operations on common pool
2371
2372 /**
2373 * Returns common pool queue for a thread that has submitted at
2374 * least one task.
2375 */
2376 static WorkQueue commonSubmitterQueue() {
2377 ForkJoinPool p; WorkQueue[] ws; int m; Submitter z;
2378 return ((z = submitters.get()) != null &&
2379 (p = commonPool) != null &&
2380 (ws = p.workQueues) != null &&
2381 (m = ws.length - 1) >= 0) ?
2382 ws[m & z.seed & SQMASK] : null;
2383 }
2384
2385 /**
2386 * Tries to pop the given task from submitter's queue in common pool.
2387 */
2388 static boolean tryExternalUnpush(ForkJoinTask<?> t) {
2389 ForkJoinPool p; WorkQueue[] ws; WorkQueue q; Submitter z;
2390 ForkJoinTask<?>[] a; int m, s;
2391 if (t != null &&
2392 (z = submitters.get()) != null &&
2393 (p = commonPool) != null &&
2394 (ws = p.workQueues) != null &&
2395 (m = ws.length - 1) >= 0 &&
2396 (q = ws[m & z.seed & SQMASK]) != null &&
2397 (s = q.top) != q.base &&
2398 (a = q.array) != null) {
2399 long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
2400 if (U.getObject(a, j) == t &&
2401 U.compareAndSwapInt(q, QLOCK, 0, 1)) {
2402 if (q.array == a && q.top == s && // recheck
2403 U.compareAndSwapObject(a, j, t, null)) {
2404 q.top = s - 1;
2405 q.qlock = 0;
2406 return true;
2407 }
2408 q.qlock = 0;
2409 }
2410 }
2411 return false;
2412 }
2413
2414 /**
2415 * Tries to pop and run local tasks within the same computation
2416 * as the given root. On failure, tries to help complete from
2428 (o instanceof CountedCompleter)) {
2429 CountedCompleter<?> t = (CountedCompleter<?>)o, r = t;
2430 do {
2431 if (r == root) {
2432 if (U.compareAndSwapInt(q, QLOCK, 0, 1)) {
2433 if (q.array == a && q.top == s &&
2434 U.compareAndSwapObject(a, j, t, null)) {
2435 q.top = s - 1;
2436 task = t;
2437 }
2438 q.qlock = 0;
2439 }
2440 break;
2441 }
2442 } while ((r = r.completer) != null);
2443 }
2444 }
2445 if (task != null)
2446 task.doExec();
2447 if (root.status < 0 ||
2448 (u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0)
2449 break;
2450 if (task == null) {
2451 helpSignal(root, q.poolIndex);
2452 if (root.status >= 0)
2453 helpComplete(root, SHARED_QUEUE);
2454 break;
2455 }
2456 }
2457 }
2458 }
2459
2460 /**
2461 * Tries to help execute or signal availability of the given task
2462 * from submitter's queue in common pool.
2463 */
2464 static void externalHelpJoin(ForkJoinTask<?> t) {
2465 // Some hard-to-avoid overlap with tryExternalUnpush
2466 ForkJoinPool p; WorkQueue[] ws; WorkQueue q, w; Submitter z;
2467 ForkJoinTask<?>[] a; int m, s, n;
2468 if (t != null &&
2469 (z = submitters.get()) != null &&
2470 (p = commonPool) != null &&
2471 (ws = p.workQueues) != null &&
2472 (m = ws.length - 1) >= 0 &&
2473 (q = ws[m & z.seed & SQMASK]) != null &&
2474 (a = q.array) != null) {
2475 int am = a.length - 1;
2476 if ((s = q.top) != q.base) {
2477 long j = ((am & (s - 1)) << ASHIFT) + ABASE;
2478 if (U.getObject(a, j) == t &&
2479 U.compareAndSwapInt(q, QLOCK, 0, 1)) {
2480 if (q.array == a && q.top == s &&
2481 U.compareAndSwapObject(a, j, t, null)) {
2482 q.top = s - 1;
2483 q.qlock = 0;
2484 t.doExec();
2485 }
2486 else
2487 q.qlock = 0;
2488 }
2489 }
2490 if (t.status >= 0) {
2491 if (t instanceof CountedCompleter)
2492 p.externalHelpComplete(q, t);
2493 else
2494 p.helpSignal(t, q.poolIndex);
2495 }
2496 }
2497 }
2498
2499 /**
2500 * Restricted version of helpQuiescePool for external callers
2501 */
2502 static void externalHelpQuiescePool() {
2503 ForkJoinPool p; ForkJoinTask<?> t; WorkQueue q; int b;
2504 if ((p = commonPool) != null &&
2505 (q = p.findNonEmptyStealQueue(1)) != null &&
2506 (b = q.base) - q.top < 0 &&
2507 (t = q.pollAt(b)) != null)
2508 t.doExec();
2509 }
2510
2511 // Exported methods
2512
2513 // Constructors
2514
/**
 * Creates a {@code ForkJoinPool} whose parallelism equals {@link
 * java.lang.Runtime#availableProcessors}, with the {@linkplain
 * #defaultForkJoinWorkerThreadFactory default thread factory},
 * no UncaughtExceptionHandler, and non-async LIFO processing mode.
 *
 * @throws SecurityException if a security manager exists and
 * the caller is not permitted to modify threads
 * because it does not hold {@link
 * java.lang.RuntimePermission}{@code ("modifyThread")}
 */
public ForkJoinPool() {
    this(Runtime.getRuntime().availableProcessors(),
         defaultForkJoinWorkerThreadFactory,
         null,                           // no UncaughtExceptionHandler
         false);                         // non-async (LIFO) mode
}
2530
/**
 * Creates a {@code ForkJoinPool} with the given parallelism
 * level, the {@linkplain
 * #defaultForkJoinWorkerThreadFactory default thread factory},
 * no UncaughtExceptionHandler, and non-async LIFO processing mode.
 *
 * @param parallelism the parallelism level
 * @throws IllegalArgumentException if parallelism less than or
 * equal to zero, or greater than implementation limit
 * @throws SecurityException if a security manager exists and
 * the caller is not permitted to modify threads
 * because it does not hold {@link
 * java.lang.RuntimePermission}{@code ("modifyThread")}
 */
public ForkJoinPool(int parallelism) {
    this(parallelism,
         defaultForkJoinWorkerThreadFactory,
         null,                           // no UncaughtExceptionHandler
         false);                         // non-async (LIFO) mode
}
2555 * use {@link #defaultForkJoinWorkerThreadFactory}.
2556 * @param handler the handler for internal worker threads that
2557 * terminate due to unrecoverable errors encountered while executing
2558 * tasks. For default value, use {@code null}.
2559 * @param asyncMode if true,
2560 * establishes local first-in-first-out scheduling mode for forked
2561 * tasks that are never joined. This mode may be more appropriate
2562 * than default locally stack-based mode in applications in which
2563 * worker threads only process event-style asynchronous tasks.
2564 * For default value, use {@code false}.
2565 * @throws IllegalArgumentException if parallelism less than or
2566 * equal to zero, or greater than implementation limit
2567 * @throws NullPointerException if the factory is null
2568 * @throws SecurityException if a security manager exists and
2569 * the caller is not permitted to modify threads
2570 * because it does not hold {@link
2571 * java.lang.RuntimePermission}{@code ("modifyThread")}
2572 */
2573 public ForkJoinPool(int parallelism,
2574 ForkJoinWorkerThreadFactory factory,
2575 Thread.UncaughtExceptionHandler handler,
2576 boolean asyncMode) {
2577 checkPermission();
2578 if (factory == null)
2579 throw new NullPointerException();
2580 if (parallelism <= 0 || parallelism > MAX_CAP)
2581 throw new IllegalArgumentException();
2582 this.factory = factory;
2583 this.ueh = handler;
2584 this.config = parallelism | (asyncMode ? (FIFO_QUEUE << 16) : 0);
2585 long np = (long)(-parallelism); // offset ctl counts
2586 this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
2587 int pn = nextPoolId();
2588 StringBuilder sb = new StringBuilder("ForkJoinPool-");
2589 sb.append(Integer.toString(pn));
2590 sb.append("-worker-");
2591 this.workerNamePrefix = sb.toString();
2592 }
2593
/**
 * Constructor for common pool, suitable only for static
 * initialization. Stores the given parallelism (as config) and
 * ctl value directly, without the permission and argument checks
 * of the public constructor, and uses the fixed common-pool
 * worker name prefix.
 *
 * @param parallelism the value stored as this pool's config
 * @param ctl the initial ctl value
 * @param factory the factory for creating new threads
 * @param handler the handler for internal worker threads, or null
 */
ForkJoinPool(int parallelism,
             long ctl,
             ForkJoinWorkerThreadFactory factory,
             Thread.UncaughtExceptionHandler handler) {
    this.config = parallelism;
    this.ctl = ctl;
    this.factory = factory;
    this.ueh = handler;
    this.workerNamePrefix = "ForkJoinPool.commonPool-worker-";
}
2607
2608 /**
2609 * Returns the common pool instance. This pool is statically
2610 * constructed; its run state is unaffected by attempts to
2611 * {@link #shutdown} or {@link #shutdownNow}.
2612 *
2613 * @return the common pool instance
2614 * @since 1.8
2615 */
2616 public static ForkJoinPool commonPool() {
2617 // assert commonPool != null : "static init error";
2618 return commonPool;
2619 }
2620
2621 // Execution methods
2622
2623 /**
2624 * Performs the given task, returning its result upon completion.
2625 * If the computation encounters an unchecked Exception or Error,
2626 * it is rethrown as the outcome of this invocation. Rethrown
2627 * exceptions behave in the same way as regular exceptions, but,
2628 * when possible, contain stack traces (as displayed for example
2629 * using {@code ex.printStackTrace()}) of both the current thread
2630 * as well as the thread actually encountering the exception;
2631 * minimally only the latter.
2632 *
2633 * @param task the task
2634 * @return the task's result
2635 * @throws NullPointerException if the task is null
2636 * @throws RejectedExecutionException if the task cannot be
2637 * scheduled for execution
2638 */
2654 public void execute(ForkJoinTask<?> task) {
2655 if (task == null)
2656 throw new NullPointerException();
2657 externalPush(task);
2658 }
2659
2660 // AbstractExecutorService methods
2661
2662 /**
2663 * @throws NullPointerException if the task is null
2664 * @throws RejectedExecutionException if the task cannot be
2665 * scheduled for execution
2666 */
2667 public void execute(Runnable task) {
2668 if (task == null)
2669 throw new NullPointerException();
2670 ForkJoinTask<?> job;
2671 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
2672 job = (ForkJoinTask<?>) task;
2673 else
2674 job = new ForkJoinTask.AdaptedRunnableAction(task);
2675 externalPush(job);
2676 }
2677
2678 /**
2679 * Submits a ForkJoinTask for execution.
2680 *
2681 * @param task the task to submit
2682 * @return the task
2683 * @throws NullPointerException if the task is null
2684 * @throws RejectedExecutionException if the task cannot be
2685 * scheduled for execution
2686 */
2687 public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
2688 if (task == null)
2689 throw new NullPointerException();
2690 externalPush(task);
2691 return task;
2692 }
2693
2694 /**
2721 public ForkJoinTask<?> submit(Runnable task) {
2722 if (task == null)
2723 throw new NullPointerException();
2724 ForkJoinTask<?> job;
2725 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
2726 job = (ForkJoinTask<?>) task;
2727 else
2728 job = new ForkJoinTask.AdaptedRunnableAction(task);
2729 externalPush(job);
2730 return job;
2731 }
2732
2733 /**
2734 * @throws NullPointerException {@inheritDoc}
2735 * @throws RejectedExecutionException {@inheritDoc}
2736 */
2737 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
2738 // In previous versions of this class, this method constructed
2739 // a task to run ForkJoinTask.invokeAll, but now external
2740 // invocation of multiple tasks is at least as efficient.
2741 List<ForkJoinTask<T>> fs = new ArrayList<ForkJoinTask<T>>(tasks.size());
2742 // Workaround needed because method wasn't declared with
2743 // wildcards in return type but should have been.
2744 @SuppressWarnings({"unchecked", "rawtypes"})
2745 List<Future<T>> futures = (List<Future<T>>) (List) fs;
2746
2747 boolean done = false;
2748 try {
2749 for (Callable<T> t : tasks) {
2750 ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t);
2751 externalPush(f);
2752 fs.add(f);
2753 }
2754 for (ForkJoinTask<T> f : fs)
2755 f.quietlyJoin();
2756 done = true;
2757 return futures;
2758 } finally {
2759 if (!done)
2760 for (ForkJoinTask<T> f : fs)
2761 f.cancel(false);
2762 }
2763 }
2764
2765 /**
2766 * Returns the factory used for constructing new workers.
2767 *
2768 * @return the factory used for constructing new workers
2769 */
2770 public ForkJoinWorkerThreadFactory getFactory() {
2771 return factory;
2772 }
2773
2774 /**
2775 * Returns the handler for internal worker threads that terminate
2776 * due to unrecoverable errors encountered while executing tasks.
2777 *
2778 * @return the handler, or {@code null} if none
2779 */
2780 public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
2781 return ueh;
2782 }
2783
2784 /**
2785 * Returns the targeted parallelism level of this pool.
2786 *
2787 * @return the targeted parallelism level of this pool
2788 */
2789 public int getParallelism() {
2790 return config & SMASK;
2791 }
2792
2793 /**
2794 * Returns the targeted parallelism level of the common pool.
2795 *
2796 * @return the targeted parallelism level of the common pool
2797 * @since 1.8
2798 */
2799 public static int getCommonPoolParallelism() {
2800 return commonPoolParallelism;
2801 }
2802
2803 /**
2804 * Returns the number of worker threads that have started but not
2805 * yet terminated. The result returned by this method may differ
2806 * from {@link #getParallelism} when threads are created to
2807 * maintain parallelism when others are cooperatively blocked.
2808 *
2809 * @return the number of worker threads
2810 */
2811 public int getPoolSize() {
2812 return (config & SMASK) + (short)(ctl >>> TC_SHIFT);
2813 }
2814
2815 /**
2816 * Returns {@code true} if this pool uses local first-in-first-out
2817 * scheduling mode for forked tasks that are never joined.
2818 *
2819 * @return {@code true} if this pool uses async mode
2820 */
3038 if ((c & STOP_BIT) != 0)
3039 level = (tc == 0) ? "Terminated" : "Terminating";
3040 else
3041 level = plock < 0 ? "Shutting down" : "Running";
3042 return super.toString() +
3043 "[" + level +
3044 ", parallelism = " + pc +
3045 ", size = " + tc +
3046 ", active = " + ac +
3047 ", running = " + rc +
3048 ", steals = " + st +
3049 ", tasks = " + qt +
3050 ", submissions = " + qs +
3051 "]";
3052 }
3053
3054 /**
3055 * Possibly initiates an orderly shutdown in which previously
3056 * submitted tasks are executed, but no new tasks will be
3057 * accepted. Invocation has no effect on execution state if this
3058 * is the {@link #commonPool}, and no additional effect if
3059 * already shut down. Tasks that are in the process of being
3060 * submitted concurrently during the course of this method may or
3061 * may not be rejected.
3062 *
3063 * @throws SecurityException if a security manager exists and
3064 * the caller is not permitted to modify threads
3065 * because it does not hold {@link
3066 * java.lang.RuntimePermission}{@code ("modifyThread")}
3067 */
3068 public void shutdown() {
3069 checkPermission();
3070 tryTerminate(false, true);
3071 }
3072
3073 /**
3074 * Possibly attempts to cancel and/or stop all tasks, and reject
3075 * all subsequently submitted tasks. Invocation has no effect on
3076 * execution state if this is the {@link #commonPool}, and no
3077 * additional effect if already shut down. Otherwise, tasks that
3078 * are in the process of being submitted or executed concurrently
3079 * during the course of this method may or may not be
3080 * rejected. This method cancels both existing and unexecuted
3081 * tasks, in order to permit termination in the presence of task
3082 * dependencies. So the method always returns an empty list
3083 * (unlike the case for some other Executors).
3084 *
3085 * @return an empty list
3086 * @throws SecurityException if a security manager exists and
3087 * the caller is not permitted to modify threads
3088 * because it does not hold {@link
3089 * java.lang.RuntimePermission}{@code ("modifyThread")}
3090 */
3091 public List<Runnable> shutdownNow() {
3092 checkPermission();
3093 tryTerminate(true, true);
3094 return Collections.emptyList();
3095 }
3096
3119 * @return {@code true} if terminating but not yet terminated
3120 */
3121 public boolean isTerminating() {
3122 long c = ctl;
3123 return ((c & STOP_BIT) != 0L &&
3124 (short)(c >>> TC_SHIFT) != -(config & SMASK));
3125 }
3126
3127 /**
3128 * Returns {@code true} if this pool has been shut down.
3129 *
3130 * @return {@code true} if this pool has been shut down
3131 */
3132 public boolean isShutdown() {
3133 return plock < 0;
3134 }
3135
3136 /**
3137 * Blocks until all tasks have completed execution after a
3138 * shutdown request, or the timeout occurs, or the current thread
3139 * is interrupted, whichever happens first. Note that the {@link
3140 * #commonPool()} never terminates until program shutdown so
3141 * this method will always time out.
3142 *
3143 * @param timeout the maximum time to wait
3144 * @param unit the time unit of the timeout argument
3145 * @return {@code true} if this executor terminated and
3146 * {@code false} if the timeout elapsed before termination
3147 * @throws InterruptedException if interrupted while waiting
3148 */
3149 public boolean awaitTermination(long timeout, TimeUnit unit)
3150 throws InterruptedException {
3151 long nanos = unit.toNanos(timeout);
3152 if (isTerminated())
3153 return true;
3154 long startTime = System.nanoTime();
3155 boolean terminated = false;
3156 synchronized (this) {
3157 for (long waitTime = nanos, millis = 0L;;) {
3158 if (terminated = isTerminated() ||
3159 waitTime <= 0L ||
3160 (millis = unit.toMillis(waitTime)) <= 0L)
3161 break;
3162 wait(millis);
3163 waitTime = nanos - (System.nanoTime() - startTime);
3164 }
3165 }
3166 return terminated;
3167 }
3168
3169 /**
3170 * Interface for extending managed parallelism for tasks running
3171 * in {@link ForkJoinPool}s.
3172 *
3173 * <p>A {@code ManagedBlocker} provides two methods. Method
3174 * {@code isReleasable} must return {@code true} if blocking is
3175 * not necessary. Method {@code block} blocks the current thread
3176 * if necessary (perhaps internally invoking {@code isReleasable}
3177 * before actually blocking). These actions are performed by any
3178 * thread invoking {@link ForkJoinPool#managedBlock}. The
3179 * unusual methods in this API accommodate synchronizers that may,
3180 * but don't usually, block for long periods. Similarly, they
3181 * allow more efficient internal handling of cases in which
3182 * additional workers may be, but usually are not, needed to
3183 * ensure sufficient parallelism. Toward this end,
3184 * implementations of method {@code isReleasable} must be amenable
3185 * to repeated invocation.
3186 *
3187 * <p>For example, here is a ManagedBlocker based on a
3188 * ReentrantLock:
3189 * <pre> {@code
3190 * class ManagedLocker implements ManagedBlocker {
3191 * final ReentrantLock lock;
3192 * boolean hasLock = false;
3193 * ManagedLocker(ReentrantLock lock) { this.lock = lock; }
3194 * public boolean block() {
3195 * if (!hasLock)
3196 * lock.lock();
3197 * return true;
3198 * }
3199 * public boolean isReleasable() {
3200 * return hasLock || (hasLock = lock.tryLock());
3218 * }
3219 * public E getItem() { // call after pool.managedBlock completes
3220 * return item;
3221 * }
3222 * }}</pre>
3223 */
3224 public static interface ManagedBlocker {
3225 /**
3226 * Possibly blocks the current thread, for example waiting for
3227 * a lock or condition.
3228 *
3229 * @return {@code true} if no additional blocking is necessary
3230 * (i.e., if {@code isReleasable} would return {@code true})
3231 * @throws InterruptedException if interrupted while waiting
3232 * (the method is not required to do so, but is allowed to)
3233 */
3234 boolean block() throws InterruptedException;
3235
3236 /**
3237 * Returns {@code true} if blocking is unnecessary.
 * Implementations must be amenable to repeated invocation.
 *
 * @return {@code true} if blocking is unnecessary
3238 */
3239 boolean isReleasable();
3240 }
3241
3242 /**
3243 * Blocks in accord with the given blocker. If the current thread
3244 * is a {@link ForkJoinWorkerThread}, this method possibly
3245 * arranges for a spare thread to be activated if necessary to
3246 * ensure sufficient parallelism while the current thread is blocked.
3247 *
3248 * <p>If the caller is not a {@link ForkJoinTask}, this method is
3249 * behaviorally equivalent to
3250 * <pre> {@code
3251 * while (!blocker.isReleasable())
3252 * if (blocker.block())
3253 * return;
3254 * }</pre>
3255 *
3256 * If the caller is a {@code ForkJoinTask}, then the pool may
3257 * first be expanded to ensure parallelism, and later adjusted.
3302 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
3303 return new ForkJoinTask.AdaptedRunnable<T>(runnable, value);
3304 }
3305
3306 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
3307 return new ForkJoinTask.AdaptedCallable<T>(callable);
3308 }
3309
// Unsafe mechanics: field offsets and array geometry used for CAS etc.
private static final sun.misc.Unsafe U;
private static final long CTL;
private static final long PARKBLOCKER;
private static final int ABASE;
private static final int ASHIFT;
private static final long STEALCOUNT;
private static final long PLOCK;
private static final long INDEXSEED;
private static final long QLOCK;

static {
    int scale; // element scale of ForkJoinTask[]; checked after the try
    try {
        U = sun.misc.Unsafe.getUnsafe();

        // Offsets of fields declared by ForkJoinPool itself.
        final Class<?> poolClass = ForkJoinPool.class;
        CTL = U.objectFieldOffset(poolClass.getDeclaredField("ctl"));
        STEALCOUNT = U.objectFieldOffset(poolClass.getDeclaredField("stealCount"));
        PLOCK = U.objectFieldOffset(poolClass.getDeclaredField("plock"));
        INDEXSEED = U.objectFieldOffset(poolClass.getDeclaredField("indexSeed"));

        // Offset of Thread.parkBlocker.
        final Class<?> threadClass = Thread.class;
        PARKBLOCKER = U.objectFieldOffset(threadClass.getDeclaredField("parkBlocker"));

        // Offset of WorkQueue.qlock.
        final Class<?> queueClass = WorkQueue.class;
        QLOCK = U.objectFieldOffset(queueClass.getDeclaredField("qlock"));

        // Base offset and index shift for ForkJoinTask[] elements.
        final Class<?> taskArrayClass = ForkJoinTask[].class;
        ABASE = U.arrayBaseOffset(taskArrayClass);
        scale = U.arrayIndexScale(taskArrayClass);
        ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
    } catch (Exception e) {
        throw new Error(e);
    }
    final boolean scaleIsPowerOfTwo = (scale & (scale - 1)) == 0;
    if (!scaleIsPowerOfTwo)
        throw new Error("data type scale not a power of two");

    submitters = new ThreadLocal<Submitter>();
    final DefaultForkJoinWorkerThreadFactory builtinFactory =
        new DefaultForkJoinWorkerThreadFactory();
    defaultForkJoinWorkerThreadFactory = builtinFactory;
    ForkJoinWorkerThreadFactory fac = builtinFactory;
    modifyThreadPermission = new RuntimePermission("modifyThread");

    /*
     * Establish common pool parameters. For extra caution,
     * computations to set up common pool state are here; the
     * constructor just assigns these values to fields.
     */

    int par = 0;
    Thread.UncaughtExceptionHandler handler = null;
    try { // TBD: limit or report ignored exceptions?
        final String parallelismProp = System.getProperty
            ("java.util.concurrent.ForkJoinPool.common.parallelism");
        final String handlerProp = System.getProperty
            ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
        final String factoryProp = System.getProperty
            ("java.util.concurrent.ForkJoinPool.common.threadFactory");
        // Processed in the order factory, handler, parallelism: a
        // failure in an earlier step leaves the later ones at defaults.
        if (factoryProp != null) {
            final Object made = ClassLoader.getSystemClassLoader()
                .loadClass(factoryProp).newInstance();
            fac = (ForkJoinWorkerThreadFactory)made;
        }
        if (handlerProp != null) {
            final Object made = ClassLoader.getSystemClassLoader()
                .loadClass(handlerProp).newInstance();
            handler = (Thread.UncaughtExceptionHandler)made;
        }
        if (parallelismProp != null)
            par = Integer.parseInt(parallelismProp);
    } catch (Exception ignore) {
    }

    // Non-positive parallelism falls back to the processor count;
    // the result is capped at MAX_CAP.
    if (par <= 0)
        par = Runtime.getRuntime().availableProcessors();
    if (par > MAX_CAP)
        par = MAX_CAP;
    commonPoolParallelism = par;

    // Precompute the initial ctl value from the negated parallelism.
    final long np = (long)(-par);
    final long activePart = (np << AC_SHIFT) & AC_MASK;
    final long totalPart = (np << TC_SHIFT) & TC_MASK;
    final long ct = activePart | totalPart;

    commonPool = new ForkJoinPool(par, ct, fac, handler);
}
3391
3392 }
|
18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 *
20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 * or visit www.oracle.com if you need additional information or have any
22 * questions.
23 */
24
25 /*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/publicdomain/zero/1.0/
34 */
35
36 package java.util.concurrent;
37
38 import java.lang.Thread.UncaughtExceptionHandler;
39 import java.util.ArrayList;
40 import java.util.Arrays;
41 import java.util.Collection;
42 import java.util.Collections;
43 import java.util.List;
44 import java.util.concurrent.AbstractExecutorService;
45 import java.util.concurrent.Callable;
46 import java.util.concurrent.ExecutorService;
47 import java.util.concurrent.Future;
48 import java.util.concurrent.RejectedExecutionException;
49 import java.util.concurrent.RunnableFuture;
50 import java.util.concurrent.TimeUnit;
51
52 /**
53 * An {@link ExecutorService} for running {@link ForkJoinTask}s.
54 * A {@code ForkJoinPool} provides the entry point for submissions
55 * from non-{@code ForkJoinTask} clients, as well as management and
56 * monitoring operations.
57 *
58 * <p>A {@code ForkJoinPool} differs from other kinds of {@link
88 * <p>In addition to execution and lifecycle control methods, this
89 * class provides status check methods (for example
90 * {@link #getStealCount}) that are intended to aid in developing,
91 * tuning, and monitoring fork/join applications. Also, method
92 * {@link #toString} returns indications of pool state in a
93 * convenient form for informal monitoring.
94 *
95 * <p>As is the case with other ExecutorServices, there are three
96 * main task execution methods summarized in the following table.
97 * These are designed to be used primarily by clients not already
98 * engaged in fork/join computations in the current pool. The main
99 * forms of these methods accept instances of {@code ForkJoinTask},
100 * but overloaded forms also allow mixed execution of plain {@code
101 * Runnable}- or {@code Callable}- based activities as well. However,
102 * tasks that are already executing in a pool should normally instead
103 * use the within-computation forms listed in the table unless using
104 * async event-style tasks that are not usually joined, in which case
105 * there is little difference among choice of methods.
106 *
107 * <table BORDER CELLPADDING=3 CELLSPACING=1>
108 * <caption>Summary of task execution methods</caption>
109 * <tr>
110 * <td></td>
111 * <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
112 * <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
113 * </tr>
114 * <tr>
115 * <td> <b>Arrange async execution</b></td>
116 * <td> {@link #execute(ForkJoinTask)}</td>
117 * <td> {@link ForkJoinTask#fork}</td>
118 * </tr>
119 * <tr>
120 * <td> <b>Await and obtain result</b></td>
121 * <td> {@link #invoke(ForkJoinTask)}</td>
122 * <td> {@link ForkJoinTask#invoke}</td>
123 * </tr>
124 * <tr>
125 * <td> <b>Arrange exec and obtain Future</b></td>
126 * <td> {@link #submit(ForkJoinTask)}</td>
127 * <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
128 * </tr>
129 * </table>
130 *
131 * <p>The common pool is by default constructed with default
132 * parameters, but these may be controlled by setting three
133 * {@linkplain System#getProperty system properties} with prefix
134 * {@code "java.util.concurrent.ForkJoinPool.common."}:
135 * {@code parallelism} -- a non-negative integer,
136 * {@code threadFactory} -- the class name of a
137 * {@link ForkJoinWorkerThreadFactory}, and
138 * {@code exceptionHandler} --
139 * the class name of an {@link UncaughtExceptionHandler}.
140 * Upon any error in establishing these settings, default parameters
141 * are used. It is possible to disable or limit the use of threads in
142 * the common pool by setting the parallelism property to zero, and/or
143 * using a factory that may return {@code null}.
144 *
145 * <p><b>Implementation notes</b>: This implementation restricts the
146 * maximum number of running threads to 32767. Attempts to create
147 * pools with greater than the maximum number result in
148 * {@code IllegalArgumentException}.
149 *
150 * <p>This implementation rejects submitted tasks (that is, by throwing
151 * {@link RejectedExecutionException}) only when the pool is shut down
152 * or internal resources have been exhausted.
153 *
154 * @since 1.7
155 * @author Doug Lea
156 */
157 public class ForkJoinPool extends AbstractExecutorService {
158
159 /*
160 * Implementation Overview
161 *
162 * This class and its nested classes provide the main
163 * functionality and control for a set of worker threads:
213 * base index, else consider alternative actions, rather than
214 * method poll.)
215 *
216 * This approach also enables support of a user mode in which local
217 * task processing is in FIFO, not LIFO order, simply by using
218 * poll rather than pop. This can be useful in message-passing
219 * frameworks in which tasks are never joined. However neither
220 * mode considers affinities, loads, cache localities, etc, so
221 * rarely provide the best possible performance on a given
222 * machine, but portably provide good throughput by averaging over
223 * these factors. (Further, even if we did try to use such
224 * information, we do not usually have a basis for exploiting it.
225 * For example, some sets of tasks profit from cache affinities,
226 * but others are harmed by cache pollution effects.)
227 *
228 * WorkQueues are also used in a similar way for tasks submitted
229 * to the pool. We cannot mix these tasks in the same queues used
230 * for work-stealing (this would contaminate lifo/fifo
231 * processing). Instead, we randomly associate submission queues
232 * with submitting threads, using a form of hashing. The
233 * ThreadLocalRandom probe value serves as a hash code for
234 * choosing existing queues, and may be randomly repositioned upon
235 * contention with other submitters. In essence, submitters act
236 * like workers except that they are restricted to executing local
237 * tasks that they submitted (or in the case of CountedCompleters,
238 * others with the same root task). However, because most
239 * shared/external queue operations are more expensive than
240 * internal, and because, at steady state, external submitters
241 * will compete for CPU with workers, ForkJoinTask.join and
242 * related methods disable them from repeatedly helping to process
243 * tasks if all workers are active. Insertion of tasks in shared
244 * mode requires a lock (mainly to protect in the case of
245 * resizing) but we use only a simple spinlock (using bits in
246 * field qlock), because submitters encountering a busy queue move
247 * on to try or create other queues -- they block only when
248 * creating and registering new queues.
249 *
250 * Management
251 * ==========
252 *
253 * The main throughput advantages of work-stealing stem from
254 * decentralized control -- workers mostly take tasks from
255 * themselves or each other. We cannot negate this in the
256 * implementation of other management responsibilities. The main
257 * tactic for avoiding bottlenecks is packing nearly all
258 * essentially atomic control state into two volatile variables
259 * that are by far most often read (not written) as status and
260 * consistency checks.
261 *
262 * Field "ctl" contains 64 bits holding all the information needed
263 * to atomically decide to add, inactivate, enqueue (on an event
264 * queue), dequeue, and/or re-activate workers. To enable this
457 * helping opportunities is challenging to control on JVMs, where
458 * GC and other activities can stall progress of tasks that in
459 * turn stall out many other dependent tasks, without us being
460 * able to determine whether they will ever require compensation.
461 * Even though work-stealing otherwise encounters little
462 * degradation in the presence of more threads than cores,
463 * aggressively adding new threads in such cases entails risk of
464 * unwanted positive feedback control loops in which more threads
465 * cause more dependent stalls (as well as delayed progress of
466 * unblocked threads to the point that we know they are available)
467 * leading to more situations requiring more threads, and so
468 * on. This aspect of control can be seen as an (analytically
469 * intractable) game with an opponent that may choose the worst
470 * (for us) active thread to stall at any time. We take several
471 * precautions to bound losses (and thus bound gains), mainly in
472 * methods tryCompensate and awaitJoin.
473 *
474 * Common Pool
475 * ===========
476 *
477 * The static common Pool always exists after static
478 * initialization. Since it (or any other created pool) need
479 * never be used, we minimize initial construction overhead and
480 * footprint to the setup of about a dozen fields, with no nested
481 * allocation. Most bootstrapping occurs within method
482 * fullExternalPush during the first submission to the pool.
483 *
484 * When external threads submit to the common pool, they can
485 * perform some subtask processing (see externalHelpJoin and
486 * related methods). We do not need to record whether these
487 * submissions are to the common pool -- if not, externalHelpJoin
488 * returns quickly (at the most helping to signal some common pool
489 * workers). These submitters would otherwise be blocked waiting
490 * for completion, so the extra effort (with liberally sprinkled
491 * task status checks) in inapplicable cases amounts to an odd
492 * form of limited spin-wait before blocking in ForkJoinTask.join.
493 *
494 * Style notes
495 * ===========
496 *
497 * There is a lot of representation-level coupling among classes
536 private static void checkPermission() {
537 SecurityManager security = System.getSecurityManager();
538 if (security != null)
539 security.checkPermission(modifyThreadPermission);
540 }
541
542 // Nested classes
543
544 /**
545 * Factory for creating new {@link ForkJoinWorkerThread}s.
546 * A {@code ForkJoinWorkerThreadFactory} must be defined and used
547 * for {@code ForkJoinWorkerThread} subclasses that extend base
548 * functionality or initialize threads with different contexts.
549 */
550 public static interface ForkJoinWorkerThreadFactory {
551 /**
552 * Returns a new worker thread operating in the given pool.
 * The class documentation allows a factory that may return
 * {@code null}, which limits thread creation.
553 *
554 * @param pool the pool this thread works in
555 * @throws NullPointerException if the pool is null
556 * @return the new worker thread
557 */
558 public ForkJoinWorkerThread newThread(ForkJoinPool pool);
559 }
560
561 /**
562 * Default ForkJoinWorkerThreadFactory implementation; creates a
563 * new ForkJoinWorkerThread.
564 */
565 static final class DefaultForkJoinWorkerThreadFactory
566 implements ForkJoinWorkerThreadFactory {
567 public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
568 return new ForkJoinWorkerThread(pool);
569 }
570 }
571
572 /**
573 * Class for artificial tasks that are used to replace the target
574 * of local joins if they are removed from an interior queue slot
575 * in WorkQueue.tryRemoveAndExec. We don't need the proxy to
576 * actually do anything beyond having a unique identity.
577 */
578 static final class EmptyTask extends ForkJoinTask<Void> {
579 private static final long serialVersionUID = -7721805057305804111L;
580 EmptyTask() { status = ForkJoinTask.NORMAL; } // force done
581 public final Void getRawResult() { return null; }
582 public final void setRawResult(Void x) {}
583 public final boolean exec() { return true; }
584 }
585
586 /**
587 * Queues supporting work-stealing as well as external task
588 * submission. See above for main rationale and algorithms.
589 * Implementation relies heavily on "Unsafe" intrinsics
590 * and selective use of "volatile":
591 *
592 * Field "base" is the index (mod array.length) of the least valid
706 * Provides a more accurate estimate of whether this queue has
707 * any tasks than does queueSize, by checking whether a
708 * near-empty queue has at least one unclaimed task.
709 */
710 final boolean isEmpty() {
711 ForkJoinTask<?>[] a; int m, s;
712 int n = base - (s = top);
713 return (n >= 0 ||
714 (n == -1 &&
715 ((a = array) == null ||
716 (m = a.length - 1) < 0 ||
717 U.getObject
718 (a, (long)((m & (s - 1)) << ASHIFT) + ABASE) == null)));
719 }
720
721 /**
722 * Pushes a task. Call only by owner in unshared queues. (The
723 * shared-queue version is embedded in method externalPush.)
724 *
725 * @param task the task. Caller must ensure non-null.
726 * @throws RejectedExecutionException if array cannot be resized
727 */
728 final void push(ForkJoinTask<?> task) {
729 ForkJoinTask<?>[] a; ForkJoinPool p;
730 int s = top, m, n;
731 if ((a = array) != null) { // ignore if queue removed
732 int j = (((m = a.length - 1) & s) << ASHIFT) + ABASE;
733 U.putOrderedObject(a, j, task);
734 if ((n = (top = s + 1) - base) <= 2) {
735 if ((p = pool) != null)
736 p.signalWork(this);
737 }
738 else if (n >= m)
739 growArray();
740 }
741 }
742
743 /**
744 * Initializes or doubles the capacity of array. Call either
745 * by owner or with lock held -- it is OK for base, but not
746 * top, to move while resizings are in progress.
905 if (U.compareAndSwapObject(a, j, t, null)) {
906 top = s;
907 t.doExec();
908 }
909 }
910 }
911
912 /**
913 * Polls and runs tasks until empty.
914 */
915 private void pollAndExecAll() {
916 for (ForkJoinTask<?> t; (t = poll()) != null;)
917 t.doExec();
918 }
919
920 /**
921 * If present, removes from queue and executes the given task,
922 * or any other cancelled task. Returns (true) on any CAS
923 * or consistency check failure so caller can retry.
924 *
925 * @return false if no progress can be made, else true
926 */
927 final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
928 boolean stat = true, removed = false, empty = true;
929 ForkJoinTask<?>[] a; int m, s, b, n;
930 if ((a = array) != null && (m = a.length - 1) >= 0 &&
931 (n = (s = top) - (b = base)) > 0) {
932 for (ForkJoinTask<?> t;;) { // traverse from s to b
933 int j = ((--s & m) << ASHIFT) + ABASE;
934 t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
935 if (t == null) // inconsistent length
936 break;
937 else if (t == task) {
938 if (s + 1 == top) { // pop
939 if (!U.compareAndSwapObject(a, j, task, null))
940 break;
941 top = s;
942 removed = true;
943 }
944 else if (base == b) // replace with proxy
945 removed = U.compareAndSwapObject(a, j, task,
950 empty = false;
951 else if (s + 1 == top) { // pop and throw away
952 if (U.compareAndSwapObject(a, j, t, null))
953 top = s;
954 break;
955 }
956 if (--n == 0) {
957 if (!empty && base == b)
958 stat = false;
959 break;
960 }
961 }
962 }
963 if (removed)
964 task.doExec();
965 return stat;
966 }
967
968 /**
969 * Polls for and executes the given task or any other task in
970 * its CountedCompleter computation.
971 */
972 final boolean pollAndExecCC(ForkJoinTask<?> root) {
973 ForkJoinTask<?>[] a; int b; Object o;
974 outer: while ((b = base) - top < 0 && (a = array) != null) {
975 long j = (((a.length - 1) & b) << ASHIFT) + ABASE;
976 if ((o = U.getObject(a, j)) == null ||
977 !(o instanceof CountedCompleter))
978 break;
979 for (CountedCompleter<?> t = (CountedCompleter<?>)o, r = t;;) {
980 if (r == root) {
981 if (base == b &&
982 U.compareAndSwapObject(a, j, t, null)) {
983 base = b + 1;
984 t.doExec();
985 return true;
986 }
987 else
988 break; // restart
989 }
990 if ((r = r.completer) == null)
1024 }
1025
1026 /**
1027 * Returns true if owned and not known to be blocked.
1028 */
1029 final boolean isApparentlyUnblocked() {
1030 Thread wt; Thread.State s;
1031 return (eventCount >= 0 &&
1032 (wt = owner) != null &&
1033 (s = wt.getState()) != Thread.State.BLOCKED &&
1034 s != Thread.State.WAITING &&
1035 s != Thread.State.TIMED_WAITING);
1036 }
1037
// Unsafe mechanics: qlock field offset and ForkJoinTask[] geometry.
private static final sun.misc.Unsafe U;
private static final long QLOCK;
private static final int ABASE;
private static final int ASHIFT;
static {
    try {
        U = sun.misc.Unsafe.getUnsafe();
        final Class<?> queueClass = WorkQueue.class;
        final Class<?> taskArrayClass = ForkJoinTask[].class;
        QLOCK = U.objectFieldOffset(queueClass.getDeclaredField("qlock"));
        ABASE = U.arrayBaseOffset(taskArrayClass);
        final int scale = U.arrayIndexScale(taskArrayClass);
        // The shift-based indexing below requires a power-of-two scale.
        final boolean powerOfTwo = (scale & (scale - 1)) == 0;
        if (!powerOfTwo)
            throw new Error("data type scale not a power of two");
        ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
    } catch (Exception e) {
        throw new Error(e);
    }
}
1059 }
1060
1061 // static fields (initialized in static initializer below)
1062
1063 /**
1064 * Creates a new ForkJoinWorkerThread. This factory is used unless
1065 * overridden in ForkJoinPool constructors.
1066 */
1067 public static final ForkJoinWorkerThreadFactory
1068 defaultForkJoinWorkerThreadFactory;
1069
1070 /**
1071 * Permission required for callers of methods that may start or
1072 * kill threads.
1073 */
1074 private static final RuntimePermission modifyThreadPermission;
1075
1076 /**
1077 * Common (static) pool. Non-null for public use unless a static
1078 * construction exception, but internal usages null-check on use
1079 * to paranoically avoid potential initialization circularities
1080 * as well as to simplify generated code.
1081 */
1082 static final ForkJoinPool common;
1083
1084 /**
1085 * Common pool parallelism. To allow simpler use and management
1086 * when common pool threads are disabled, we allow the underlying
1087 * common.config field to be zero, but in that case still report
1088 * parallelism as 1 to reflect resulting caller-runs mechanics.
1089 */
1090 static final int commonParallelism;
1091
1092 /**
1093 * Sequence number for creating workerNamePrefix.
1094 */
1095 private static int poolNumberSequence;
1096
1097 /**
1098 * Returns the next sequence number. We don't expect this to
1099 * ever contend, so use simple builtin sync.
1100 */
1101 private static final synchronized int nextPoolId() {
1102 return ++poolNumberSequence;
1103 }
1104
1105 // static constants
1106
1107 /**
1108 * Initial timeout value (in nanoseconds) for the thread
1109 * triggering quiescence to park waiting for new work. On timeout,
1110 * the thread will instead try to shrink the number of
1111 * workers. The value should be large enough to avoid overly
1112 * aggressive shrinkage during most transient stalls (long GCs
1113 * etc).
1114 */
1115 private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec
1116
1117 /**
1118 * Timeout value when there are more threads than parallelism level
1119 */
1230
1231 // Instance fields
1232
1233 /*
1234 * Field layout of this class tends to matter more than one would
1235 * like. Runtime layout order is only loosely related to
1236 * declaration order and may differ across JVMs, but the following
1237 * empirically works OK on current JVMs.
1238 */
1239
1240 // Heuristic padding to ameliorate unfortunate memory placements
1241 volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06;
1242
1243 volatile long stealCount; // collects worker counts
1244 volatile long ctl; // main pool control
1245 volatile int plock; // shutdown status and seqLock
1246 volatile int indexSeed; // worker/submitter index seed
1247 final int config; // mode and parallelism level
1248 WorkQueue[] workQueues; // main registry
1249 final ForkJoinWorkerThreadFactory factory;
1250 final UncaughtExceptionHandler ueh; // per-worker UEH
1251 final String workerNamePrefix; // to create worker name string
1252
1253 volatile Object pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17;
1254 volatile Object pad18, pad19, pad1a, pad1b;
1255
1256 /**
1257 * Acquires the plock lock to protect worker array and related
1258 * updates. This method is called only if an initial CAS on plock
1259 * fails. This acts as a spinlock for normal cases, but falls back
1260 * to builtin monitor to block when (rarely) needed. This would be
1261 * a terrible idea for a highly contended lock, but works fine as
1262 * a more conservative alternative to a pure spinlock.
1263 */
1264 private int acquirePlock() {
1265 int spins = PL_SPINS, ps, nps;
1266 for (;;) {
1267 if (((ps = plock) & PL_LOCK) == 0 &&
1268 U.compareAndSwapInt(this, PLOCK, ps, nps = ps + PL_LOCK))
1269 return nps;
1270 else if (spins >= 0) {
1271 if (ThreadLocalRandom.nextSecondarySeed() >= 0)
1272 --spins;
1273 }
1274 else if (U.compareAndSwapInt(this, PLOCK, ps, ps | PL_SIGNAL)) {
1275 synchronized (this) {
1276 if ((plock & PL_SIGNAL) != 0) {
1277 try {
1278 wait();
1279 } catch (InterruptedException ie) {
1280 try {
1281 Thread.currentThread().interrupt();
1282 } catch (SecurityException ignore) {
1283 }
1284 }
1285 }
1286 else
1287 notifyAll();
1288 }
1289 }
1290 }
1291 }
1292
1293 /**
1294 * Unlocks and signals any thread waiting for plock. Called only
1295 * when CAS of seq value for unlock fails.
1296 */
1297 private void releasePlock(int ps) {
1298 plock = ps;
1299 synchronized (this) { notifyAll(); }
1300 }
1301
1302 /**
1303 * Tries to create and start one worker if fewer than target
1304 * parallelism level exist. Adjusts counts etc on failure.
1305 */
1306 private void tryAddWorker() {
1307 long c; int u;
1308 while ((u = (int)((c = ctl) >>> 32)) < 0 &&
1309 (u & SHORT_SIGN) != 0 && (int)c == 0) {
1310 long nc = (long)(((u + UTC_UNIT) & UTC_MASK) |
1311 ((u + UAC_UNIT) & UAC_MASK)) << 32;
1312 if (U.compareAndSwapLong(this, CTL, c, nc)) {
1313 ForkJoinWorkerThreadFactory fac;
1314 Throwable ex = null;
1315 ForkJoinWorkerThread wt = null;
1316 try {
1317 if ((fac = factory) != null &&
1318 (wt = fac.newThread(this)) != null) {
1319 wt.start();
1320 break;
1321 }
1322 } catch (Throwable e) {
1324 }
1325 deregisterWorker(wt, ex);
1326 break;
1327 }
1328 }
1329 }
1330
1331 // Registering and deregistering workers
1332
1333 /**
1334 * Callback from ForkJoinWorkerThread to establish and record its
1335 * WorkQueue. To avoid scanning bias due to packing entries in
1336 * front of the workQueues array, we treat the array as a simple
1337 * power-of-two hash table using per-thread seed as hash,
1338 * expanding as needed.
1339 *
1340 * @param wt the worker thread
1341 * @return the worker's queue
1342 */
1343 final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
1344 UncaughtExceptionHandler handler; WorkQueue[] ws; int s, ps;
1345 wt.setDaemon(true);
1346 if ((handler = ueh) != null)
1347 wt.setUncaughtExceptionHandler(handler);
1348 do {} while (!U.compareAndSwapInt(this, INDEXSEED, s = indexSeed,
1349 s += SEED_INCREMENT) ||
1350 s == 0); // skip 0
1351 WorkQueue w = new WorkQueue(this, wt, config >>> 16, s);
1352 if (((ps = plock) & PL_LOCK) != 0 ||
1353 !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1354 ps = acquirePlock();
1355 int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
1356 try {
1357 if ((ws = workQueues) != null) { // skip if shutting down
1358 int n = ws.length, m = n - 1;
1359 int r = (s << 1) | 1; // use odd-numbered indices
1360 if (ws[r &= m] != null) { // collision
1361 int probes = 0; // step by approx half size
1362 int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
1363 while (ws[r = (r + step) & m] != null) {
1364 if (++probes >= n) {
1368 }
1369 }
1370 }
1371 w.eventCount = w.poolIndex = r; // volatile write orders
1372 ws[r] = w;
1373 }
1374 } finally {
1375 if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1376 releasePlock(nps);
1377 }
1378 wt.setName(workerNamePrefix.concat(Integer.toString(w.poolIndex)));
1379 return w;
1380 }
1381
1382 /**
1383 * Final callback from terminating worker, as well as upon failure
1384 * to construct or start a worker. Removes record of worker from
1385 * array, and adjusts counts. If pool is shutting down, tries to
1386 * complete termination.
 *
 * <p>If {@code tryTerminate} returns false and the worker's queue
 * still has an array, its tasks are cancelled and an attempt is
 * made to wake a waiting worker or add a replacement. On exit,
 * either stale exception records are expunged (no exception) or
 * the given exception is rethrown.
1387 *
1388 * @param wt the worker thread, or null if construction failed
1389 * @param ex the exception causing failure, or null if none
1390 */
1391 final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
1392 WorkQueue w = null;
1393 if (wt != null && (w = wt.workQueue) != null) {
1394 int ps;
1395 w.qlock = -1; // ensure set
1396 long ns = w.nsteals, sc; // collect steal count
// CAS-retry loop adding this worker's steals to the pool-wide stealCount.
1397 do {} while (!U.compareAndSwapLong(this, STEALCOUNT,
1398 sc = stealCount, sc + ns));
// Take plock: one inline CAS attempt, falling back to acquirePlock().
1399 if (((ps = plock) & PL_LOCK) != 0 ||
1400 !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1401 ps = acquirePlock();
// Value to store on unlock: keeps the SHUTDOWN bit of ps and
// advances the remaining bits by PL_LOCK.
1402 int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
1403 try {
1404 int idx = w.poolIndex;
1405 WorkQueue[] ws = workQueues;
// Clear the registry slot only if it still refers to this queue.
1406 if (ws != null && idx >= 0 && idx < ws.length && ws[idx] == w)
1407 ws[idx] = null;
1408 } finally {
// Unlock by CAS; if that fails, releasePlock also notifies waiters.
1409 if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1410 releasePlock(nps);
1411 }
1412 }
1413
1414 long c; // adjust ctl counts
// Subtract one AC_UNIT and one TC_UNIT within their masked fields,
// leaving all other ctl bits unchanged; retry until the CAS succeeds.
1415 do {} while (!U.compareAndSwapLong
1416 (this, CTL, c = ctl, (((c - AC_UNIT) & AC_MASK) |
1417 ((c - TC_UNIT) & TC_MASK) |
1418 (c & ~(AC_MASK|TC_MASK)))));
1419
1420 if (!tryTerminate(false, false) && w != null && w.array != null) {
1421 w.cancelAll(); // cancel remaining tasks
1422 WorkQueue[] ws; WorkQueue v; Thread p; int u, i, e;
// u = upper 32 bits of ctl, e = lower 32 bits; loop only while
// u is negative and e is non-negative.
1423 while ((u = (int)((c = ctl) >>> 32)) < 0 && (e = (int)c) >= 0) {
1424 if (e > 0) { // activate or create replacement
// e & SMASK indexes the queue v to release; give up if absent.
1425 if ((ws = workQueues) == null ||
1426 (i = e & SMASK) >= ws.length ||
1427 (v = ws[i]) == null)
1428 break;
// Proposed ctl: low word from v.nextWait, high word u + UAC_UNIT.
1429 long nc = (((long)(v.nextWait & E_MASK)) |
1430 ((long)(u + UAC_UNIT) << 32));
// Consistency check: v must still carry the expected eventCount.
1431 if (v.eventCount != (e | INT_SIGN))
1432 break;
1433 if (U.compareAndSwapLong(this, CTL, c, nc)) {
1434 v.eventCount = (e + E_SEQ) & E_MASK;
// Wake the released worker's thread if it is parked.
1435 if ((p = v.parker) != null)
1436 U.unpark(p);
1437 break;
1438 }
1439 }
1440 else {
// e == 0: add a worker only when the low 16 bits of u are negative.
1441 if ((short)u < 0)
1442 tryAddWorker();
1443 break;
1444 }
1445 }
1446 }
1447 if (ex == null) // help clean refs on way out
1448 ForkJoinTask.helpExpungeStaleExceptions();
1449 else // rethrow
1450 ForkJoinTask.rethrow(ex);
1451 }
1452
1453 // Submissions
1454
1455 /**
1456 * Unless shutting down, adds the given task to a submission queue
1457 * at submitter's current queue index (modulo submission
1458 * range). Only the most common path is directly handled in this
1459 * method. All others are relayed to fullExternalPush.
1460 *
1461 * @param task the task. Caller must ensure non-null.
1462 */
1463 final void externalPush(ForkJoinTask<?> task) {
// Fast path: push onto an existing submission queue when it can be locked
// and has room; every other case is delegated to fullExternalPush.
1464 WorkQueue[] ws; WorkQueue q; int z, m; ForkJoinTask<?>[] a;
// Requires a non-zero thread probe, plock > 0, a non-empty workQueues
// array, an existing queue at index (m & z & SQMASK), and a successful
// CAS of that queue's qlock from 0 to 1.
1465 if ((z = ThreadLocalRandom.getProbe()) != 0 && plock > 0 &&
1466 (ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
1467 (q = ws[m & z & SQMASK]) != null &&
1468 U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock
1469 int b = q.base, s = q.top, n, an;
// n is the element count after the push; the array must be strictly larger.
1470 if ((a = q.array) != null && (an = a.length) > (n = s + 1 - b)) {
// Raw offset of slot (s mod an) for the Unsafe store.
1471 int j = (((an - 1) & s) << ASHIFT) + ABASE;
1472 U.putOrderedObject(a, j, task);
1473 q.top = s + 1; // push on to deque
// Unlock before signalling.
1474 q.qlock = 0;
// Signal only when the queue holds at most two tasks after the push.
1475 if (n <= 2)
1476 signalWork(q);
1477 return;
1478 }
// No array or no room: unlock and fall through to the full version.
1479 q.qlock = 0;
1480 }
1481 fullExternalPush(task);
1482 }
1483
1484 /**
1485 * Full version of externalPush. This method is called, among
1486 * other times, upon the first submission of the first task to the
1487 * pool, so must perform secondary initialization. It also
1488 * detects first submission by an external thread by looking up
1489 * its ThreadLocal, and creates a new shared queue if the one at
1490 * index is empty or contended. The plock lock body must be
1491 * exception-free (so no try/finally) so we optimistically
1492 * allocate new queues outside the lock and throw them away if
1493 * (very rarely) not needed.
1494 *
1495 * Secondary initialization occurs when plock is zero, to create
1496 * workQueue array and set plock to a valid value. This lock body
1497 * must also be exception-free. Because the plock seq value can
1498 * eventually wrap around zero, this method harmlessly fails to
1499 * reinitialize if workQueues exists, while still advancing plock.
1500 */
1501 private void fullExternalPush(ForkJoinTask<?> task) {
// Retry loop that, per iteration, does one of: reject (negative plock),
// create the workQueues array, push onto an existing queue, or create a
// new shared queue for the current slot. It returns only after a push.
1502 int r;
// Make sure the calling thread has a non-zero probe value.
1503 if ((r = ThreadLocalRandom.getProbe()) == 0) {
1504 ThreadLocalRandom.localInit();
1505 r = ThreadLocalRandom.getProbe();
1506 }
1507 for (;;) {
1508 WorkQueue[] ws; WorkQueue q; int ps, m, k;
1509 boolean move = false;
1510 if ((ps = plock) < 0)
1511 throw new RejectedExecutionException();
1512 else if (ps == 0 || (ws = workQueues) == null ||
1513 (m = ws.length - 1) < 0) { // initialize workQueues
1514 int p = config & SMASK; // find power of two table size
1515 int n = (p > 1) ? p - 1 : 1; // ensure at least 2 slots
// Smear the highest set bit downward, then add one and double.
1516 n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
1517 n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
// Allocate outside the lock, and only if no usable array exists yet.
1518 WorkQueue[] nws = ((ws = workQueues) == null || ws.length == 0 ?
1519 new WorkQueue[n] : null);
1520 if (((ps = plock) & PL_LOCK) != 0 ||
1521 !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1522 ps = acquirePlock();
// Recheck under the lock; the new array is dropped if one was installed meanwhile.
1523 if (((ws = workQueues) == null || ws.length == 0) && nws != null)
1524 workQueues = nws;
// Release value: SHUTDOWN bit preserved, the rest advanced by PL_LOCK.
1525 int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
1526 if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1527 releasePlock(nps);
1528 }
1529 else if ((q = ws[k = r & m & SQMASK]) != null) {
// Queue exists: try to lock it (cheap read first, then CAS 0 -> 1).
1530 if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
1531 ForkJoinTask<?>[] a = q.array;
1532 int s = q.top;
1533 boolean submitted = false;
1534 try { // locked version of push
// Use the current array if it has room, otherwise whatever growArray returns.
1535 if ((a != null && a.length > s + 1 - q.base) ||
1536 (a = q.growArray()) != null) { // must presize
1537 int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
1538 U.putOrderedObject(a, j, task);
1539 q.top = s + 1;
1540 submitted = true;
1541 }
1542 } finally {
1543 q.qlock = 0; // unlock
1544 }
1545 if (submitted) {
1546 signalWork(q);
1547 return;
1548 }
1549 }
1550 move = true; // move on failure
1551 }
1552 else if (((ps = plock) & PL_LOCK) == 0) { // create new queue
// Built outside the lock with no owner thread and SHARED_QUEUE mode.
1553 q = new WorkQueue(this, null, SHARED_QUEUE, r);
1554 if (((ps = plock) & PL_LOCK) != 0 ||
1555 !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1556 ps = acquirePlock();
// Install only if slot k is in range and still empty; the next loop
// iteration then retries the push at the same index.
1557 if ((ws = workQueues) != null && k < ws.length && ws[k] == null)
1558 ws[k] = q;
1559 int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
1560 if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1561 releasePlock(nps);
1562 }
1563 else
1564 move = true; // move if busy
// Pick a different slot for the next attempt.
1565 if (move)
1566 r = ThreadLocalRandom.advanceProbe(r);
1567 }
1568 }
1569
1570 // Maintaining ctl counts
1571
1572 /**
1573 * Increments active count; mainly called upon return from blocking.
1574 */
1575 final void incrementActiveCount() {
1576 long c;
1577 do {} while (!U.compareAndSwapLong(this, CTL, c = ctl, c + AC_UNIT));
1578 }
1579
1580 /**
1581 * Tries to create or activate a worker if too few are active.
1582 *
1583 * @param q the (non-null) queue holding tasks to be signalled
1584 */
1585 final void signalWork(WorkQueue q) {
1586 int hint = q.poolIndex;
1637 * completing the sweep. If the worker is not inactivated, it
1638 * takes and returns a task from this queue. Otherwise, if not
1639 * activated, it signals workers (that may include itself) and
1640 * returns so caller can retry. Also returns for retry if the
1641 * worker array may have changed during an empty scan. On failure
1642 * to find a task, we take one of the following actions, after
1643 * which the caller will retry calling this method unless
1644 * terminated.
1645 *
1646 * * If pool is terminating, terminate the worker.
1647 *
1648 * * If not already enqueued, try to inactivate and enqueue the
1649 * worker on wait queue. Or, if inactivating has caused the pool
1650 * to be quiescent, relay to idleAwaitWork to possibly shrink
1651 * pool.
1652 *
1653 * * If already enqueued and none of the above apply, possibly
1654 * park awaiting signal, else lingering to help scan and signal.
1655 *
1656 * * If a non-empty queue discovered or left as a hint,
1657 * help wake up other workers before return.
1658 *
1659 * @param w the worker (via its WorkQueue)
1660 * @return a task or null if none found
1661 */
1662 private final ForkJoinTask<?> scan(WorkQueue w) {
// Probes the queue array for a task to take; when none is found, may
// enqueue the worker as inactive, park it, or wake other workers.
// NOTE(review): part of this method is not visible in this excerpt (see
// the marker below); comments describe the visible code only.
1663 WorkQueue[] ws; int m;
1664 int ps = plock; // read plock before ws
1665 if (w != null && (ws = workQueues) != null && (m = ws.length - 1) >= 0) {
1666 int ec = w.eventCount; // ec is negative if inactive
// Advance the worker's seed with an xorshift step (13, 17, 5); r also
// selects where probing starts.
1667 int r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5;
1668 w.hint = -1; // update seed and clear hint
// Probe counter derived from the table size, OR-ed with MIN_SCAN and
// masked by MAX_SCAN.
1669 int j = ((m + m + 1) | MIN_SCAN) & MAX_SCAN;
1670 do {
1671 WorkQueue q; ForkJoinTask<?>[] a; int b;
1672 if ((q = ws[(r + j) & m]) != null && (b = q.base) - q.top < 0 &&
1673 (a = q.array) != null) { // probably nonempty
// Raw offset of the slot at q's base, read with volatile semantics.
1674 int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
1675 ForkJoinTask<?> t = (ForkJoinTask<?>)
1676 U.getObjectVolatile(a, i);
1677 if (q.base == b && ec >= 0 && t != null &&
// NOTE(review): the lines between this point and the steal-count update
// below are not visible in this excerpt - consult the complete file.
1692 if (U.compareAndSwapLong(this, STEALCOUNT,
1693 sc = stealCount, sc + ns))
1694 w.nsteals = 0; // collect steals and rescan
1695 }
1696 else if (plock != ps) // consistency check
1697 ; // skip
1698 else if ((e = (int)(c = ctl)) < 0)
1699 w.qlock = -1; // pool is terminating
1700 else {
1701 if ((h = w.hint) < 0) {
1702 if (ec >= 0) { // try to enqueue/inactivate
// New ctl: one AC_UNIT removed from the count fields, with this
// worker's ec as the low word.
1703 long nc = (((long)ec |
1704 ((c - AC_UNIT) & (AC_MASK|TC_MASK))));
1705 w.nextWait = e; // link and mark inactive
1706 w.eventCount = ec | INT_SIGN;
1707 if (ctl != c || !U.compareAndSwapLong(this, CTL, c, nc))
1708 w.eventCount = ec; // unmark on CAS failure
// Before the CAS the active field equalled 1 - parallelism, so this
// worker was the last one counted as active.
1709 else if ((int)(c >> AC_SHIFT) == 1 - (config & SMASK))
1710 idleAwaitWork(w, nc, c);
1711 }
1712 else if (w.eventCount < 0 && ctl == c) {
1713 Thread wt = Thread.currentThread();
1714 Thread.interrupted(); // clear status
1715 U.putObject(wt, PARKBLOCKER, this);
// parker is published first, then eventCount is rechecked, before blocking.
1716 w.parker = wt; // emulate LockSupport.park
1717 if (w.eventCount < 0) // recheck
1718 U.park(false, 0L); // block
1719 w.parker = null;
1720 U.putObject(wt, PARKBLOCKER, null);
1721 }
1722 }
// A non-negative hint names a queue index; wake waiting workers while
// that queue appears to hold tasks.
1723 if ((h >= 0 || (h = w.hint) >= 0) &&
1724 (ws = workQueues) != null && h < ws.length &&
1725 (q = ws[h]) != null) { // signal others before retry
1726 WorkQueue v; Thread p; int u, i, s;
// n bounds the number of wakeups, starting at parallelism - 1.
1727 for (int n = (config & SMASK) - 1;;) {
1728 int idleCount = (w.eventCount < 0) ? 0 : -1;
// Stop when the adjusted queue size is non-positive, when the upper
// ctl word is non-negative, when the low ctl word is not positive,
// or when the indexed queue is out of range or absent.
1729 if (((s = idleCount - q.base + q.top) <= n &&
1730 (n = s) <= 0) ||
1731 (u = (int)((c = ctl) >>> 32)) >= 0 ||
1732 (e = (int)c) <= 0 || m < (i = e & SMASK) ||
1733 (v = ws[i]) == null)
1734 break;
1735 long nc = (((long)(v.nextWait & E_MASK)) |
1736 ((long)(u + UAC_UNIT) << 32));
1737 if (v.eventCount != (e | INT_SIGN) ||
1738 !U.compareAndSwapLong(this, CTL, c, nc))
1739 break;
// Pass the hint on to the released worker, then reactivate and unpark it.
1740 v.hint = h;
1741 v.eventCount = (e + E_SEQ) & E_MASK;
1742 if ((p = v.parker) != null)
1743 U.unpark(p);
1744 if (--n <= 0)
1745 break;
1746 }
1747 }
1748 }
1749 }
1750 return null;
1751 }
1752
1753 /**
1754 * If inactivating worker w has caused the pool to become
1755 * quiescent, checks for pool termination, and, so long as this is
1756 * not the only worker, waits for event for up to a given
1757 * duration. On timeout, if ctl has not changed, terminates the
1758 * worker, which will in turn wake up another worker to possibly
1759 * repeat this process.
1760 *
1761 * @param w the calling worker
1762 * @param currentCtl the ctl value triggering possible quiescence
1763 * @param prevCtl the ctl value to restore if thread is terminated
1764 */
1765 private void idleAwaitWork(WorkQueue w, long currentCtl, long prevCtl) {
// Timed wait for an inactive worker: parks while ctl stays equal to
// currentCtl; once the deadline passes, a successful CAS of ctl back to
// prevCtl marks the worker for termination (qlock = -1).
// Proceeds only if w is inactive, tryTerminate returns false, the low
// word of prevCtl is non-zero, and ctl is still currentCtl.
1766 if (w != null && w.eventCount < 0 &&
1767 !tryTerminate(false, false) && (int)prevCtl != 0 &&
1768 ctl == currentCtl) {
// dc is the negated 16-bit field found at TC_SHIFT in currentCtl.
1769 int dc = -(short)(currentCtl >>> TC_SHIFT);
// Negative dc selects the fast timeout; otherwise the wait scales with dc + 1.
1770 long parkTime = dc < 0 ? FAST_IDLE_TIMEOUT: (dc + 1) * IDLE_TIMEOUT;
1771 long deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
1772 Thread wt = Thread.currentThread();
1773 while (ctl == currentCtl) {
1774 Thread.interrupted(); // timed variant of version in scan()
1775 U.putObject(wt, PARKBLOCKER, this);
1776 w.parker = wt;
// Recheck after publishing parker, then block for at most parkTime.
1777 if (ctl == currentCtl)
1778 U.park(false, parkTime);
1779 w.parker = null;
1780 U.putObject(wt, PARKBLOCKER, null);
1781 if (ctl != currentCtl)
1782 break;
// Deadline reached and ctl unchanged: restore prevCtl and mark this
// worker as finished.
1783 if (deadline - System.nanoTime() <= 0L &&
1784 U.compareAndSwapLong(this, CTL, currentCtl, prevCtl)) {
1785 w.eventCount = (w.eventCount + E_SEQ) | E_MASK;
1786 w.hint = -1;
1787 w.qlock = -1; // shrink
1788 break;
1789 }
1790 }
1791 }
1792 }
1793
1794 /**
1795 * Scans through queues looking for work while joining a task; if
1796 * any present, signals. May return early if more signalling is
1797 * detectably unneeded.
1798 *
1799 * @param task return early if done
1800 * @param origin an index to start scan
1801 */
1802 private void helpSignal(ForkJoinTask<?> task, int origin) {
1803 WorkQueue[] ws; WorkQueue w; Thread p; long c; int m, u, e, i, s;
1804 if (task != null && task.status >= 0 &&
1805 (u = (int)(ctl >>> 32)) < 0 && (u >> UAC_SHIFT) < 0 &&
1806 (ws = workQueues) != null && (m = ws.length - 1) >= 0) {
1908 else {
1909 subtask = next;
1910 j = v;
1911 break;
1912 }
1913 }
1914 }
1915 }
1916 }
1917 }
1918 return stat;
1919 }
1920
1921 /**
1922 * Analog of tryHelpStealer for CountedCompleters. Tries to steal
1923 * and run tasks within the target's computation.
1924 *
1925 * @param task the task to join
1926 * @param mode if shared, exit upon completing any task
1927 * if all workers are active
1928 */
1929 private int helpComplete(ForkJoinTask<?> task, int mode) {
1930 WorkQueue[] ws; WorkQueue q; int m, n, s, u;
1931 if (task != null && (ws = workQueues) != null &&
1932 (m = ws.length - 1) >= 0) {
1933 for (int j = 1, origin = j;;) {
1934 if ((s = task.status) < 0)
1935 return s;
1936 if ((q = ws[j & m]) != null && q.pollAndExecCC(task)) {
1937 origin = j;
1938 if (mode == SHARED_QUEUE &&
1939 ((u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0))
1940 break;
1941 }
1942 else if ((j = (j + 2) & m) == origin)
1943 break;
1944 }
1945 }
1946 return 0;
1947 }
2059 ForkJoinTask<?> prevJoin = joiner.currentJoin;
2060 joiner.currentJoin = task;
2061 do {} while ((s = task.status) >= 0 && !joiner.isEmpty() &&
2062 joiner.tryRemoveAndExec(task));
2063 if (s >= 0 && (s = task.status) >= 0) {
2064 helpSignal(task, joiner.poolIndex);
2065 if ((s = task.status) >= 0 &&
2066 (task instanceof CountedCompleter))
2067 s = helpComplete(task, LIFO_QUEUE);
2068 }
2069 if (s >= 0 && joiner.isEmpty()) {
2070 do {} while (task.status >= 0 &&
2071 tryHelpStealer(joiner, task) > 0);
2072 }
2073 joiner.currentJoin = prevJoin;
2074 }
2075 }
2076
2077 /**
2078 * Returns a (probably) non-empty steal queue, if one is found
2079 * during a scan, else null. This method must be retried by
2080 * caller if, by the time it tries to use the queue, it is empty.
2081 * @param r a (random) seed for scanning
2082 */
2083 private WorkQueue findNonEmptyStealQueue(int r) {
2084 for (;;) {
2085 int ps = plock, m; WorkQueue[] ws; WorkQueue q;
2086 if ((ws = workQueues) != null && (m = ws.length - 1) >= 0) {
2087 for (int j = (m + 1) << 2; j >= 0; --j) {
2088 if ((q = ws[(((r + j) << 1) | 1) & m]) != null &&
2089 q.base - q.top < 0)
2090 return q;
2091 }
2092 }
2093 if (plock == ps)
2094 return null;
2095 }
2096 }
2097
2098 /**
2099 * Runs tasks until {@code isQuiescent()}. We piggyback on
2100 * active count ctl maintenance, but rather than blocking
2101 * when tasks cannot be found, we rescan until all others cannot
2102 * find tasks either.
2103 */
2104 final void helpQuiescePool(WorkQueue w) {
// Alternates between draining w's own tasks and stealing from other
// queues; `active` tracks whether this caller is currently included in
// the ctl active count.
2105 for (boolean active = true;;) {
2106 long c; WorkQueue q; ForkJoinTask<?> t; int b;
// Run local tasks first, signalling when more remain in w.
2107 while ((t = w.nextLocalTask()) != null) {
2108 if (w.base - w.top < 0)
2109 signalWork(w);
2110 t.doExec();
2111 }
2112 if ((q = findNonEmptyStealQueue(w.nextSeed())) != null) {
2113 if (!active) { // re-establish active count
2114 active = true;
2115 do {} while (!U.compareAndSwapLong
2116 (this, CTL, c = ctl, c + AC_UNIT));
2117 }
// Take the task at q's base; signal if q still has tasks, then run it.
2118 if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) {
2119 if (q.base - q.top < 0)
2120 signalWork(q);
2121 w.runSubtask(t);
2122 }
2123 }
2124 else if (active) { // decrement active count without queuing
2125 long nc = (c = ctl) - AC_UNIT;
// If removing this caller would bring the active field to
// -parallelism, return without touching ctl.
2126 if ((int)(nc >> AC_SHIFT) + (config & SMASK) == 0)
2127 return; // bypass decrement-then-increment
2128 if (U.compareAndSwapLong(this, CTL, c, nc))
2129 active = false;
2130 }
// Already uncounted: once the active field reads -parallelism, add this
// caller back to the count and return.
2131 else if ((int)((c = ctl) >> AC_SHIFT) + (config & SMASK) == 0 &&
2132 U.compareAndSwapLong(this, CTL, c, c + AC_UNIT))
2133 return;
2134 }
2135 }
2136
2137 /**
2138 * Gets and removes a local or stolen task for the given worker.
2139 *
2140 * @return a task, if available
2141 */
2142 final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
2143 for (ForkJoinTask<?> t;;) {
2144 WorkQueue q; int b;
2145 if ((t = w.nextLocalTask()) != null)
2146 return t;
2147 if ((q = findNonEmptyStealQueue(w.nextSeed())) == null)
2148 return null;
2149 if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) {
2150 if (q.base - q.top < 0)
2151 signalWork(q);
2152 return t;
2153 }
2154 }
2155 }
2156
2157 /**
2158 * Returns a cheap heuristic guide for task partitioning when
2159 * programmers, frameworks, tools, or languages have little or no
2160 * idea about task granularity. In essence by offering this
2161 * method, we ask users only about tradeoffs in overhead vs
2162 * expected throughput and its variance, rather than how finely to
2163 * partition tasks.
2164 *
2165 * In a steady state strict (tree-structured) computation, each
2166 * thread makes available for stealing enough tasks for other
2167 * threads to remain active. Inductively, if all threads play by
2168 * the same rules, each thread should make available only a
2169 * constant number of tasks.
2170 *
2171 * The minimum useful constant is just 1. But using a value of 1
2172 * would require immediate replenishment upon each steal to
2173 * maintain enough tasks, which is infeasible. Further,
2174 * partitionings/granularities of offered tasks should minimize
2175 * steal rates, which in general means that threads nearer the top
2176 * of computation tree should generate more than those nearer the
2177 * bottom. In perfect steady state, each thread is at
2178 * approximately the same level of computation tree. However,
2179 * producing extra tasks amortizes the uncertainty of progress and
2180 * diffusion assumptions.
2181 *
2182 * So, users will want to use values larger (but not much larger)
2183 * than 1 to both smooth over transient shortages and hedge
2184 * against uneven progress; as traded off against the cost of
2185 * extra task overhead. We leave the user to pick a threshold
2186 * value to compare with the results of this call to guide
2187 * decisions, but recommend values such as 3.
2188 *
2189 * When all threads are active, it is on average OK to estimate
2190 * surplus strictly locally. In steady-state, if one thread is
2191 * maintaining say 2 surplus tasks, then so are others. So we can
2192 * just use estimated queue length. However, this strategy alone
2193 * leads to serious mis-estimates in some non-steady-state
2194 * conditions (ramp-up, ramp-down, other stalls). We can detect
2195 * many of these by further considering the number of "idle"
2196 * threads, that are known to have zero queued tasks, so
2197 * compensate by a factor of (#idle/#active) threads.
2198 *
2199 * Note: The approximation of #busy workers as #active workers is
2200 * not very good under current signalling scheme, and should be
2201 * improved.
2202 */
2215 return 0;
2216 }
2217
2218 // Termination
2219
2220 /**
2221 * Possibly initiates and/or completes termination. The caller
2222 * triggering termination runs three passes through workQueues:
2223 * (0) Setting termination status, followed by wakeups of queued
2224 * workers; (1) cancelling all tasks; (2) interrupting lagging
2225 * threads (likely in external tasks, but possibly also blocked in
2226 * joins). Each pass repeats previous steps because of potential
2227 * lagging thread creation.
2228 *
2229 * @param now if true, unconditionally terminate, else only
2230 * if no work and no active workers
2231 * @param enable if true, enable shutdown when next possible
2232 * @return true if now terminating or terminated
2233 */
2234 private boolean tryTerminate(boolean now, boolean enable) {
// Steps: refuse for the common pool; set SHUTDOWN in plock if requested;
// then either observe that STOP_BIT is already set, or (when allowed) set
// it and run three passes over the queues.
2235 int ps;
2236 if (this == common) // cannot shut down
2237 return false;
2238 if ((ps = plock) >= 0) { // enable by setting plock
2239 if (!enable)
2240 return false;
2241 if ((ps & PL_LOCK) != 0 ||
2242 !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
2243 ps = acquirePlock();
// Release value with the SHUTDOWN bit forced on.
2244 int nps = ((ps + PL_LOCK) & ~SHUTDOWN) | SHUTDOWN;
2245 if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
2246 releasePlock(nps);
2247 }
2248 for (long c;;) {
2249 if (((c = ctl) & STOP_BIT) != 0) { // already terminating
// The 16-bit field at TC_SHIFT equals -parallelism: notify threads
// waiting on this pool's monitor.
2250 if ((short)(c >>> TC_SHIFT) == -(config & SMASK)) {
2251 synchronized (this) {
2252 notifyAll(); // signal when 0 workers
2253 }
2254 }
2255 return true;
2256 }
2257 if (!now) { // check if idle & no tasks
2258 WorkQueue[] ws; WorkQueue w;
// The active field must read -parallelism, otherwise do not terminate yet.
2259 if ((int)(c >> AC_SHIFT) != -(config & SMASK))
2260 return false;
2261 if ((ws = workQueues) != null) {
2262 for (int i = 0; i < ws.length; ++i) {
2263 if ((w = ws[i]) != null) {
2264 if (!w.isEmpty()) { // signal unprocessed tasks
2265 signalWork(w);
2266 return false;
2267 }
// Odd slots are the ones registerWorker assigns to worker queues.
2268 if ((i & 1) != 0 && w.eventCount >= 0)
2269 return false; // unqueued inactive worker
2270 }
2271 }
2272 }
2273 }
// Set STOP_BIT; on CAS failure the outer loop re-reads ctl and retries.
2274 if (U.compareAndSwapLong(this, CTL, c, c | STOP_BIT)) {
2275 for (int pass = 0; pass < 3; ++pass) {
2276 WorkQueue[] ws; WorkQueue w; Thread wt;
2277 if ((ws = workQueues) != null) {
2278 int n = ws.length;
2279 for (int i = 0; i < n; ++i) {
2280 if ((w = ws[i]) != null) {
// Every pass marks the queue; passes 1 and 2 also cancel its tasks;
// pass 2 additionally interrupts and unparks the owner thread.
2281 w.qlock = -1;
2282 if (pass > 0) {
2283 w.cancelAll();
2284 if (pass > 1 && (wt = w.owner) != null) {
2285 if (!wt.isInterrupted()) {
2286 try {
2287 wt.interrupt();
2288 } catch (Throwable ignore) {
2289 }
2290 }
2291 U.unpark(wt);
2292 }
2293 }
2294 }
2295 }
2296 // Wake up workers parked on event queue
2297 int i, e; long cc; Thread p;
// Repeatedly take the queue indexed by the low ctl word: CAS ctl to
// link in its nextWait and add an AC_UNIT (TC and STOP bits kept),
// then reactivate and unpark it.
2298 while ((e = (int)(cc = ctl) & E_MASK) != 0 &&
2299 (i = e & SMASK) < n && i >= 0 &&
2300 (w = ws[i]) != null) {
2301 long nc = ((long)(w.nextWait & E_MASK) |
2302 ((cc + AC_UNIT) & AC_MASK) |
2303 (cc & (TC_MASK|STOP_BIT)));
2304 if (w.eventCount == (e | INT_SIGN) &&
2305 U.compareAndSwapLong(this, CTL, cc, nc)) {
2306 w.eventCount = (e + E_SEQ) & E_MASK;
2307 w.qlock = -1;
2308 if ((p = w.parker) != null)
2309 U.unpark(p);
2310 }
2311 }
2312 }
2313 }
2314 }
2315 }
2316 }
2317
2318 // external operations on common pool
2319
2320 /**
2321 * Returns common pool queue for a thread that has submitted at
2322 * least one task.
2323 */
2324 static WorkQueue commonSubmitterQueue() {
2325 ForkJoinPool p; WorkQueue[] ws; int m, z;
2326 return ((z = ThreadLocalRandom.getProbe()) != 0 &&
2327 (p = common) != null &&
2328 (ws = p.workQueues) != null &&
2329 (m = ws.length - 1) >= 0) ?
2330 ws[m & z & SQMASK] : null;
2331 }
2332
2333 /**
2334 * Tries to pop the given task from submitter's queue in common pool.
2335 */
2336 static boolean tryExternalUnpush(ForkJoinTask<?> t) {
// Removes t from the top of the caller's common-pool submission queue.
// Returns true only if t was the top element and was removed while the
// queue's qlock was held.
2337 ForkJoinPool p; WorkQueue[] ws; WorkQueue q;
2338 ForkJoinTask<?>[] a; int m, s, z;
// Locate the caller's queue via its probe; bail out if any piece is
// missing or the queue is empty (top == base).
2339 if (t != null &&
2340 (z = ThreadLocalRandom.getProbe()) != 0 &&
2341 (p = common) != null &&
2342 (ws = p.workQueues) != null &&
2343 (m = ws.length - 1) >= 0 &&
2344 (q = ws[m & z & SQMASK]) != null &&
2345 (s = q.top) != q.base &&
2346 (a = q.array) != null) {
// Raw offset of the slot just below top.
2347 long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
// Unlocked pre-check that the top element is t, then take qlock.
2348 if (U.getObject(a, j) == t &&
2349 U.compareAndSwapInt(q, QLOCK, 0, 1)) {
2350 if (q.array == a && q.top == s && // recheck
2351 U.compareAndSwapObject(a, j, t, null)) {
2352 q.top = s - 1;
2353 q.qlock = 0;
2354 return true;
2355 }
// Recheck or slot CAS failed: unlock and report failure.
2356 q.qlock = 0;
2357 }
2358 }
2359 return false;
2360 }
2361
2362 /**
2363 * Tries to pop and run local tasks within the same computation
2364 * as the given root. On failure, tries to help complete from
2376 (o instanceof CountedCompleter)) {
2377 CountedCompleter<?> t = (CountedCompleter<?>)o, r = t;
2378 do {
2379 if (r == root) {
2380 if (U.compareAndSwapInt(q, QLOCK, 0, 1)) {
2381 if (q.array == a && q.top == s &&
2382 U.compareAndSwapObject(a, j, t, null)) {
2383 q.top = s - 1;
2384 task = t;
2385 }
2386 q.qlock = 0;
2387 }
2388 break;
2389 }
2390 } while ((r = r.completer) != null);
2391 }
2392 }
2393 if (task != null)
2394 task.doExec();
2395 if (root.status < 0 ||
2396 (config != 0 &&
2397 ((u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0)))
2398 break;
2399 if (task == null) {
2400 helpSignal(root, q.poolIndex);
2401 if (root.status >= 0)
2402 helpComplete(root, SHARED_QUEUE);
2403 break;
2404 }
2405 }
2406 }
2407 }
2408
2409 /**
2410 * Tries to help execute or signal availability of the given task
2411 * from submitter's queue in common pool.
2412 */
2413 static void externalHelpJoin(ForkJoinTask<?> t) {
// If t sits at the top of the caller's common-pool submission queue, pops
// and runs it in the calling thread; if t is still not done afterwards,
// helps via externalHelpComplete (CountedCompleter) or helpSignal.
2414 // Some hard-to-avoid overlap with tryExternalUnpush
2415 ForkJoinPool p; WorkQueue[] ws; WorkQueue q, w;
2416 ForkJoinTask<?>[] a; int m, s, n, z;
2417 if (t != null &&
2418 (z = ThreadLocalRandom.getProbe()) != 0 &&
2419 (p = common) != null &&
2420 (ws = p.workQueues) != null &&
2421 (m = ws.length - 1) >= 0 &&
2422 (q = ws[m & z & SQMASK]) != null &&
2423 (a = q.array) != null) {
2424 int am = a.length - 1;
2425 if ((s = q.top) != q.base) {
// Raw offset of the slot just below top.
2426 long j = ((am & (s - 1)) << ASHIFT) + ABASE;
// Unlocked pre-check that the top element is t, then take qlock.
2427 if (U.getObject(a, j) == t &&
2428 U.compareAndSwapInt(q, QLOCK, 0, 1)) {
2429 if (q.array == a && q.top == s &&
2430 U.compareAndSwapObject(a, j, t, null)) {
2431 q.top = s - 1;
// Unlock before running the task.
2432 q.qlock = 0;
2433 t.doExec();
2434 }
2435 else
2436 q.qlock = 0;
2437 }
2438 }
// Non-negative status: t has not completed yet.
2439 if (t.status >= 0) {
2440 if (t instanceof CountedCompleter)
2441 p.externalHelpComplete(q, t);
2442 else
2443 p.helpSignal(t, q.poolIndex);
2444 }
2445 }
2446 }
2447
2448 // Exported methods
2449
2450 // Constructors
2451
/**
 * Creates a {@code ForkJoinPool} whose parallelism is the number
 * reported by {@link java.lang.Runtime#availableProcessors}
 * (limited to the implementation cap), with the {@linkplain
 * #defaultForkJoinWorkerThreadFactory default thread factory},
 * no UncaughtExceptionHandler, and non-async LIFO processing mode.
 *
 * @throws SecurityException if a security manager exists and
 *         the caller is not permitted to modify threads
 *         because it does not hold {@link
 *         java.lang.RuntimePermission}{@code ("modifyThread")}
 */
public ForkJoinPool() {
    this(Math.min(Runtime.getRuntime().availableProcessors(), MAX_CAP),
         defaultForkJoinWorkerThreadFactory,
         null,       // no uncaught exception handler
         false);     // not async mode
}
2467
/**
 * Creates a {@code ForkJoinPool} with the given parallelism level,
 * the {@linkplain #defaultForkJoinWorkerThreadFactory default
 * thread factory}, no UncaughtExceptionHandler, and non-async
 * LIFO processing mode.
 *
 * @param parallelism the parallelism level
 * @throws IllegalArgumentException if parallelism less than or
 *         equal to zero, or greater than implementation limit
 * @throws SecurityException if a security manager exists and
 *         the caller is not permitted to modify threads
 *         because it does not hold {@link
 *         java.lang.RuntimePermission}{@code ("modifyThread")}
 */
public ForkJoinPool(int parallelism) {
    this(parallelism,
         defaultForkJoinWorkerThreadFactory,
         null,       // no uncaught exception handler
         false);     // not async mode
}
2492 * use {@link #defaultForkJoinWorkerThreadFactory}.
2493 * @param handler the handler for internal worker threads that
2494 * terminate due to unrecoverable errors encountered while executing
2495 * tasks. For default value, use {@code null}.
2496 * @param asyncMode if true,
2497 * establishes local first-in-first-out scheduling mode for forked
2498 * tasks that are never joined. This mode may be more appropriate
2499 * than default locally stack-based mode in applications in which
2500 * worker threads only process event-style asynchronous tasks.
2501 * For default value, use {@code false}.
2502 * @throws IllegalArgumentException if parallelism less than or
2503 * equal to zero, or greater than implementation limit
2504 * @throws NullPointerException if the factory is null
2505 * @throws SecurityException if a security manager exists and
2506 * the caller is not permitted to modify threads
2507 * because it does not hold {@link
2508 * java.lang.RuntimePermission}{@code ("modifyThread")}
2509 */
2510 public ForkJoinPool(int parallelism,
2511 ForkJoinWorkerThreadFactory factory,
2512 UncaughtExceptionHandler handler,
2513 boolean asyncMode) {
// Validation happens inside the argument list (checkParallelism and
// checkFactory throw on bad input) because the delegating this(...) call
// has to be the first statement. The worker name prefix embeds the value
// returned by nextPoolId().
2514 this(checkParallelism(parallelism),
2515 checkFactory(factory),
2516 handler,
2517 asyncMode,
2518 "ForkJoinPool-" + nextPoolId() + "-worker-");
// The permission check runs after the private constructor has returned.
2519 checkPermission();
2520 }
2521
2522 private static int checkParallelism(int parallelism) {
2523 if (parallelism <= 0 || parallelism > MAX_CAP)
2524 throw new IllegalArgumentException();
2525 return parallelism;
2526 }
2527
2528 private static ForkJoinWorkerThreadFactory checkFactory
2529 (ForkJoinWorkerThreadFactory factory) {
2530 if (factory == null)
2531 throw new NullPointerException();
2532 return factory;
2533 }
2534
2535 /**
2536 * Creates a {@code ForkJoinPool} with the given parameters, without
2537 * any security checks or parameter validation. Invoked directly by
2538 * makeCommonPool.
2539 */
2540 private ForkJoinPool(int parallelism,
2541 ForkJoinWorkerThreadFactory factory,
2542 UncaughtExceptionHandler handler,
2543 boolean asyncMode,
2544 String workerNamePrefix) {
2545 this.workerNamePrefix = workerNamePrefix;
2546 this.factory = factory;
2547 this.ueh = handler;
2548 this.config = parallelism | (asyncMode ? (FIFO_QUEUE << 16) : 0);
2549 long np = (long)(-parallelism); // offset ctl counts
2550 this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
2551 }
2552
2553 /**
2554 * Returns the common pool instance. This pool is statically
2555 * constructed; its run state is unaffected by attempts to {@link
2556 * #shutdown} or {@link #shutdownNow}. However this pool and any
2557 * ongoing processing are automatically terminated upon program
2558 * {@link System#exit}. Any program that relies on asynchronous
2559 * task processing to complete before program termination should
2560 * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
2561 * before exit.
2562 *
2563 * @return the common pool instance
2564 * @since 1.8
2565 */
2566 public static ForkJoinPool commonPool() {
2567 // assert common != null : "static init error";
2568 return common;
2569 }
2570
2571 // Execution methods
2572
2573 /**
2574 * Performs the given task, returning its result upon completion.
2575 * If the computation encounters an unchecked Exception or Error,
2576 * it is rethrown as the outcome of this invocation. Rethrown
2577 * exceptions behave in the same way as regular exceptions, but,
2578 * when possible, contain stack traces (as displayed for example
2579 * using {@code ex.printStackTrace()}) of both the current thread
2580 * as well as the thread actually encountering the exception;
2581 * minimally only the latter.
2582 *
2583 * @param task the task
2584 * @return the task's result
2585 * @throws NullPointerException if the task is null
2586 * @throws RejectedExecutionException if the task cannot be
2587 * scheduled for execution
2588 */
2604 public void execute(ForkJoinTask<?> task) {
2605 if (task == null)
2606 throw new NullPointerException();
2607 externalPush(task);
2608 }
2609
2610 // AbstractExecutorService methods
2611
2612 /**
2613 * @throws NullPointerException if the task is null
2614 * @throws RejectedExecutionException if the task cannot be
2615 * scheduled for execution
2616 */
2617 public void execute(Runnable task) {
2618 if (task == null)
2619 throw new NullPointerException();
2620 ForkJoinTask<?> job;
2621 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
2622 job = (ForkJoinTask<?>) task;
2623 else
2624 job = new ForkJoinTask.RunnableExecuteAction(task);
2625 externalPush(job);
2626 }
2627
2628 /**
2629 * Submits a ForkJoinTask for execution.
2630 *
2631 * @param task the task to submit
2632 * @return the task
2633 * @throws NullPointerException if the task is null
2634 * @throws RejectedExecutionException if the task cannot be
2635 * scheduled for execution
2636 */
2637 public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
2638 if (task == null)
2639 throw new NullPointerException();
2640 externalPush(task);
2641 return task;
2642 }
2643
2644 /**
2671 public ForkJoinTask<?> submit(Runnable task) {
2672 if (task == null)
2673 throw new NullPointerException();
2674 ForkJoinTask<?> job;
2675 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
2676 job = (ForkJoinTask<?>) task;
2677 else
2678 job = new ForkJoinTask.AdaptedRunnableAction(task);
2679 externalPush(job);
2680 return job;
2681 }
2682
2683 /**
2684 * @throws NullPointerException {@inheritDoc}
2685 * @throws RejectedExecutionException {@inheritDoc}
2686 */
2687 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
2688 // In previous versions of this class, this method constructed
2689 // a task to run ForkJoinTask.invokeAll, but now external
2690 // invocation of multiple tasks is at least as efficient.
2691 ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
2692
2693 boolean done = false;
2694 try {
2695 for (Callable<T> t : tasks) {
2696 ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t);
2697 futures.add(f);
2698 externalPush(f);
2699 }
2700 for (int i = 0, size = futures.size(); i < size; i++)
2701 ((ForkJoinTask<?>)futures.get(i)).quietlyJoin();
2702 done = true;
2703 return futures;
2704 } finally {
2705 if (!done)
2706 for (int i = 0, size = futures.size(); i < size; i++)
2707 futures.get(i).cancel(false);
2708 }
2709 }
2710
2711 /**
2712 * Returns the factory used for constructing new workers.
2713 *
2714 * @return the factory used for constructing new workers
2715 */
2716 public ForkJoinWorkerThreadFactory getFactory() {
2717 return factory;
2718 }
2719
2720 /**
2721 * Returns the handler for internal worker threads that terminate
2722 * due to unrecoverable errors encountered while executing tasks.
2723 *
2724 * @return the handler, or {@code null} if none
2725 */
2726 public UncaughtExceptionHandler getUncaughtExceptionHandler() {
2727 return ueh;
2728 }
2729
2730 /**
2731 * Returns the targeted parallelism level of this pool.
2732 *
2733 * @return the targeted parallelism level of this pool
2734 */
2735 public int getParallelism() {
2736 int par = (config & SMASK);
2737 return (par > 0) ? par : 1;
2738 }
2739
2740 /**
2741 * Returns the targeted parallelism level of the common pool.
2742 *
2743 * @return the targeted parallelism level of the common pool
2744 * @since 1.8
2745 */
2746 public static int getCommonPoolParallelism() {
2747 return commonParallelism;
2748 }
2749
2750 /**
2751 * Returns the number of worker threads that have started but not
2752 * yet terminated. The result returned by this method may differ
2753 * from {@link #getParallelism} when threads are created to
2754 * maintain parallelism when others are cooperatively blocked.
2755 *
2756 * @return the number of worker threads
2757 */
2758 public int getPoolSize() {
2759 return (config & SMASK) + (short)(ctl >>> TC_SHIFT);
2760 }
2761
2762 /**
2763 * Returns {@code true} if this pool uses local first-in-first-out
2764 * scheduling mode for forked tasks that are never joined.
2765 *
2766 * @return {@code true} if this pool uses async mode
2767 */
2985 if ((c & STOP_BIT) != 0)
2986 level = (tc == 0) ? "Terminated" : "Terminating";
2987 else
2988 level = plock < 0 ? "Shutting down" : "Running";
2989 return super.toString() +
2990 "[" + level +
2991 ", parallelism = " + pc +
2992 ", size = " + tc +
2993 ", active = " + ac +
2994 ", running = " + rc +
2995 ", steals = " + st +
2996 ", tasks = " + qt +
2997 ", submissions = " + qs +
2998 "]";
2999 }
3000
/**
 * Possibly initiates an orderly shutdown in which previously
 * submitted tasks are executed, but no new tasks will be
 * accepted. Invocation has no effect on execution state if this
 * is the {@link #commonPool()}, and no additional effect if
 * already shut down. Tasks that are in the process of being
 * submitted concurrently during the course of this method may or
 * may not be rejected.
 *
 * <p>The permission check is performed before any attempt to
 * change pool state.
 *
 * @throws SecurityException if a security manager exists and
 *         the caller is not permitted to modify threads
 *         because it does not hold {@link
 *         java.lang.RuntimePermission}{@code ("modifyThread")}
 */
public void shutdown() {
    // Must come first: a caller without permission leaves the pool untouched.
    checkPermission();
    // Differs from shutdownNow() only in the first argument (false here).
    tryTerminate(false, true);
}
3019
3020 /**
3021 * Possibly attempts to cancel and/or stop all tasks, and reject
3022 * all subsequently submitted tasks. Invocation has no effect on
3023 * execution state if this is the {@link #commonPool()}, and no
3024 * additional effect if already shut down. Otherwise, tasks that
3025 * are in the process of being submitted or executed concurrently
3026 * during the course of this method may or may not be
3027 * rejected. This method cancels both existing and unexecuted
3028 * tasks, in order to permit termination in the presence of task
3029 * dependencies. So the method always returns an empty list
3030 * (unlike the case for some other Executors).
3031 *
3032 * @return an empty list
3033 * @throws SecurityException if a security manager exists and
3034 * the caller is not permitted to modify threads
3035 * because it does not hold {@link
3036 * java.lang.RuntimePermission}{@code ("modifyThread")}
3037 */
3038 public List<Runnable> shutdownNow() {
3039 checkPermission();
3040 tryTerminate(true, true);
3041 return Collections.emptyList();
3042 }
3043
3066 * @return {@code true} if terminating but not yet terminated
3067 */
3068 public boolean isTerminating() {
3069 long c = ctl;
3070 return ((c & STOP_BIT) != 0L &&
3071 (short)(c >>> TC_SHIFT) != -(config & SMASK));
3072 }
3073
3074 /**
3075 * Returns {@code true} if this pool has been shut down.
3076 *
3077 * @return {@code true} if this pool has been shut down
3078 */
3079 public boolean isShutdown() {
3080 return plock < 0;
3081 }
3082
3083 /**
3084 * Blocks until all tasks have completed execution after a
3085 * shutdown request, or the timeout occurs, or the current thread
3086 * is interrupted, whichever happens first. Because the {@link
3087 * #commonPool()} never terminates until program shutdown, when
3088 * applied to the common pool, this method is equivalent to {@link
3089 * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}.
3090 *
3091 * @param timeout the maximum time to wait
3092 * @param unit the time unit of the timeout argument
3093 * @return {@code true} if this executor terminated and
3094 * {@code false} if the timeout elapsed before termination
3095 * @throws InterruptedException if interrupted while waiting
3096 */
3097 public boolean awaitTermination(long timeout, TimeUnit unit)
3098 throws InterruptedException {
3099 if (Thread.interrupted())
3100 throw new InterruptedException();
3101 if (this == common) {
3102 awaitQuiescence(timeout, unit);
3103 return false;
3104 }
3105 long nanos = unit.toNanos(timeout);
3106 if (isTerminated())
3107 return true;
3108 long startTime = System.nanoTime();
3109 boolean terminated = false;
3110 synchronized (this) {
3111 for (long waitTime = nanos, millis = 0L;;) {
3112 if (terminated = isTerminated() ||
3113 waitTime <= 0L ||
3114 (millis = unit.toMillis(waitTime)) <= 0L)
3115 break;
3116 wait(millis);
3117 waitTime = nanos - (System.nanoTime() - startTime);
3118 }
3119 }
3120 return terminated;
3121 }
3122
/**
 * If called by a ForkJoinTask operating in this pool, equivalent
 * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise,
 * waits and/or attempts to assist performing tasks until this
 * pool {@link #isQuiescent} or the indicated timeout elapses.
 *
 * <p>When the caller is not one of this pool's workers, it never
 * blocks: it repeatedly scans the pool's work queues, runs a task
 * taken from the first queue that looks non-empty, and yields
 * between scans that found nothing. The timeout is only examined
 * after such an unproductive scan.
 *
 * @param timeout the maximum time to wait
 * @param unit the time unit of the timeout argument
 * @return {@code true} if quiescent; {@code false} if the
 * timeout elapsed.
 */
public boolean awaitQuiescence(long timeout, TimeUnit unit) {
    long nanos = unit.toNanos(timeout);
    ForkJoinWorkerThread wt;
    Thread thread = Thread.currentThread();
    // A worker belonging to this pool delegates to helpQuiescePool on its
    // own queue and then reports success; the timeout is not consulted.
    if ((thread instanceof ForkJoinWorkerThread) &&
        (wt = (ForkJoinWorkerThread)thread).pool == this) {
        helpQuiescePool(wt.workQueue);
        return true;
    }
    long startTime = System.nanoTime();
    WorkQueue[] ws;
    // r: scan position, carried over between sweeps; m: ws.length - 1, used
    // as an index mask (presumably the array length is a power of two).
    int r = 0, m;
    // Starts true so that the first sweep is never preceded by a timeout check.
    boolean found = true;
    // Also exits (returning true) if there is no queue array or it is empty.
    while (!isQuiescent() && (ws = workQueues) != null &&
           (m = ws.length - 1) >= 0) {
        if (!found) {
            // The previous sweep saw no queue with pending work.
            if ((System.nanoTime() - startTime) > nanos)
                return false;
            Thread.yield(); // cannot block
        }
        found = false;
        // Probe up to 4 * length + 1 slots, stopping at the first hit.
        for (int j = (m + 1) << 2; j >= 0; --j) {
            ForkJoinTask<?> t; WorkQueue q; int b;
            // base - top < 0: the queue looks non-empty at this instant.
            if ((q = ws[r++ & m]) != null && (b = q.base) - q.top < 0) {
                found = true;
                if ((t = q.pollAt(b)) != null) {
                    // More work remains in q: signal before running t here.
                    if (q.base - q.top < 0)
                        signalWork(q);
                    t.doExec();
                }
                break;
            }
        }
    }
    return true;
}
3170
3171 /**
3172 * Waits and/or attempts to assist performing tasks indefinitely
3173 * until the {@link #commonPool()} {@link #isQuiescent}.
3174 */
3175 static void quiesceCommonPool() {
3176 common.awaitQuiescence(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
3177 }
3178
3179 /**
3180 * Interface for extending managed parallelism for tasks running
3181 * in {@link ForkJoinPool}s.
3182 *
3183 * <p>A {@code ManagedBlocker} provides two methods. Method
3184 * {@code isReleasable} must return {@code true} if blocking is
3185 * not necessary. Method {@code block} blocks the current thread
3186 * if necessary (perhaps internally invoking {@code isReleasable}
3187 * before actually blocking). These actions are performed by any
3188 * thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}.
3189 * The unusual methods in this API accommodate synchronizers that
3190 * may, but don't usually, block for long periods. Similarly, they
3191 * allow more efficient internal handling of cases in which
3192 * additional workers may be, but usually are not, needed to
3193 * ensure sufficient parallelism. Toward this end,
3194 * implementations of method {@code isReleasable} must be amenable
3195 * to repeated invocation.
3196 *
3197 * <p>For example, here is a ManagedBlocker based on a
3198 * ReentrantLock:
3199 * <pre> {@code
3200 * class ManagedLocker implements ManagedBlocker {
3201 * final ReentrantLock lock;
3202 * boolean hasLock = false;
3203 * ManagedLocker(ReentrantLock lock) { this.lock = lock; }
3204 * public boolean block() {
3205 * if (!hasLock)
3206 * lock.lock();
3207 * return true;
3208 * }
3209 * public boolean isReleasable() {
3210 * return hasLock || (hasLock = lock.tryLock());
3228 * }
3229 * public E getItem() { // call after pool.managedBlock completes
3230 * return item;
3231 * }
3232 * }}</pre>
3233 */
public static interface ManagedBlocker {
    /**
     * Possibly blocks the current thread, for example waiting for
     * a lock or condition.
     *
     * @return {@code true} if no additional blocking is necessary
     * (i.e., if {@link #isReleasable} would return {@code true})
     * @throws InterruptedException if interrupted while waiting
     * (the method is not required to do so, but is allowed to)
     */
    boolean block() throws InterruptedException;

    /**
     * Returns {@code true} if blocking is unnecessary.
     * Implementations must tolerate being invoked repeatedly.
     *
     * @return {@code true} if blocking is unnecessary
     */
    boolean isReleasable();
}
3252
3253 /**
3254 * Blocks in accord with the given blocker. If the current thread
3255 * is a {@link ForkJoinWorkerThread}, this method possibly
3256 * arranges for a spare thread to be activated if necessary to
3257 * ensure sufficient parallelism while the current thread is blocked.
3258 *
3259 * <p>If the caller is not a {@link ForkJoinTask}, this method is
3260 * behaviorally equivalent to
3261 * <pre> {@code
3262 * while (!blocker.isReleasable())
3263 * if (blocker.block())
3264 * return;
3265 * }</pre>
3266 *
3267 * If the caller is a {@code ForkJoinTask}, then the pool may
3268 * first be expanded to ensure parallelism, and later adjusted.
3313 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
3314 return new ForkJoinTask.AdaptedRunnable<T>(runnable, value);
3315 }
3316
3317 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
3318 return new ForkJoinTask.AdaptedCallable<T>(callable);
3319 }
3320
// Unsafe mechanics
// The constants below are all assigned exactly once, in the static
// initializer that follows: raw field offsets for ForkJoinPool, Thread
// and WorkQueue fields, plus the layout of ForkJoinTask[] arrays.
private static final sun.misc.Unsafe U;
private static final long CTL;          // offset of ForkJoinPool.ctl
private static final long PARKBLOCKER;  // offset of Thread.parkBlocker
private static final int ABASE;         // base offset of ForkJoinTask[] elements
private static final int ASHIFT;        // log2 of the ForkJoinTask[] index scale
private static final long STEALCOUNT;   // offset of ForkJoinPool.stealCount
private static final long PLOCK;        // offset of ForkJoinPool.plock
private static final long INDEXSEED;    // offset of ForkJoinPool.indexSeed
private static final long QLOCK;        // offset of WorkQueue.qlock

static {
    // initialize field offsets for CAS etc
    try {
        U = sun.misc.Unsafe.getUnsafe();
        Class<?> k = ForkJoinPool.class;
        CTL = U.objectFieldOffset
            (k.getDeclaredField("ctl"));
        STEALCOUNT = U.objectFieldOffset
            (k.getDeclaredField("stealCount"));
        PLOCK = U.objectFieldOffset
            (k.getDeclaredField("plock"));
        INDEXSEED = U.objectFieldOffset
            (k.getDeclaredField("indexSeed"));
        Class<?> tk = Thread.class;
        PARKBLOCKER = U.objectFieldOffset
            (tk.getDeclaredField("parkBlocker"));
        Class<?> wk = WorkQueue.class;
        QLOCK = U.objectFieldOffset
            (wk.getDeclaredField("qlock"));
        Class<?> ak = ForkJoinTask[].class;
        ABASE = U.arrayBaseOffset(ak);
        int scale = U.arrayIndexScale(ak);
        // ASHIFT is used as a shift count, so the scale must be 2^n.
        if ((scale & (scale - 1)) != 0)
            throw new Error("data type scale not a power of two");
        ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
    } catch (Exception e) {
        // Any failure to resolve these makes the class unusable.
        throw new Error(e);
    }

    // Must be set before makeCommonPool() runs, which reads it as the
    // default factory for the common pool.
    defaultForkJoinWorkerThreadFactory =
        new DefaultForkJoinWorkerThreadFactory();
    modifyThreadPermission = new RuntimePermission("modifyThread");

    // The common pool is built inside doPrivileged.
    common = java.security.AccessController.doPrivileged
        (new java.security.PrivilegedAction<ForkJoinPool>() {
            public ForkJoinPool run() { return makeCommonPool(); }});
    // NOTE(review): this reads the raw config value, whereas
    // getParallelism() masks it with SMASK; the two agree only while the
    // bits outside SMASK are zero for the common pool - confirm.
    int par = common.config; // report 1 even if threads disabled
    commonParallelism = par > 0 ? par : 1;
}
3371
3372 /**
3373 * Creates and returns the common pool, respecting user settings
3374 * specified via system properties.
3375 */
3376 private static ForkJoinPool makeCommonPool() {
3377 int parallelism = -1;
3378 ForkJoinWorkerThreadFactory factory
3379 = defaultForkJoinWorkerThreadFactory;
3380 UncaughtExceptionHandler handler = null;
3381 try { // ignore exceptions in accesing/parsing properties
3382 String pp = System.getProperty
3383 ("java.util.concurrent.ForkJoinPool.common.parallelism");
3384 String fp = System.getProperty
3385 ("java.util.concurrent.ForkJoinPool.common.threadFactory");
3386 String hp = System.getProperty
3387 ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
3388 if (pp != null)
3389 parallelism = Integer.parseInt(pp);
3390 if (fp != null)
3391 factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
3392 getSystemClassLoader().loadClass(fp).newInstance());
3393 if (hp != null)
3394 handler = ((UncaughtExceptionHandler)ClassLoader.
3395 getSystemClassLoader().loadClass(hp).newInstance());
3396 } catch (Exception ignore) {
3397 }
3398
3399 if (parallelism < 0)
3400 parallelism = Runtime.getRuntime().availableProcessors();
3401 if (parallelism > MAX_CAP)
3402 parallelism = MAX_CAP;
3403 return new ForkJoinPool(parallelism, factory, handler, false,
3404 "ForkJoinPool.commonPool-worker-");
3405 }
3406
3407 }
|