src/share/classes/java/util/concurrent/ForkJoinPool.java
Print this page
*** 33,42 ****
--- 33,43 ----
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package java.util.concurrent;
+ import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
*** 102,143 ****
* use the within-computation forms listed in the table unless using
* async event-style tasks that are not usually joined, in which case
* there is little difference among choice of methods.
*
* <table BORDER CELLPADDING=3 CELLSPACING=1>
* <tr>
* <td></td>
* <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
* <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
* </tr>
* <tr>
! * <td> <b>Arrange async execution</td>
* <td> {@link #execute(ForkJoinTask)}</td>
* <td> {@link ForkJoinTask#fork}</td>
* </tr>
* <tr>
! * <td> <b>Await and obtain result</td>
* <td> {@link #invoke(ForkJoinTask)}</td>
* <td> {@link ForkJoinTask#invoke}</td>
* </tr>
* <tr>
! * <td> <b>Arrange exec and obtain Future</td>
* <td> {@link #submit(ForkJoinTask)}</td>
* <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
* </tr>
* </table>
*
* <p>The common pool is by default constructed with default
! * parameters, but these may be controlled by setting three {@link
! * System#getProperty system properties} with prefix {@code
! * java.util.concurrent.ForkJoinPool.common}: {@code parallelism} --
! * an integer greater than zero, {@code threadFactory} -- the class
! * name of a {@link ForkJoinWorkerThreadFactory}, and {@code
! * exceptionHandler} -- the class name of a {@link
! * java.lang.Thread.UncaughtExceptionHandler
! * Thread.UncaughtExceptionHandler}. Upon any error in establishing
! * these settings, default parameters are used.
*
* <p><b>Implementation notes</b>: This implementation restricts the
* maximum number of running threads to 32767. Attempts to create
* pools with greater than the maximum number result in
* {@code IllegalArgumentException}.
--- 103,148 ----
* use the within-computation forms listed in the table unless using
* async event-style tasks that are not usually joined, in which case
* there is little difference among choice of methods.
*
* <table BORDER CELLPADDING=3 CELLSPACING=1>
+ * <caption>Summary of task execution methods</caption>
* <tr>
* <td></td>
* <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
* <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
* </tr>
* <tr>
! * <td> <b>Arrange async execution</b></td>
* <td> {@link #execute(ForkJoinTask)}</td>
* <td> {@link ForkJoinTask#fork}</td>
* </tr>
* <tr>
! * <td> <b>Await and obtain result</b></td>
* <td> {@link #invoke(ForkJoinTask)}</td>
* <td> {@link ForkJoinTask#invoke}</td>
* </tr>
* <tr>
! * <td> <b>Arrange exec and obtain Future</b></td>
* <td> {@link #submit(ForkJoinTask)}</td>
* <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
* </tr>
* </table>
*
* <p>The common pool is by default constructed with default
! * parameters, but these may be controlled by setting three
! * {@linkplain System#getProperty system properties} with prefix
! * {@code "java.util.concurrent.ForkJoinPool.common."}:
! * {@code parallelism} -- a non-negative integer,
! * {@code threadFactory} -- the class name of a
! * {@link ForkJoinWorkerThreadFactory}, and
! * {@code exceptionHandler} --
! * the class name of a {@link UncaughtExceptionHandler}.
! * Upon any error in establishing these settings, default parameters
! * are used. It is possible to disable or limit the use of threads in
! * the common pool by setting the parallelism property to zero, and/or
! * using a factory that may return {@code null}.
*
* <p><b>Implementation notes</b>: This implementation restricts the
* maximum number of running threads to 32767. Attempts to create
* pools with greater than the maximum number result in
* {@code IllegalArgumentException}.
*** 223,244 ****
* WorkQueues are also used in a similar way for tasks submitted
* to the pool. We cannot mix these tasks in the same queues used
* for work-stealing (this would contaminate lifo/fifo
* processing). Instead, we randomly associate submission queues
* with submitting threads, using a form of hashing. The
! * ThreadLocal Submitter class contains a value initially used as
! * a hash code for choosing existing queues, but may be randomly
! * repositioned upon contention with other submitters. In
! * essence, submitters act like workers except that they are
! * restricted to executing local tasks that they submitted (or in
! * the case of CountedCompleters, others with the same root task).
! * However, because most shared/external queue operations are more
! * expensive than internal, and because, at steady state, external
! * submitters will compete for CPU with workers, ForkJoinTask.join
! * and related methods disable them from repeatedly helping to
! * process tasks if all workers are active. Insertion of tasks in
! * shared mode requires a lock (mainly to protect in the case of
* resizing) but we use only a simple spinlock (using bits in
* field qlock), because submitters encountering a busy queue move
* on to try or create other queues -- they block only when
* creating and registering new queues.
*
--- 228,249 ----
* WorkQueues are also used in a similar way for tasks submitted
* to the pool. We cannot mix these tasks in the same queues used
* for work-stealing (this would contaminate lifo/fifo
* processing). Instead, we randomly associate submission queues
* with submitting threads, using a form of hashing. The
! * ThreadLocalRandom probe value serves as a hash code for
! * choosing existing queues, and may be randomly repositioned upon
! * contention with other submitters. In essence, submitters act
! * like workers except that they are restricted to executing local
! * tasks that they submitted (or in the case of CountedCompleters,
! * others with the same root task). However, because most
! * shared/external queue operations are more expensive than
! * internal, and because, at steady state, external submitters
! * will compete for CPU with workers, ForkJoinTask.join and
! * related methods disable them from repeatedly helping to process
! * tasks if all workers are active. Insertion of tasks in shared
! * mode requires a lock (mainly to protect in the case of
* resizing) but we use only a simple spinlock (using bits in
* field qlock), because submitters encountering a busy queue move
* on to try or create other queues -- they block only when
* creating and registering new queues.
*
*** 467,477 ****
* methods tryCompensate and awaitJoin.
*
* Common Pool
* ===========
*
! * The static commonPool always exists after static
* initialization. Since it (or any other created pool) need
* never be used, we minimize initial construction overhead and
* footprint to the setup of about a dozen fields, with no nested
* allocation. Most bootstrapping occurs within method
* fullExternalPush during the first submission to the pool.
--- 472,482 ----
* methods tryCompensate and awaitJoin.
*
* Common Pool
* ===========
*
! * The static common Pool always exists after static
* initialization. Since it (or any other created pool) need
* never be used, we minimize initial construction overhead and
* footprint to the setup of about a dozen fields, with no nested
* allocation. Most bootstrapping occurs within method
* fullExternalPush during the first submission to the pool.
*** 546,555 ****
--- 551,561 ----
/**
* Returns a new worker thread operating in the given pool.
*
* @param pool the pool this thread works in
* @throws NullPointerException if the pool is null
+ * @return the new worker thread
*/
public ForkJoinWorkerThread newThread(ForkJoinPool pool);
}
/**
*** 562,591 ****
return new ForkJoinWorkerThread(pool);
}
}
/**
- * Per-thread records for threads that submit to pools. Currently
- * holds only pseudo-random seed / index that is used to choose
- * submission queues in method externalPush. In the future, this may
- * also incorporate a means to implement different task rejection
- * and resubmission policies.
- *
- * Seeds for submitters and workers/workQueues work in basically
- * the same way but are initialized and updated using slightly
- * different mechanics. Both are initialized using the same
- * approach as in class ThreadLocal, where successive values are
- * unlikely to collide with previous values. Seeds are then
- * randomly modified upon collisions using xorshifts, which
- * requires a non-zero seed.
- */
- static final class Submitter {
- int seed;
- Submitter(int s) { seed = s; }
- }
-
- /**
* Class for artificial tasks that are used to replace the target
* of local joins if they are removed from an interior queue slot
* in WorkQueue.tryRemoveAndExec. We don't need the proxy to
* actually do anything beyond having a unique identity.
*/
--- 568,577 ----
*** 735,745 ****
/**
* Pushes a task. Call only by owner in unshared queues. (The
* shared-queue version is embedded in method externalPush.)
*
* @param task the task. Caller must ensure non-null.
! * @throw RejectedExecutionException if array cannot be resized
*/
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinPool p;
int s = top, m, n;
if ((a = array) != null) { // ignore if queue removed
--- 721,731 ----
/**
* Pushes a task. Call only by owner in unshared queues. (The
* shared-queue version is embedded in method externalPush.)
*
* @param task the task. Caller must ensure non-null.
! * @throws RejectedExecutionException if array cannot be resized
*/
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinPool p;
int s = top, m, n;
if ((a = array) != null) { // ignore if queue removed
*** 934,944 ****
/**
* If present, removes from queue and executes the given task,
* or any other cancelled task. Returns (true) on any CAS
* or consistency check failure so caller can retry.
*
! * @return false if no progress can be made, else true;
*/
final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
boolean stat = true, removed = false, empty = true;
ForkJoinTask<?>[] a; int m, s, b, n;
if ((a = array) != null && (m = a.length - 1) >= 0 &&
--- 920,930 ----
/**
* If present, removes from queue and executes the given task,
* or any other cancelled task. Returns (true) on any CAS
* or consistency check failure so caller can retry.
*
! * @return false if no progress can be made, else true
*/
final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
boolean stat = true, removed = false, empty = true;
ForkJoinTask<?>[] a; int m, s, b, n;
if ((a = array) != null && (m = a.length - 1) >= 0 &&
*** 979,989 ****
return stat;
}
/**
* Polls for and executes the given task or any other task in
! * its CountedCompleter computation
*/
final boolean pollAndExecCC(ForkJoinTask<?> root) {
ForkJoinTask<?>[] a; int b; Object o;
outer: while ((b = base) - top < 0 && (a = array) != null) {
long j = (((a.length - 1) & b) << ASHIFT) + ABASE;
--- 965,975 ----
return stat;
}
/**
* Polls for and executes the given task or any other task in
! * its CountedCompleter computation.
*/
final boolean pollAndExecCC(ForkJoinTask<?> root) {
ForkJoinTask<?>[] a; int b; Object o;
outer: while ((b = base) - top < 0 && (a = array) != null) {
long j = (((a.length - 1) & b) << ASHIFT) + ABASE;
*** 1053,1077 ****
private static final sun.misc.Unsafe U;
private static final long QLOCK;
private static final int ABASE;
private static final int ASHIFT;
static {
- int s;
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = WorkQueue.class;
Class<?> ak = ForkJoinTask[].class;
QLOCK = U.objectFieldOffset
(k.getDeclaredField("qlock"));
ABASE = U.arrayBaseOffset(ak);
! s = U.arrayIndexScale(ak);
} catch (Exception e) {
throw new Error(e);
}
- if ((s & (s-1)) != 0)
- throw new Error("data type scale not a power of two");
- ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
}
}
// static fields (initialized in static initializer below)
--- 1039,1062 ----
private static final sun.misc.Unsafe U;
private static final long QLOCK;
private static final int ABASE;
private static final int ASHIFT;
static {
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = WorkQueue.class;
Class<?> ak = ForkJoinTask[].class;
QLOCK = U.objectFieldOffset
(k.getDeclaredField("qlock"));
ABASE = U.arrayBaseOffset(ak);
! int scale = U.arrayIndexScale(ak);
! if ((scale & (scale - 1)) != 0)
! throw new Error("data type scale not a power of two");
! ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
} catch (Exception e) {
throw new Error(e);
}
}
}
// static fields (initialized in static initializer below)
*** 1081,1099 ****
*/
public static final ForkJoinWorkerThreadFactory
defaultForkJoinWorkerThreadFactory;
/**
- * Per-thread submission bookkeeping. Shared across all pools
- * to reduce ThreadLocal pollution and because random motion
- * to avoid contention in one pool is likely to hold for others.
- * Lazily initialized on first submission (but null-checked
- * in other contexts to avoid unnecessary initialization).
- */
- static final ThreadLocal<Submitter> submitters;
-
- /**
* Permission required for callers of methods that may start or
* kill threads.
*/
private static final RuntimePermission modifyThreadPermission;
--- 1066,1075 ----
*** 1101,1125 ****
* Common (static) pool. Non-null for public use unless a static
* construction exception, but internal usages null-check on use
* to paranoically avoid potential initialization circularities
* as well as to simplify generated code.
*/
! static final ForkJoinPool commonPool;
/**
! * Common pool parallelism. Must equal commonPool.parallelism.
*/
! static final int commonPoolParallelism;
/**
* Sequence number for creating workerNamePrefix.
*/
private static int poolNumberSequence;
/**
! * Return the next sequence number. We don't expect this to
! * ever contend so use simple builtin sync.
*/
private static final synchronized int nextPoolId() {
return ++poolNumberSequence;
}
--- 1077,1104 ----
* Common (static) pool. Non-null for public use unless a static
* construction exception, but internal usages null-check on use
* to paranoically avoid potential initialization circularities
* as well as to simplify generated code.
*/
! static final ForkJoinPool common;
/**
! * Common pool parallelism. To allow simpler use and management
! * when common pool threads are disabled, we allow the underlying
! * common.config field to be zero, but in that case still report
! * parallelism as 1 to reflect resulting caller-runs mechanics.
*/
! static final int commonParallelism;
/**
* Sequence number for creating workerNamePrefix.
*/
private static int poolNumberSequence;
/**
! * Returns the next sequence number. We don't expect this to
! * ever contend, so use simple builtin sync.
*/
private static final synchronized int nextPoolId() {
return ++poolNumberSequence;
}
*** 1266,1308 ****
volatile int plock; // shutdown status and seqLock
volatile int indexSeed; // worker/submitter index seed
final int config; // mode and parallelism level
WorkQueue[] workQueues; // main registry
final ForkJoinWorkerThreadFactory factory;
! final Thread.UncaughtExceptionHandler ueh; // per-worker UEH
final String workerNamePrefix; // to create worker name string
volatile Object pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17;
volatile Object pad18, pad19, pad1a, pad1b;
! /*
* Acquires the plock lock to protect worker array and related
* updates. This method is called only if an initial CAS on plock
! * fails. This acts as a spinLock for normal cases, but falls back
* to builtin monitor to block when (rarely) needed. This would be
* a terrible idea for a highly contended lock, but works fine as
* a more conservative alternative to a pure spinlock.
*/
private int acquirePlock() {
! int spins = PL_SPINS, r = 0, ps, nps;
for (;;) {
if (((ps = plock) & PL_LOCK) == 0 &&
U.compareAndSwapInt(this, PLOCK, ps, nps = ps + PL_LOCK))
return nps;
- else if (r == 0) { // randomize spins if possible
- Thread t = Thread.currentThread(); WorkQueue w; Submitter z;
- if ((t instanceof ForkJoinWorkerThread) &&
- (w = ((ForkJoinWorkerThread)t).workQueue) != null)
- r = w.seed;
- else if ((z = submitters.get()) != null)
- r = z.seed;
- else
- r = 1;
- }
else if (spins >= 0) {
! r ^= r << 1; r ^= r >>> 3; r ^= r << 10; // xorshift
! if (r >= 0)
--spins;
}
else if (U.compareAndSwapInt(this, PLOCK, ps, ps | PL_SIGNAL)) {
synchronized (this) {
if ((plock & PL_SIGNAL) != 0) {
--- 1245,1276 ----
volatile int plock; // shutdown status and seqLock
volatile int indexSeed; // worker/submitter index seed
final int config; // mode and parallelism level
WorkQueue[] workQueues; // main registry
final ForkJoinWorkerThreadFactory factory;
! final UncaughtExceptionHandler ueh; // per-worker UEH
final String workerNamePrefix; // to create worker name string
volatile Object pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17;
volatile Object pad18, pad19, pad1a, pad1b;
! /**
* Acquires the plock lock to protect worker array and related
* updates. This method is called only if an initial CAS on plock
! * fails. This acts as a spinlock for normal cases, but falls back
* to builtin monitor to block when (rarely) needed. This would be
* a terrible idea for a highly contended lock, but works fine as
* a more conservative alternative to a pure spinlock.
*/
private int acquirePlock() {
! int spins = PL_SPINS, ps, nps;
for (;;) {
if (((ps = plock) & PL_LOCK) == 0 &&
U.compareAndSwapInt(this, PLOCK, ps, nps = ps + PL_LOCK))
return nps;
else if (spins >= 0) {
! if (ThreadLocalRandom.nextSecondarySeed() >= 0)
--spins;
}
else if (U.compareAndSwapInt(this, PLOCK, ps, ps | PL_SIGNAL)) {
synchronized (this) {
if ((plock & PL_SIGNAL) != 0) {
*** 1330,1372 ****
plock = ps;
synchronized (this) { notifyAll(); }
}
/**
- * Performs secondary initialization, called when plock is zero.
- * Creates workQueue array and sets plock to a valid value. The
- * lock body must be exception-free (so no try/finally) so we
- * optimistically allocate new array outside the lock and throw
- * away if (very rarely) not needed. (A similar tactic is used in
- * fullExternalPush.) Because the plock seq value can eventually
- * wrap around zero, this method harmlessly fails to reinitialize
- * if workQueues exists, while still advancing plock.
- *
- * Additionally tries to create the first worker.
- */
- private void initWorkers() {
- WorkQueue[] ws, nws; int ps;
- int p = config & SMASK; // find power of two table size
- int n = (p > 1) ? p - 1 : 1; // ensure at least 2 slots
- n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
- n = (n + 1) << 1;
- if ((ws = workQueues) == null || ws.length == 0)
- nws = new WorkQueue[n];
- else
- nws = null;
- if (((ps = plock) & PL_LOCK) != 0 ||
- !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
- ps = acquirePlock();
- if (((ws = workQueues) == null || ws.length == 0) && nws != null)
- workQueues = nws;
- int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
- if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
- releasePlock(nps);
- tryAddWorker();
- }
-
- /**
* Tries to create and start one worker if fewer than target
* parallelism level exist. Adjusts counts etc on failure.
*/
private void tryAddWorker() {
long c; int u;
--- 1298,1307 ----
*** 1404,1414 ****
*
* @param wt the worker thread
* @return the worker's queue
*/
final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
! Thread.UncaughtExceptionHandler handler; WorkQueue[] ws; int s, ps;
wt.setDaemon(true);
if ((handler = ueh) != null)
wt.setUncaughtExceptionHandler(handler);
do {} while (!U.compareAndSwapInt(this, INDEXSEED, s = indexSeed,
s += SEED_INCREMENT) ||
--- 1339,1349 ----
*
* @param wt the worker thread
* @return the worker's queue
*/
final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
! UncaughtExceptionHandler handler; WorkQueue[] ws; int s, ps;
wt.setDaemon(true);
if ((handler = ueh) != null)
wt.setUncaughtExceptionHandler(handler);
do {} while (!U.compareAndSwapInt(this, INDEXSEED, s = indexSeed,
s += SEED_INCREMENT) ||
*** 1448,1458 ****
* Final callback from terminating worker, as well as upon failure
* to construct or start a worker. Removes record of worker from
* array, and adjusts counts. If pool is shutting down, tries to
* complete termination.
*
! * @param wt the worker thread or null if construction failed
* @param ex the exception causing failure, or null if none
*/
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
WorkQueue w = null;
if (wt != null && (w = wt.workQueue) != null) {
--- 1383,1393 ----
* Final callback from terminating worker, as well as upon failure
* to construct or start a worker. Removes record of worker from
* array, and adjusts counts. If pool is shutting down, tries to
* complete termination.
*
! * @param wt the worker thread, or null if construction failed
* @param ex the exception causing failure, or null if none
*/
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
WorkQueue w = null;
if (wt != null && (w = wt.workQueue) != null) {
*** 1487,1497 ****
WorkQueue[] ws; WorkQueue v; Thread p; int u, i, e;
while ((u = (int)((c = ctl) >>> 32)) < 0 && (e = (int)c) >= 0) {
if (e > 0) { // activate or create replacement
if ((ws = workQueues) == null ||
(i = e & SMASK) >= ws.length ||
! (v = ws[i]) != null)
break;
long nc = (((long)(v.nextWait & E_MASK)) |
((long)(u + UAC_UNIT) << 32));
if (v.eventCount != (e | INT_SIGN))
break;
--- 1422,1432 ----
WorkQueue[] ws; WorkQueue v; Thread p; int u, i, e;
while ((u = (int)((c = ctl) >>> 32)) < 0 && (e = (int)c) >= 0) {
if (e > 0) { // activate or create replacement
if ((ws = workQueues) == null ||
(i = e & SMASK) >= ws.length ||
! (v = ws[i]) == null)
break;
long nc = (((long)(v.nextWait & E_MASK)) |
((long)(u + UAC_UNIT) << 32));
if (v.eventCount != (e | INT_SIGN))
break;
*** 1524,1537 ****
* method. All others are relayed to fullExternalPush.
*
* @param task the task. Caller must ensure non-null.
*/
final void externalPush(ForkJoinTask<?> task) {
! WorkQueue[] ws; WorkQueue q; Submitter z; int m; ForkJoinTask<?>[] a;
! if ((z = submitters.get()) != null && plock > 0 &&
(ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
! (q = ws[m & z.seed & SQMASK]) != null &&
U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock
int b = q.base, s = q.top, n, an;
if ((a = q.array) != null && (an = a.length) > (n = s + 1 - b)) {
int j = (((an - 1) & s) << ASHIFT) + ABASE;
U.putOrderedObject(a, j, task);
--- 1459,1472 ----
* method. All others are relayed to fullExternalPush.
*
* @param task the task. Caller must ensure non-null.
*/
final void externalPush(ForkJoinTask<?> task) {
! WorkQueue[] ws; WorkQueue q; int z, m; ForkJoinTask<?>[] a;
! if ((z = ThreadLocalRandom.getProbe()) != 0 && plock > 0 &&
(ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
! (q = ws[m & z & SQMASK]) != null &&
U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock
int b = q.base, s = q.top, n, an;
if ((a = q.array) != null && (an = a.length) > (n = s + 1 - b)) {
int j = (((an - 1) & s) << ASHIFT) + ABASE;
U.putOrderedObject(a, j, task);
*** 1547,1584 ****
}
/**
* Full version of externalPush. This method is called, among
* other times, upon the first submission of the first task to the
! * pool, so must perform secondary initialization (via
! * initWorkers). It also detects first submission by an external
! * thread by looking up its ThreadLocal, and creates a new shared
! * queue if the one at index if empty or contended. The plock lock
! * body must be exception-free (so no try/finally) so we
! * optimistically allocate new queues outside the lock and throw
! * them away if (very rarely) not needed.
*/
private void fullExternalPush(ForkJoinTask<?> task) {
! int r = 0; // random index seed
! for (Submitter z = submitters.get();;) {
! WorkQueue[] ws; WorkQueue q; int ps, m, k;
! if (z == null) {
! if (U.compareAndSwapInt(this, INDEXSEED, r = indexSeed,
! r += SEED_INCREMENT) && r != 0)
! submitters.set(z = new Submitter(r));
}
! else if (r == 0) { // move to a different index
! r = z.seed;
! r ^= r << 13; // same xorshift as WorkQueues
! r ^= r >>> 17;
! z.seed = r ^ (r << 5);
! }
! else if ((ps = plock) < 0)
throw new RejectedExecutionException();
else if (ps == 0 || (ws = workQueues) == null ||
! (m = ws.length - 1) < 0)
! initWorkers();
else if ((q = ws[k = r & m & SQMASK]) != null) {
if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
ForkJoinTask<?>[] a = q.array;
int s = q.top;
boolean submitted = false;
--- 1482,1533 ----
}
/**
* Full version of externalPush. This method is called, among
* other times, upon the first submission of the first task to the
! * pool, so must perform secondary initialization. It also
! * detects first submission by an external thread by looking up
! * its ThreadLocal, and creates a new shared queue if the one at
! * index if empty or contended. The plock lock body must be
! * exception-free (so no try/finally) so we optimistically
! * allocate new queues outside the lock and throw them away if
! * (very rarely) not needed.
! *
! * Secondary initialization occurs when plock is zero, to create
! * workQueue array and set plock to a valid value. This lock body
! * must also be exception-free. Because the plock seq value can
! * eventually wrap around zero, this method harmlessly fails to
! * reinitialize if workQueues exists, while still advancing plock.
*/
private void fullExternalPush(ForkJoinTask<?> task) {
! int r;
! if ((r = ThreadLocalRandom.getProbe()) == 0) {
! ThreadLocalRandom.localInit();
! r = ThreadLocalRandom.getProbe();
}
! for (;;) {
! WorkQueue[] ws; WorkQueue q; int ps, m, k;
! boolean move = false;
! if ((ps = plock) < 0)
throw new RejectedExecutionException();
else if (ps == 0 || (ws = workQueues) == null ||
! (m = ws.length - 1) < 0) { // initialize workQueues
! int p = config & SMASK; // find power of two table size
! int n = (p > 1) ? p - 1 : 1; // ensure at least 2 slots
! n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
! n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
! WorkQueue[] nws = ((ws = workQueues) == null || ws.length == 0 ?
! new WorkQueue[n] : null);
! if (((ps = plock) & PL_LOCK) != 0 ||
! !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
! ps = acquirePlock();
! if (((ws = workQueues) == null || ws.length == 0) && nws != null)
! workQueues = nws;
! int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
! if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
! releasePlock(nps);
! }
else if ((q = ws[k = r & m & SQMASK]) != null) {
if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
ForkJoinTask<?>[] a = q.array;
int s = q.top;
boolean submitted = false;
*** 1596,1606 ****
if (submitted) {
signalWork(q);
return;
}
}
! r = 0; // move on failure
}
else if (((ps = plock) & PL_LOCK) == 0) { // create new queue
q = new WorkQueue(this, null, SHARED_QUEUE, r);
if (((ps = plock) & PL_LOCK) != 0 ||
!U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
--- 1545,1555 ----
if (submitted) {
signalWork(q);
return;
}
}
! move = true; // move on failure
}
else if (((ps = plock) & PL_LOCK) == 0) { // create new queue
q = new WorkQueue(this, null, SHARED_QUEUE, r);
if (((ps = plock) & PL_LOCK) != 0 ||
!U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
*** 1610,1620 ****
int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
releasePlock(nps);
}
else
! r = 0; // try elsewhere while lock held
}
}
// Maintaining ctl counts
--- 1559,1571 ----
int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
releasePlock(nps);
}
else
! move = true; // move if busy
! if (move)
! r = ThreadLocalRandom.advanceProbe(r);
}
}
// Maintaining ctl counts
*** 1701,1711 ****
*
* * If already enqueued and none of the above apply, possibly
* park awaiting signal, else lingering to help scan and signal.
*
* * If a non-empty queue discovered or left as a hint,
! * help wake up other workers before return
*
* @param w the worker (via its WorkQueue)
* @return a task or null if none found
*/
private final ForkJoinTask<?> scan(WorkQueue w) {
--- 1652,1662 ----
*
* * If already enqueued and none of the above apply, possibly
* park awaiting signal, else lingering to help scan and signal.
*
* * If a non-empty queue discovered or left as a hint,
! * help wake up other workers before return.
*
* @param w the worker (via its WorkQueue)
* @return a task or null if none found
*/
private final ForkJoinTask<?> scan(WorkQueue w) {
*** 1756,1782 ****
if (ctl != c || !U.compareAndSwapLong(this, CTL, c, nc))
w.eventCount = ec; // unmark on CAS failure
else if ((int)(c >> AC_SHIFT) == 1 - (config & SMASK))
idleAwaitWork(w, nc, c);
}
! else if (w.eventCount < 0 && !tryTerminate(false, false) &&
! ctl == c) { // block
Thread wt = Thread.currentThread();
Thread.interrupted(); // clear status
U.putObject(wt, PARKBLOCKER, this);
w.parker = wt; // emulate LockSupport.park
if (w.eventCount < 0) // recheck
! U.park(false, 0L);
w.parker = null;
U.putObject(wt, PARKBLOCKER, null);
}
}
if ((h >= 0 || (h = w.hint) >= 0) &&
(ws = workQueues) != null && h < ws.length &&
(q = ws[h]) != null) { // signal others before retry
WorkQueue v; Thread p; int u, i, s;
! for (int n = (config & SMASK) >>> 1;;) {
int idleCount = (w.eventCount < 0) ? 0 : -1;
if (((s = idleCount - q.base + q.top) <= n &&
(n = s) <= 0) ||
(u = (int)((c = ctl) >>> 32)) >= 0 ||
(e = (int)c) <= 0 || m < (i = e & SMASK) ||
--- 1707,1732 ----
if (ctl != c || !U.compareAndSwapLong(this, CTL, c, nc))
w.eventCount = ec; // unmark on CAS failure
else if ((int)(c >> AC_SHIFT) == 1 - (config & SMASK))
idleAwaitWork(w, nc, c);
}
! else if (w.eventCount < 0 && ctl == c) {
Thread wt = Thread.currentThread();
Thread.interrupted(); // clear status
U.putObject(wt, PARKBLOCKER, this);
w.parker = wt; // emulate LockSupport.park
if (w.eventCount < 0) // recheck
! U.park(false, 0L); // block
w.parker = null;
U.putObject(wt, PARKBLOCKER, null);
}
}
if ((h >= 0 || (h = w.hint) >= 0) &&
(ws = workQueues) != null && h < ws.length &&
(q = ws[h]) != null) { // signal others before retry
WorkQueue v; Thread p; int u, i, s;
! for (int n = (config & SMASK) - 1;;) {
int idleCount = (w.eventCount < 0) ? 0 : -1;
if (((s = idleCount - q.base + q.top) <= n &&
(n = s) <= 0) ||
(u = (int)((c = ctl) >>> 32)) >= 0 ||
(e = (int)c) <= 0 || m < (i = e & SMASK) ||
*** 1812,1822 ****
* @param currentCtl the ctl value triggering possible quiescence
* @param prevCtl the ctl value to restore if thread is terminated
*/
private void idleAwaitWork(WorkQueue w, long currentCtl, long prevCtl) {
if (w != null && w.eventCount < 0 &&
! !tryTerminate(false, false) && (int)prevCtl != 0) {
int dc = -(short)(currentCtl >>> TC_SHIFT);
long parkTime = dc < 0 ? FAST_IDLE_TIMEOUT: (dc + 1) * IDLE_TIMEOUT;
long deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
Thread wt = Thread.currentThread();
while (ctl == currentCtl) {
--- 1762,1773 ----
* @param currentCtl the ctl value triggering possible quiescence
* @param prevCtl the ctl value to restore if thread is terminated
*/
private void idleAwaitWork(WorkQueue w, long currentCtl, long prevCtl) {
if (w != null && w.eventCount < 0 &&
! !tryTerminate(false, false) && (int)prevCtl != 0 &&
! ctl == currentCtl) {
int dc = -(short)(currentCtl >>> TC_SHIFT);
long parkTime = dc < 0 ? FAST_IDLE_TIMEOUT: (dc + 1) * IDLE_TIMEOUT;
long deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
Thread wt = Thread.currentThread();
while (ctl == currentCtl) {
*** 1830,1839 ****
--- 1781,1791 ----
if (ctl != currentCtl)
break;
if (deadline - System.nanoTime() <= 0L &&
U.compareAndSwapLong(this, CTL, currentCtl, prevCtl)) {
w.eventCount = (w.eventCount + E_SEQ) | E_MASK;
+ w.hint = -1;
w.qlock = -1; // shrink
break;
}
}
}
*** 1971,1981 ****
* and run tasks within the target's computation.
*
* @param task the task to join
* @param mode if shared, exit upon completing any task
* if all workers are active
- *
*/
private int helpComplete(ForkJoinTask<?> task, int mode) {
WorkQueue[] ws; WorkQueue q; int m, n, s, u;
if (task != null && (ws = workQueues) != null &&
(m = ws.length - 1) >= 0) {
--- 1923,1932 ----
*** 2123,2199 ****
}
}
/**
* Returns a (probably) non-empty steal queue, if one is found
! * during a random, then cyclic scan, else null. This method must
! * be retried by caller if, by the time it tries to use the queue,
! * it is empty.
* @param r a (random) seed for scanning
*/
private WorkQueue findNonEmptyStealQueue(int r) {
! for (WorkQueue[] ws;;) {
! int ps = plock, m, n;
! if ((ws = workQueues) == null || (m = ws.length - 1) < 1)
! return null;
! for (int j = (m + 1) << 2; ;) {
! WorkQueue q = ws[(((r + j) << 1) | 1) & m];
! if (q != null && (n = q.base - q.top) < 0) {
! if (n < -1)
! signalWork(q);
return q;
}
! else if (--j < 0) {
if (plock == ps)
return null;
- break;
}
}
- }
- }
/**
* Runs tasks until {@code isQuiescent()}. We piggyback on
* active count ctl maintenance, but rather than blocking
* when tasks cannot be found, we rescan until all others cannot
* find tasks either.
*/
final void helpQuiescePool(WorkQueue w) {
for (boolean active = true;;) {
! ForkJoinTask<?> localTask; // exhaust local queue
! while ((localTask = w.nextLocalTask()) != null)
! localTask.doExec();
! // Similar to loop in scan(), but ignoring submissions
! WorkQueue q = findNonEmptyStealQueue(w.nextSeed());
! if (q != null) {
! ForkJoinTask<?> t; int b;
if (!active) { // re-establish active count
- long c;
active = true;
do {} while (!U.compareAndSwapLong
(this, CTL, c = ctl, c + AC_UNIT));
}
! if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null)
w.runSubtask(t);
}
! else {
! long c;
! if (active) { // decrement active count without queuing
active = false;
- do {} while (!U.compareAndSwapLong
- (this, CTL, c = ctl, c -= AC_UNIT));
}
! else
! c = ctl; // re-increment on exit
! if ((int)(c >> AC_SHIFT) + (config & SMASK) == 0) {
! do {} while (!U.compareAndSwapLong
! (this, CTL, c = ctl, c + AC_UNIT));
! break;
}
}
- }
- }
/**
* Gets and removes a local or stolen task for the given worker.
*
* @return a task, if available
--- 2074,2140 ----
}
}
/**
* Returns a (probably) non-empty steal queue, if one is found
! * during a scan, else null. This method must be retried by
! * caller if, by the time it tries to use the queue, it is empty.
* @param r a (random) seed for scanning
*/
private WorkQueue findNonEmptyStealQueue(int r) {
! for (;;) {
! int ps = plock, m; WorkQueue[] ws; WorkQueue q;
! if ((ws = workQueues) != null && (m = ws.length - 1) >= 0) {
! for (int j = (m + 1) << 2; j >= 0; --j) {
! if ((q = ws[(((r + j) << 1) | 1) & m]) != null &&
! q.base - q.top < 0)
return q;
}
! }
if (plock == ps)
return null;
}
}
/**
* Runs tasks until {@code isQuiescent()}. We piggyback on
* active count ctl maintenance, but rather than blocking
* when tasks cannot be found, we rescan until all others cannot
* find tasks either.
*/
final void helpQuiescePool(WorkQueue w) {
for (boolean active = true;;) {
! long c; WorkQueue q; ForkJoinTask<?> t; int b;
! while ((t = w.nextLocalTask()) != null) {
! if (w.base - w.top < 0)
! signalWork(w);
! t.doExec();
! }
! if ((q = findNonEmptyStealQueue(w.nextSeed())) != null) {
if (!active) { // re-establish active count
active = true;
do {} while (!U.compareAndSwapLong
(this, CTL, c = ctl, c + AC_UNIT));
}
! if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) {
! if (q.base - q.top < 0)
! signalWork(q);
w.runSubtask(t);
}
! }
! else if (active) { // decrement active count without queuing
! long nc = (c = ctl) - AC_UNIT;
! if ((int)(nc >> AC_SHIFT) + (config & SMASK) == 0)
! return; // bypass decrement-then-increment
! if (U.compareAndSwapLong(this, CTL, c, nc))
active = false;
}
! else if ((int)((c = ctl) >> AC_SHIFT) + (config & SMASK) == 0 &&
! U.compareAndSwapLong(this, CTL, c, c + AC_UNIT))
! return;
}
}
/**
* Gets and removes a local or stolen task for the given worker.
*
* @return a task, if available
*** 2203,2216 ****
WorkQueue q; int b;
if ((t = w.nextLocalTask()) != null)
return t;
if ((q = findNonEmptyStealQueue(w.nextSeed())) == null)
return null;
! if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null)
return t;
}
}
/**
* Returns a cheap heuristic guide for task partitioning when
* programmers, frameworks, tools, or languages have little or no
* idea about task granularity. In essence by offering this
--- 2144,2160 ----
WorkQueue q; int b;
if ((t = w.nextLocalTask()) != null)
return t;
if ((q = findNonEmptyStealQueue(w.nextSeed())) == null)
return null;
! if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) {
! if (q.base - q.top < 0)
! signalWork(q);
return t;
}
}
+ }
/**
* Returns a cheap heuristic guide for task partitioning when
* programmers, frameworks, tools, or languages have little or no
* idea about task granularity. In essence by offering this
*** 2233,2243 ****
* bottom. In perfect steady state, each thread is at
* approximately the same level of computation tree. However,
* producing extra tasks amortizes the uncertainty of progress and
* diffusion assumptions.
*
! * So, users will want to use values larger, but not much larger
* than 1 to both smooth over transient shortages and hedge
* against uneven progress; as traded off against the cost of
* extra task overhead. We leave the user to pick a threshold
* value to compare with the results of this call to guide
* decisions, but recommend values such as 3.
--- 2177,2187 ----
* bottom. In perfect steady state, each thread is at
* approximately the same level of computation tree. However,
* producing extra tasks amortizes the uncertainty of progress and
* diffusion assumptions.
*
! * So, users will want to use values larger (but not much larger)
* than 1 to both smooth over transient shortages and hedge
* against uneven progress; as traded off against the cost of
* extra task overhead. We leave the user to pick a threshold
* value to compare with the results of this call to guide
* decisions, but recommend values such as 3.
*** 2286,2334 ****
* if no work and no active workers
* @param enable if true, enable shutdown when next possible
* @return true if now terminating or terminated
*/
private boolean tryTerminate(boolean now, boolean enable) {
! if (this == commonPool) // cannot shut down
return false;
for (long c;;) {
if (((c = ctl) & STOP_BIT) != 0) { // already terminating
if ((short)(c >>> TC_SHIFT) == -(config & SMASK)) {
synchronized (this) {
notifyAll(); // signal when 0 workers
}
}
return true;
}
- if (plock >= 0) { // not yet enabled
- int ps;
- if (!enable)
- return false;
- if (((ps = plock) & PL_LOCK) != 0 ||
- !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
- ps = acquirePlock();
- if (!U.compareAndSwapInt(this, PLOCK, ps, SHUTDOWN))
- releasePlock(SHUTDOWN);
- }
if (!now) { // check if idle & no tasks
! if ((int)(c >> AC_SHIFT) != -(config & SMASK) ||
! hasQueuedSubmissions())
return false;
! // Check for unqueued inactive workers. One pass suffices.
! WorkQueue[] ws = workQueues; WorkQueue w;
! if (ws != null) {
! for (int i = 1; i < ws.length; i += 2) {
! if ((w = ws[i]) != null && w.eventCount >= 0)
return false;
}
}
}
if (U.compareAndSwapLong(this, CTL, c, c | STOP_BIT)) {
for (int pass = 0; pass < 3; ++pass) {
! WorkQueue[] ws = workQueues;
! if (ws != null) {
! WorkQueue w; Thread wt;
int n = ws.length;
for (int i = 0; i < n; ++i) {
if ((w = ws[i]) != null) {
w.qlock = -1;
if (pass > 0) {
--- 2230,2282 ----
* if no work and no active workers
* @param enable if true, enable shutdown when next possible
* @return true if now terminating or terminated
*/
private boolean tryTerminate(boolean now, boolean enable) {
! int ps;
! if (this == common) // cannot shut down
return false;
+ if ((ps = plock) >= 0) { // enable by setting plock
+ if (!enable)
+ return false;
+ if ((ps & PL_LOCK) != 0 ||
+ !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
+ ps = acquirePlock();
+ int nps = ((ps + PL_LOCK) & ~SHUTDOWN) | SHUTDOWN;
+ if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
+ releasePlock(nps);
+ }
for (long c;;) {
if (((c = ctl) & STOP_BIT) != 0) { // already terminating
if ((short)(c >>> TC_SHIFT) == -(config & SMASK)) {
synchronized (this) {
notifyAll(); // signal when 0 workers
}
}
return true;
}
if (!now) { // check if idle & no tasks
! WorkQueue[] ws; WorkQueue w;
! if ((int)(c >> AC_SHIFT) != -(config & SMASK))
return false;
! if ((ws = workQueues) != null) {
! for (int i = 0; i < ws.length; ++i) {
! if ((w = ws[i]) != null) {
! if (!w.isEmpty()) { // signal unprocessed tasks
! signalWork(w);
return false;
}
+ if ((i & 1) != 0 && w.eventCount >= 0)
+ return false; // unqueued inactive worker
}
}
+ }
+ }
if (U.compareAndSwapLong(this, CTL, c, c | STOP_BIT)) {
for (int pass = 0; pass < 3; ++pass) {
! WorkQueue[] ws; WorkQueue w; Thread wt;
! if ((ws = workQueues) != null) {
int n = ws.length;
for (int i = 0; i < n; ++i) {
if ((w = ws[i]) != null) {
w.qlock = -1;
if (pass > 0) {
*** 2335,2345 ****
w.cancelAll();
if (pass > 1 && (wt = w.owner) != null) {
if (!wt.isInterrupted()) {
try {
wt.interrupt();
! } catch (SecurityException ignore) {
}
}
U.unpark(wt);
}
}
--- 2283,2293 ----
w.cancelAll();
if (pass > 1 && (wt = w.owner) != null) {
if (!wt.isInterrupted()) {
try {
wt.interrupt();
! } catch (Throwable ignore) {
}
}
U.unpark(wt);
}
}
*** 2346,2356 ****
}
}
// Wake up workers parked on event queue
int i, e; long cc; Thread p;
while ((e = (int)(cc = ctl) & E_MASK) != 0 &&
! (i = e & SMASK) < n &&
(w = ws[i]) != null) {
long nc = ((long)(w.nextWait & E_MASK) |
((cc + AC_UNIT) & AC_MASK) |
(cc & (TC_MASK|STOP_BIT)));
if (w.eventCount == (e | INT_SIGN) &&
--- 2294,2304 ----
}
}
// Wake up workers parked on event queue
int i, e; long cc; Thread p;
while ((e = (int)(cc = ctl) & E_MASK) != 0 &&
! (i = e & SMASK) < n && i >= 0 &&
(w = ws[i]) != null) {
long nc = ((long)(w.nextWait & E_MASK) |
((cc + AC_UNIT) & AC_MASK) |
(cc & (TC_MASK|STOP_BIT)));
if (w.eventCount == (e | INT_SIGN) &&
*** 2372,2401 ****
/**
* Returns common pool queue for a thread that has submitted at
* least one task.
*/
static WorkQueue commonSubmitterQueue() {
! ForkJoinPool p; WorkQueue[] ws; int m; Submitter z;
! return ((z = submitters.get()) != null &&
! (p = commonPool) != null &&
(ws = p.workQueues) != null &&
(m = ws.length - 1) >= 0) ?
! ws[m & z.seed & SQMASK] : null;
}
/**
* Tries to pop the given task from submitter's queue in common pool.
*/
static boolean tryExternalUnpush(ForkJoinTask<?> t) {
! ForkJoinPool p; WorkQueue[] ws; WorkQueue q; Submitter z;
! ForkJoinTask<?>[] a; int m, s;
if (t != null &&
! (z = submitters.get()) != null &&
! (p = commonPool) != null &&
(ws = p.workQueues) != null &&
(m = ws.length - 1) >= 0 &&
! (q = ws[m & z.seed & SQMASK]) != null &&
(s = q.top) != q.base &&
(a = q.array) != null) {
long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
if (U.getObject(a, j) == t &&
U.compareAndSwapInt(q, QLOCK, 0, 1)) {
--- 2320,2349 ----
/**
* Returns common pool queue for a thread that has submitted at
* least one task.
*/
static WorkQueue commonSubmitterQueue() {
! ForkJoinPool p; WorkQueue[] ws; int m, z;
! return ((z = ThreadLocalRandom.getProbe()) != 0 &&
! (p = common) != null &&
(ws = p.workQueues) != null &&
(m = ws.length - 1) >= 0) ?
! ws[m & z & SQMASK] : null;
}
/**
* Tries to pop the given task from submitter's queue in common pool.
*/
static boolean tryExternalUnpush(ForkJoinTask<?> t) {
! ForkJoinPool p; WorkQueue[] ws; WorkQueue q;
! ForkJoinTask<?>[] a; int m, s, z;
if (t != null &&
! (z = ThreadLocalRandom.getProbe()) != 0 &&
! (p = common) != null &&
(ws = p.workQueues) != null &&
(m = ws.length - 1) >= 0 &&
! (q = ws[m & z & SQMASK]) != null &&
(s = q.top) != q.base &&
(a = q.array) != null) {
long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
if (U.getObject(a, j) == t &&
U.compareAndSwapInt(q, QLOCK, 0, 1)) {
*** 2443,2453 ****
}
}
if (task != null)
task.doExec();
if (root.status < 0 ||
! (u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0)
break;
if (task == null) {
helpSignal(root, q.poolIndex);
if (root.status >= 0)
helpComplete(root, SHARED_QUEUE);
--- 2391,2402 ----
}
}
if (task != null)
task.doExec();
if (root.status < 0 ||
! (config != 0 &&
! ((u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0)))
break;
if (task == null) {
helpSignal(root, q.poolIndex);
if (root.status >= 0)
helpComplete(root, SHARED_QUEUE);
*** 2461,2478 ****
* Tries to help execute or signal availability of the given task
* from submitter's queue in common pool.
*/
static void externalHelpJoin(ForkJoinTask<?> t) {
// Some hard-to-avoid overlap with tryExternalUnpush
! ForkJoinPool p; WorkQueue[] ws; WorkQueue q, w; Submitter z;
! ForkJoinTask<?>[] a; int m, s, n;
if (t != null &&
! (z = submitters.get()) != null &&
! (p = commonPool) != null &&
(ws = p.workQueues) != null &&
(m = ws.length - 1) >= 0 &&
! (q = ws[m & z.seed & SQMASK]) != null &&
(a = q.array) != null) {
int am = a.length - 1;
if ((s = q.top) != q.base) {
long j = ((am & (s - 1)) << ASHIFT) + ABASE;
if (U.getObject(a, j) == t &&
--- 2410,2427 ----
* Tries to help execute or signal availability of the given task
* from submitter's queue in common pool.
*/
static void externalHelpJoin(ForkJoinTask<?> t) {
// Some hard-to-avoid overlap with tryExternalUnpush
! ForkJoinPool p; WorkQueue[] ws; WorkQueue q, w;
! ForkJoinTask<?>[] a; int m, s, n, z;
if (t != null &&
! (z = ThreadLocalRandom.getProbe()) != 0 &&
! (p = common) != null &&
(ws = p.workQueues) != null &&
(m = ws.length - 1) >= 0 &&
! (q = ws[m & z & SQMASK]) != null &&
(a = q.array) != null) {
int am = a.length - 1;
if ((s = q.top) != q.base) {
long j = ((am & (s - 1)) << ASHIFT) + ABASE;
if (U.getObject(a, j) == t &&
*** 2494,2515 ****
p.helpSignal(t, q.poolIndex);
}
}
}
- /**
- * Restricted version of helpQuiescePool for external callers
- */
- static void externalHelpQuiescePool() {
- ForkJoinPool p; ForkJoinTask<?> t; WorkQueue q; int b;
- if ((p = commonPool) != null &&
- (q = p.findNonEmptyStealQueue(1)) != null &&
- (b = q.base) - q.top < 0 &&
- (t = q.pollAt(b)) != null)
- t.doExec();
- }
-
// Exported methods
// Constructors
/**
--- 2443,2452 ----
*** 2522,2532 ****
* the caller is not permitted to modify threads
* because it does not hold {@link
* java.lang.RuntimePermission}{@code ("modifyThread")}
*/
public ForkJoinPool() {
! this(Runtime.getRuntime().availableProcessors(),
defaultForkJoinWorkerThreadFactory, null, false);
}
/**
* Creates a {@code ForkJoinPool} with the indicated parallelism
--- 2459,2469 ----
* the caller is not permitted to modify threads
* because it does not hold {@link
* java.lang.RuntimePermission}{@code ("modifyThread")}
*/
public ForkJoinPool() {
! this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
defaultForkJoinWorkerThreadFactory, null, false);
}
/**
* Creates a {@code ForkJoinPool} with the indicated parallelism
*** 2570,2623 ****
* because it does not hold {@link
* java.lang.RuntimePermission}{@code ("modifyThread")}
*/
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
! Thread.UncaughtExceptionHandler handler,
boolean asyncMode) {
checkPermission();
! if (factory == null)
! throw new NullPointerException();
if (parallelism <= 0 || parallelism > MAX_CAP)
throw new IllegalArgumentException();
! this.factory = factory;
! this.ueh = handler;
! this.config = parallelism | (asyncMode ? (FIFO_QUEUE << 16) : 0);
! long np = (long)(-parallelism); // offset ctl counts
! this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
! int pn = nextPoolId();
! StringBuilder sb = new StringBuilder("ForkJoinPool-");
! sb.append(Integer.toString(pn));
! sb.append("-worker-");
! this.workerNamePrefix = sb.toString();
}
/**
! * Constructor for common pool, suitable only for static initialization.
! * Basically the same as above, but uses smallest possible initial footprint.
*/
! ForkJoinPool(int parallelism, long ctl,
ForkJoinWorkerThreadFactory factory,
! Thread.UncaughtExceptionHandler handler) {
! this.config = parallelism;
! this.ctl = ctl;
this.factory = factory;
this.ueh = handler;
! this.workerNamePrefix = "ForkJoinPool.commonPool-worker-";
}
/**
* Returns the common pool instance. This pool is statically
! * constructed; its run state is unaffected by attempts to
! * {@link #shutdown} or {@link #shutdownNow}.
*
* @return the common pool instance
* @since 1.8
*/
public static ForkJoinPool commonPool() {
! // assert commonPool != null : "static init error";
! return commonPool;
}
// Execution methods
/**
--- 2507,2573 ----
* because it does not hold {@link
* java.lang.RuntimePermission}{@code ("modifyThread")}
*/
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
! UncaughtExceptionHandler handler,
boolean asyncMode) {
+ this(checkParallelism(parallelism),
+ checkFactory(factory),
+ handler,
+ asyncMode,
+ "ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
! }
!
! private static int checkParallelism(int parallelism) {
if (parallelism <= 0 || parallelism > MAX_CAP)
throw new IllegalArgumentException();
! return parallelism;
}
+ private static ForkJoinWorkerThreadFactory checkFactory
+ (ForkJoinWorkerThreadFactory factory) {
+ if (factory == null)
+ throw new NullPointerException();
+ return factory;
+ }
+
/**
! * Creates a {@code ForkJoinPool} with the given parameters, without
! * any security checks or parameter validation. Invoked directly by
! * makeCommonPool.
*/
! private ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
! UncaughtExceptionHandler handler,
! boolean asyncMode,
! String workerNamePrefix) {
! this.workerNamePrefix = workerNamePrefix;
this.factory = factory;
this.ueh = handler;
! this.config = parallelism | (asyncMode ? (FIFO_QUEUE << 16) : 0);
! long np = (long)(-parallelism); // offset ctl counts
! this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
/**
* Returns the common pool instance. This pool is statically
! * constructed; its run state is unaffected by attempts to {@link
! * #shutdown} or {@link #shutdownNow}. However this pool and any
! * ongoing processing are automatically terminated upon program
! * {@link System#exit}. Any program that relies on asynchronous
! * task processing to complete before program termination should
! * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
! * before exit.
*
* @return the common pool instance
* @since 1.8
*/
public static ForkJoinPool commonPool() {
! // assert common != null : "static init error";
! return common;
}
// Execution methods
/**
*** 2669,2679 ****
throw new NullPointerException();
ForkJoinTask<?> job;
if (task instanceof ForkJoinTask<?>) // avoid re-wrap
job = (ForkJoinTask<?>) task;
else
! job = new ForkJoinTask.AdaptedRunnableAction(task);
externalPush(job);
}
/**
* Submits a ForkJoinTask for execution.
--- 2619,2629 ----
throw new NullPointerException();
ForkJoinTask<?> job;
if (task instanceof ForkJoinTask<?>) // avoid re-wrap
job = (ForkJoinTask<?>) task;
else
! job = new ForkJoinTask.RunnableExecuteAction(task);
externalPush(job);
}
/**
* Submits a ForkJoinTask for execution.
*** 2736,2766 ****
*/
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
// In previous versions of this class, this method constructed
// a task to run ForkJoinTask.invokeAll, but now external
// invocation of multiple tasks is at least as efficient.
! List<ForkJoinTask<T>> fs = new ArrayList<ForkJoinTask<T>>(tasks.size());
! // Workaround needed because method wasn't declared with
! // wildcards in return type but should have been.
! @SuppressWarnings({"unchecked", "rawtypes"})
! List<Future<T>> futures = (List<Future<T>>) (List) fs;
boolean done = false;
try {
for (Callable<T> t : tasks) {
ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t);
externalPush(f);
- fs.add(f);
}
! for (ForkJoinTask<T> f : fs)
! f.quietlyJoin();
done = true;
return futures;
} finally {
if (!done)
! for (ForkJoinTask<T> f : fs)
! f.cancel(false);
}
}
/**
* Returns the factory used for constructing new workers.
--- 2686,2712 ----
*/
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
// In previous versions of this class, this method constructed
// a task to run ForkJoinTask.invokeAll, but now external
// invocation of multiple tasks is at least as efficient.
! ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks) {
ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t);
+ futures.add(f);
externalPush(f);
}
! for (int i = 0, size = futures.size(); i < size; i++)
! ((ForkJoinTask<?>)futures.get(i)).quietlyJoin();
done = true;
return futures;
} finally {
if (!done)
! for (int i = 0, size = futures.size(); i < size; i++)
! futures.get(i).cancel(false);
}
}
/**
* Returns the factory used for constructing new workers.
*** 2775,2805 ****
* Returns the handler for internal worker threads that terminate
* due to unrecoverable errors encountered while executing tasks.
*
* @return the handler, or {@code null} if none
*/
! public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
return ueh;
}
/**
* Returns the targeted parallelism level of this pool.
*
* @return the targeted parallelism level of this pool
*/
public int getParallelism() {
! return config & SMASK;
}
/**
* Returns the targeted parallelism level of the common pool.
*
* @return the targeted parallelism level of the common pool
* @since 1.8
*/
public static int getCommonPoolParallelism() {
! return commonPoolParallelism;
}
/**
* Returns the number of worker threads that have started but not
* yet terminated. The result returned by this method may differ
--- 2721,2752 ----
* Returns the handler for internal worker threads that terminate
* due to unrecoverable errors encountered while executing tasks.
*
* @return the handler, or {@code null} if none
*/
! public UncaughtExceptionHandler getUncaughtExceptionHandler() {
return ueh;
}
/**
* Returns the targeted parallelism level of this pool.
*
* @return the targeted parallelism level of this pool
*/
public int getParallelism() {
! int par = (config & SMASK);
! return (par > 0) ? par : 1;
}
/**
* Returns the targeted parallelism level of the common pool.
*
* @return the targeted parallelism level of the common pool
* @since 1.8
*/
public static int getCommonPoolParallelism() {
! return commonParallelism;
}
/**
* Returns the number of worker threads that have started but not
* yet terminated. The result returned by this method may differ
*** 3053,3063 ****
/**
* Possibly initiates an orderly shutdown in which previously
* submitted tasks are executed, but no new tasks will be
* accepted. Invocation has no effect on execution state if this
! * is the {@link #commonPool}, and no additional effect if
* already shut down. Tasks that are in the process of being
* submitted concurrently during the course of this method may or
* may not be rejected.
*
* @throws SecurityException if a security manager exists and
--- 3000,3010 ----
/**
* Possibly initiates an orderly shutdown in which previously
* submitted tasks are executed, but no new tasks will be
* accepted. Invocation has no effect on execution state if this
! * is the {@link #commonPool()}, and no additional effect if
* already shut down. Tasks that are in the process of being
* submitted concurrently during the course of this method may or
* may not be rejected.
*
* @throws SecurityException if a security manager exists and
*** 3071,3081 ****
}
/**
* Possibly attempts to cancel and/or stop all tasks, and reject
* all subsequently submitted tasks. Invocation has no effect on
! * execution state if this is the {@link #commonPool}, and no
* additional effect if already shut down. Otherwise, tasks that
* are in the process of being submitted or executed concurrently
* during the course of this method may or may not be
* rejected. This method cancels both existing and unexecuted
* tasks, in order to permit termination in the presence of task
--- 3018,3028 ----
}
/**
* Possibly attempts to cancel and/or stop all tasks, and reject
* all subsequently submitted tasks. Invocation has no effect on
! * execution state if this is the {@link #commonPool()}, and no
* additional effect if already shut down. Otherwise, tasks that
* are in the process of being submitted or executed concurrently
* during the course of this method may or may not be
* rejected. This method cancels both existing and unexecuted
* tasks, in order to permit termination in the presence of task
*** 3134,3155 ****
}
/**
* Blocks until all tasks have completed execution after a
* shutdown request, or the timeout occurs, or the current thread
! * is interrupted, whichever happens first. Note that the {@link
! * #commonPool()} never terminates until program shutdown so
! * this method will always time out.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return {@code true} if this executor terminated and
* {@code false} if the timeout elapsed before termination
* @throws InterruptedException if interrupted while waiting
*/
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
if (isTerminated())
return true;
long startTime = System.nanoTime();
boolean terminated = false;
--- 3081,3109 ----
}
/**
* Blocks until all tasks have completed execution after a
* shutdown request, or the timeout occurs, or the current thread
! * is interrupted, whichever happens first. Because the {@link
! * #commonPool()} never terminates until program shutdown, when
! * applied to the common pool, this method is equivalent to {@link
! * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return {@code true} if this executor terminated and
* {@code false} if the timeout elapsed before termination
* @throws InterruptedException if interrupted while waiting
*/
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
+ if (Thread.interrupted())
+ throw new InterruptedException();
+ if (this == common) {
+ awaitQuiescence(timeout, unit);
+ return false;
+ }
long nanos = unit.toNanos(timeout);
if (isTerminated())
return true;
long startTime = System.nanoTime();
boolean terminated = false;
*** 3165,3185 ****
}
return terminated;
}
/**
* Interface for extending managed parallelism for tasks running
* in {@link ForkJoinPool}s.
*
* <p>A {@code ManagedBlocker} provides two methods. Method
* {@code isReleasable} must return {@code true} if blocking is
* not necessary. Method {@code block} blocks the current thread
* if necessary (perhaps internally invoking {@code isReleasable}
* before actually blocking). These actions are performed by any
! * thread invoking {@link ForkJoinPool#managedBlock}. The
! * unusual methods in this API accommodate synchronizers that may,
! * but don't usually, block for long periods. Similarly, they
* allow more efficient internal handling of cases in which
* additional workers may be, but usually are not, needed to
* ensure sufficient parallelism. Toward this end,
* implementations of method {@code isReleasable} must be amenable
* to repeated invocation.
--- 3119,3195 ----
}
return terminated;
}
/**
+ * If called by a ForkJoinTask operating in this pool, equivalent
+ * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise,
+ * waits and/or attempts to assist performing tasks until this
+ * pool {@link #isQuiescent} or the indicated timeout elapses.
+ *
+ * @param timeout the maximum time to wait
+ * @param unit the time unit of the timeout argument
+ * @return {@code true} if quiescent; {@code false} if the
+ * timeout elapsed.
+ */
+ public boolean awaitQuiescence(long timeout, TimeUnit unit) {
+ long nanos = unit.toNanos(timeout);
+ ForkJoinWorkerThread wt;
+ Thread thread = Thread.currentThread();
+ if ((thread instanceof ForkJoinWorkerThread) &&
+ (wt = (ForkJoinWorkerThread)thread).pool == this) {
+ helpQuiescePool(wt.workQueue);
+ return true;
+ }
+ long startTime = System.nanoTime();
+ WorkQueue[] ws;
+ int r = 0, m;
+ boolean found = true;
+ while (!isQuiescent() && (ws = workQueues) != null &&
+ (m = ws.length - 1) >= 0) {
+ if (!found) {
+ if ((System.nanoTime() - startTime) > nanos)
+ return false;
+ Thread.yield(); // cannot block
+ }
+ found = false;
+ for (int j = (m + 1) << 2; j >= 0; --j) {
+ ForkJoinTask<?> t; WorkQueue q; int b;
+ if ((q = ws[r++ & m]) != null && (b = q.base) - q.top < 0) {
+ found = true;
+ if ((t = q.pollAt(b)) != null) {
+ if (q.base - q.top < 0)
+ signalWork(q);
+ t.doExec();
+ }
+ break;
+ }
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Waits and/or attempts to assist performing tasks indefinitely
+ * until the {@link #commonPool()} {@link #isQuiescent}.
+ */
+ static void quiesceCommonPool() {
+ common.awaitQuiescence(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+ }
+
+ /**
* Interface for extending managed parallelism for tasks running
* in {@link ForkJoinPool}s.
*
* <p>A {@code ManagedBlocker} provides two methods. Method
* {@code isReleasable} must return {@code true} if blocking is
* not necessary. Method {@code block} blocks the current thread
* if necessary (perhaps internally invoking {@code isReleasable}
* before actually blocking). These actions are performed by any
! * thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}.
! * The unusual methods in this API accommodate synchronizers that
! * may, but don't usually, block for long periods. Similarly, they
* allow more efficient internal handling of cases in which
* additional workers may be, but usually are not, needed to
* ensure sufficient parallelism. Toward this end,
* implementations of method {@code isReleasable} must be amenable
* to repeated invocation.
*** 3233,3242 ****
--- 3243,3253 ----
*/
boolean block() throws InterruptedException;
/**
* Returns {@code true} if blocking is unnecessary.
+ * @return {@code true} if blocking is unnecessary
*/
boolean isReleasable();
}
/**
*** 3317,3327 ****
private static final long PLOCK;
private static final long INDEXSEED;
private static final long QLOCK;
static {
! int s; // initialize field offsets for CAS etc
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = ForkJoinPool.class;
CTL = U.objectFieldOffset
(k.getDeclaredField("ctl"));
--- 3328,3338 ----
private static final long PLOCK;
private static final long INDEXSEED;
private static final long QLOCK;
static {
! // initialize field offsets for CAS etc
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = ForkJoinPool.class;
CTL = U.objectFieldOffset
(k.getDeclaredField("ctl"));
*** 3337,3392 ****
Class<?> wk = WorkQueue.class;
QLOCK = U.objectFieldOffset
(wk.getDeclaredField("qlock"));
Class<?> ak = ForkJoinTask[].class;
ABASE = U.arrayBaseOffset(ak);
! s = U.arrayIndexScale(ak);
! ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
} catch (Exception e) {
throw new Error(e);
}
- if ((s & (s-1)) != 0)
- throw new Error("data type scale not a power of two");
! submitters = new ThreadLocal<Submitter>();
! ForkJoinWorkerThreadFactory fac = defaultForkJoinWorkerThreadFactory =
new DefaultForkJoinWorkerThreadFactory();
modifyThreadPermission = new RuntimePermission("modifyThread");
! /*
! * Establish common pool parameters. For extra caution,
! * computations to set up common pool state are here; the
! * constructor just assigns these values to fields.
! */
! int par = 0;
! Thread.UncaughtExceptionHandler handler = null;
! try { // TBD: limit or report ignored exceptions?
String pp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.parallelism");
- String hp = System.getProperty
- ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
String fp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.threadFactory");
if (fp != null)
! fac = ((ForkJoinWorkerThreadFactory)ClassLoader.
getSystemClassLoader().loadClass(fp).newInstance());
if (hp != null)
! handler = ((Thread.UncaughtExceptionHandler)ClassLoader.
getSystemClassLoader().loadClass(hp).newInstance());
- if (pp != null)
- par = Integer.parseInt(pp);
} catch (Exception ignore) {
}
! if (par <= 0)
! par = Runtime.getRuntime().availableProcessors();
! if (par > MAX_CAP)
! par = MAX_CAP;
! commonPoolParallelism = par;
! long np = (long)(-par); // precompute initial ctl value
! long ct = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
!
! commonPool = new ForkJoinPool(par, ct, fac, handler);
}
}
--- 3348,3407 ----
Class<?> wk = WorkQueue.class;
QLOCK = U.objectFieldOffset
(wk.getDeclaredField("qlock"));
Class<?> ak = ForkJoinTask[].class;
ABASE = U.arrayBaseOffset(ak);
! int scale = U.arrayIndexScale(ak);
! if ((scale & (scale - 1)) != 0)
! throw new Error("data type scale not a power of two");
! ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
} catch (Exception e) {
throw new Error(e);
}
! defaultForkJoinWorkerThreadFactory =
new DefaultForkJoinWorkerThreadFactory();
modifyThreadPermission = new RuntimePermission("modifyThread");
! common = java.security.AccessController.doPrivileged
! (new java.security.PrivilegedAction<ForkJoinPool>() {
! public ForkJoinPool run() { return makeCommonPool(); }});
! int par = common.config; // report 1 even if threads disabled
! commonParallelism = par > 0 ? par : 1;
! }
! /**
! * Creates and returns the common pool, respecting user settings
! * specified via system properties.
! */
! private static ForkJoinPool makeCommonPool() {
! int parallelism = -1;
! ForkJoinWorkerThreadFactory factory
! = defaultForkJoinWorkerThreadFactory;
! UncaughtExceptionHandler handler = null;
! try { // ignore exceptions in accesing/parsing properties
String pp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.parallelism");
String fp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.threadFactory");
+ String hp = System.getProperty
+ ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
+ if (pp != null)
+ parallelism = Integer.parseInt(pp);
if (fp != null)
! factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
getSystemClassLoader().loadClass(fp).newInstance());
if (hp != null)
! handler = ((UncaughtExceptionHandler)ClassLoader.
getSystemClassLoader().loadClass(hp).newInstance());
} catch (Exception ignore) {
}
! if (parallelism < 0)
! parallelism = Runtime.getRuntime().availableProcessors();
! if (parallelism > MAX_CAP)
! parallelism = MAX_CAP;
! return new ForkJoinPool(parallelism, factory, handler, false,
! "ForkJoinPool.commonPool-worker-");
}
}