< prev index next >
src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
Print this page
8246585: ForkJoin updates
Reviewed-by: martin
@@ -36,17 +36,15 @@
package java.util.concurrent;
import java.io.Serializable;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
-import java.lang.ref.ReferenceQueue;
-import java.lang.ref.WeakReference;
import java.lang.reflect.Constructor;
import java.util.Collection;
import java.util.List;
import java.util.RandomAccess;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.LockSupport;
/**
* Abstract base class for tasks that run within a {@link ForkJoinPool}.
* A {@code ForkJoinTask} is a thread-like entity that is much
* lighter weight than a normal thread. Huge numbers of tasks and
@@ -215,352 +213,317 @@
* (1) basic status maintenance
* (2) execution and awaiting completion
* (3) user-level methods that additionally report results.
* This is sometimes hard to see because this file orders exported
* methods in a way that flows well in javadocs.
+ *
+ * Revision notes: The use of "Aux" field replaces previous
+ * reliance on a table to hold exceptions and synchronized blocks
+ * and monitors to wait for completion.
*/
/**
- * The status field holds run control status bits packed into a
- * single int to ensure atomicity. Status is initially zero, and
- * takes on nonnegative values until completed, upon which it
- * holds (sign bit) DONE, possibly with ABNORMAL (cancelled or
- * exceptional) and THROWN (in which case an exception has been
- * stored). Tasks with dependent blocked waiting joiners have the
- * SIGNAL bit set. Completion of a task with SIGNAL set awakens
- * any waiters via notifyAll. (Waiters also help signal others
- * upon completion.)
- *
- * These control bits occupy only (some of) the upper half (16
- * bits) of status field. The lower bits are used for user-defined
- * tags.
- */
- volatile int status; // accessed directly by pool and workers
+ * Nodes for threads waiting for completion, or holding a thrown
+ * exception (never both). Waiting threads prepend nodes
+ * Treiber-stack-style. Signallers detach and unpark
+ * waiters. Cancelled waiters try to unsplice.
+ */
+ static final class Aux {
+ final Thread thread;
+ final Throwable ex; // null if a waiter
+ Aux next; // accessed only via memory-acquire chains
+ Aux(Thread thread, Throwable ex) {
+ this.thread = thread;
+ this.ex = ex;
+ }
+ final boolean casNext(Aux c, Aux v) { // used only in cancellation
+ return NEXT.compareAndSet(this, c, v);
+ }
+ private static final VarHandle NEXT;
+ static {
+ try {
+ NEXT = MethodHandles.lookup()
+ .findVarHandle(Aux.class, "next", Aux.class);
+ } catch (ReflectiveOperationException e) {
+ throw new ExceptionInInitializerError(e);
+ }
+ }
+ }
+ /*
+ * The status field holds bits packed into a single int to ensure
+ * atomicity. Status is initially zero, and takes on nonnegative
+ * values until completed, upon which it holds (sign bit) DONE,
+ * possibly with ABNORMAL (cancelled or exceptional) and THROWN
+ * (in which case an exception has been stored). A value of
+ * ABNORMAL without DONE signifies an interrupted wait. These
+ * control bits occupy only (some of) the upper half (16 bits) of
+ * status field. The lower bits are used for user-defined tags.
+ */
private static final int DONE = 1 << 31; // must be negative
- private static final int ABNORMAL = 1 << 18; // set atomically with DONE
- private static final int THROWN = 1 << 17; // set atomically with ABNORMAL
- private static final int SIGNAL = 1 << 16; // true if joiner waiting
+ private static final int ABNORMAL = 1 << 16;
+ private static final int THROWN = 1 << 17;
private static final int SMASK = 0xffff; // short bits for tags
+ private static final int UNCOMPENSATE = 1 << 16; // helpJoin return sentinel
- /**
- * Constructor for subclasses to call.
- */
- public ForkJoinTask() {}
-
- static boolean isExceptionalStatus(int s) { // needed by subclasses
- return (s & THROWN) != 0;
- }
+ // Fields
+ volatile int status; // accessed directly by pool and workers
+ private transient volatile Aux aux; // either waiters or thrown Exception
- /**
- * Sets DONE status and wakes up threads waiting to join this task.
- *
- * @return status on exit
- */
- private int setDone() {
- int s;
- if (((s = (int)STATUS.getAndBitwiseOr(this, DONE)) & SIGNAL) != 0)
- synchronized (this) { notifyAll(); }
- return s | DONE;
+ // Support for atomic operations
+ private static final VarHandle STATUS;
+ private static final VarHandle AUX;
+ private int getAndBitwiseOrStatus(int v) {
+ return (int)STATUS.getAndBitwiseOr(this, v);
+ }
+ private boolean casStatus(int c, int v) {
+ return STATUS.weakCompareAndSet(this, c, v);
+ }
+ private boolean casAux(Aux c, Aux v) {
+ return AUX.compareAndSet(this, c, v);
+ }
+
+ /** Removes and unparks waiters */
+ private void signalWaiters() {
+ for (Aux a; (a = aux) != null && a.ex == null; ) {
+ if (casAux(a, null)) { // detach entire list
+ for (Thread t; a != null; a = a.next) {
+ if ((t = a.thread) != Thread.currentThread() && t != null)
+ LockSupport.unpark(t); // don't self-signal
}
-
- /**
- * Marks cancelled or exceptional completion unless already done.
- *
- * @param completion must be DONE | ABNORMAL, ORed with THROWN if exceptional
- * @return status on exit
- */
- private int abnormalCompletion(int completion) {
- for (int s, ns;;) {
- if ((s = status) < 0)
- return s;
- else if (STATUS.weakCompareAndSet(this, s, ns = s | completion)) {
- if ((s & SIGNAL) != 0)
- synchronized (this) { notifyAll(); }
- return ns;
+ break;
}
}
}
/**
- * Primary execution method for stolen tasks. Unless done, calls
- * exec and records status if completed, but doesn't wait for
- * completion otherwise.
+ * Possibly blocks until task is done or interrupted or timed out.
*
- * @return status on exit from this method
+ * @param interruptible true if wait can be cancelled by interrupt
+ * @param deadline if non-zero use timed waits and possibly timeout
+ * @param pool if nonnull pool to uncompensate after unblocking
+ * @return status on exit, or ABNORMAL if interrupted while waiting
*/
- final int doExec() {
- int s; boolean completed;
- if ((s = status) >= 0) {
- try {
- completed = exec();
- } catch (Throwable rex) {
- completed = false;
- s = setExceptionalCompletion(rex);
+ private int awaitDone(boolean interruptible, long deadline,
+ ForkJoinPool pool) {
+ int s;
+ boolean interrupted = false, queued = false, parked = false;
+ Aux node = null;
+ while ((s = status) >= 0) {
+ Aux a; long ns;
+ if (parked && Thread.interrupted()) {
+ if (interruptible) {
+ s = ABNORMAL;
+ break;
}
- if (completed)
- s = setDone();
+ interrupted = true;
}
- return s;
+ else if (queued) {
+ if (deadline != 0L) {
+ if ((ns = deadline - System.nanoTime()) <= 0L)
+ break;
+ LockSupport.parkNanos(ns);
}
-
- /**
- * If not done, sets SIGNAL status and performs Object.wait(timeout).
- * This task may or may not be done on exit. Ignores interrupts.
- *
- * @param timeout using Object.wait conventions.
- */
- final void internalWait(long timeout) {
- if ((int)STATUS.getAndBitwiseOr(this, SIGNAL) >= 0) {
- synchronized (this) {
- if (status >= 0)
- try { wait(timeout); } catch (InterruptedException ie) { }
else
- notifyAll();
+ LockSupport.park();
+ parked = true;
}
+ else if (node != null) {
+ if ((a = aux) != null && a.ex != null)
+ Thread.onSpinWait(); // exception in progress
+ else if (queued = casAux(node.next = a, node))
+ LockSupport.setCurrentBlocker(this);
}
- }
-
- /**
- * Blocks a non-worker-thread until completion.
- * @return status upon completion
- */
- private int externalAwaitDone() {
- int s = tryExternalHelp();
- if (s >= 0 && (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) {
- boolean interrupted = false;
- synchronized (this) {
- for (;;) {
- if ((s = status) >= 0) {
+ else {
try {
- wait(0L);
- } catch (InterruptedException ie) {
- interrupted = true;
+ node = new Aux(Thread.currentThread(), null);
+ } catch (Throwable ex) { // try to cancel if cannot create
+ casStatus(s, s | (DONE | ABNORMAL));
}
}
- else {
- notifyAll();
- break;
+ }
+ if (pool != null)
+ pool.uncompensate();
+
+ if (queued) {
+ LockSupport.setCurrentBlocker(null);
+ if (s >= 0) { // cancellation similar to AbstractQueuedSynchronizer
+ outer: for (Aux a; (a = aux) != null && a.ex == null; ) {
+ for (Aux trail = null;;) {
+ Aux next = a.next;
+ if (a == node) {
+ if (trail != null)
+ trail.casNext(trail, next);
+ else if (casAux(a, next))
+ break outer; // cannot be re-encountered
+ break; // restart
+ } else {
+ trail = a;
+ if ((a = next) == null)
+ break outer;
+ }
}
}
}
+ else {
+ signalWaiters(); // help clean or signal
if (interrupted)
Thread.currentThread().interrupt();
}
+ }
return s;
}
/**
- * Blocks a non-worker-thread until completion or interruption.
+ * Sets DONE status and wakes up threads waiting to join this task.
+ * @return status on exit
*/
- private int externalInterruptibleAwaitDone() throws InterruptedException {
- int s = tryExternalHelp();
- if (s >= 0 && (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) {
- synchronized (this) {
- for (;;) {
- if ((s = status) >= 0)
- wait(0L);
- else {
- notifyAll();
- break;
- }
- }
- }
- }
- else if (Thread.interrupted())
- throw new InterruptedException();
+ private int setDone() {
+ int s = getAndBitwiseOrStatus(DONE) | DONE;
+ signalWaiters();
return s;
}
/**
- * Tries to help with tasks allowed for external callers.
- *
- * @return current status
+ * Sets ABNORMAL DONE status unless already done, and wakes up threads
+ * waiting to join this task.
+ * @return status on exit
*/
- private int tryExternalHelp() {
+ private int trySetCancelled() {
int s;
- return ((s = status) < 0 ? s:
- (this instanceof CountedCompleter) ?
- ForkJoinPool.common.externalHelpComplete(
- (CountedCompleter<?>)this, 0) :
- ForkJoinPool.common.tryExternalUnpush(this) ?
- doExec() : 0);
+ do {} while ((s = status) >= 0 && !casStatus(s, s |= (DONE | ABNORMAL)));
+ signalWaiters();
+ return s;
}
/**
- * Implementation for join, get, quietlyJoin. Directly handles
- * only cases of already-completed, external wait, and
- * unfork+exec. Others are relayed to ForkJoinPool.awaitJoin.
+ * Records exception and sets ABNORMAL THROWN DONE status unless
+ * already done, and wakes up threads waiting to join this task.
+ * If losing a race with setDone or trySetCancelled, the exception
+ * may be recorded but not reported.
*
- * @return status upon completion
+ * @return status on exit
*/
- private int doJoin() {
- int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
- return (s = status) < 0 ? s :
- ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
- (w = (wt = (ForkJoinWorkerThread)t).workQueue).
- tryUnpush(this) && (s = doExec()) < 0 ? s :
- wt.pool.awaitJoin(w, this, 0L) :
- externalAwaitDone();
+ final int trySetThrown(Throwable ex) {
+ Aux h = new Aux(Thread.currentThread(), ex), p = null;
+ boolean installed = false;
+ int s;
+ while ((s = status) >= 0) {
+ Aux a;
+ if (!installed && ((a = aux) == null || a.ex == null) &&
+ (installed = casAux(a, h)))
+ p = a; // list of waiters replaced by h
+ if (installed && casStatus(s, s |= (DONE | ABNORMAL | THROWN)))
+ break;
+ }
+ for (; p != null; p = p.next)
+ LockSupport.unpark(p.thread);
+ return s;
}
/**
- * Implementation for invoke, quietlyInvoke.
+ * Records exception unless already done. Overridable in subclasses.
*
- * @return status upon completion
+ * @return status on exit
*/
- private int doInvoke() {
- int s; Thread t; ForkJoinWorkerThread wt;
- return (s = doExec()) < 0 ? s :
- ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
- (wt = (ForkJoinWorkerThread)t).pool.
- awaitJoin(wt.workQueue, this, 0L) :
- externalAwaitDone();
+ int trySetException(Throwable ex) {
+ return trySetThrown(ex);
}
- // Exception table support
-
/**
- * Hash table of exceptions thrown by tasks, to enable reporting
- * by callers. Because exceptions are rare, we don't directly keep
- * them with task objects, but instead use a weak ref table. Note
- * that cancellation exceptions don't appear in the table, but are
- * instead recorded as status values.
- *
- * The exception table has a fixed capacity.
+ * Constructor for subclasses to call.
*/
- private static final ExceptionNode[] exceptionTable
- = new ExceptionNode[32];
-
- /** Lock protecting access to exceptionTable. */
- private static final ReentrantLock exceptionTableLock
- = new ReentrantLock();
-
- /** Reference queue of stale exceptionally completed tasks. */
- private static final ReferenceQueue<ForkJoinTask<?>> exceptionTableRefQueue
- = new ReferenceQueue<>();
+ public ForkJoinTask() {}
- /**
- * Key-value nodes for exception table. The chained hash table
- * uses identity comparisons, full locking, and weak references
- * for keys. The table has a fixed capacity because it only
- * maintains task exceptions long enough for joiners to access
- * them, so should never become very large for sustained
- * periods. However, since we do not know when the last joiner
- * completes, we must use weak references and expunge them. We do
- * so on each operation (hence full locking). Also, some thread in
- * any ForkJoinPool will call helpExpungeStaleExceptions when its
- * pool becomes isQuiescent.
- */
- static final class ExceptionNode extends WeakReference<ForkJoinTask<?>> {
- final Throwable ex;
- ExceptionNode next;
- final long thrower; // use id not ref to avoid weak cycles
- final int hashCode; // store task hashCode before weak ref disappears
- ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next,
- ReferenceQueue<ForkJoinTask<?>> exceptionTableRefQueue) {
- super(task, exceptionTableRefQueue);
- this.ex = ex;
- this.next = next;
- this.thrower = Thread.currentThread().getId();
- this.hashCode = System.identityHashCode(task);
- }
+ static boolean isExceptionalStatus(int s) { // needed by subclasses
+ return (s & THROWN) != 0;
}
/**
- * Records exception and sets status.
+ * Unless done, calls exec and records status if completed, but
+ * doesn't wait for completion otherwise.
*
- * @return status on exit
+ * @return status on exit from this method
*/
- final int recordExceptionalCompletion(Throwable ex) {
- int s;
+ final int doExec() {
+ int s; boolean completed;
if ((s = status) >= 0) {
- int h = System.identityHashCode(this);
- final ReentrantLock lock = exceptionTableLock;
- lock.lock();
try {
- expungeStaleExceptions();
- ExceptionNode[] t = exceptionTable;
- int i = h & (t.length - 1);
- for (ExceptionNode e = t[i]; ; e = e.next) {
- if (e == null) {
- t[i] = new ExceptionNode(this, ex, t[i],
- exceptionTableRefQueue);
- break;
- }
- if (e.get() == this) // already present
- break;
- }
- } finally {
- lock.unlock();
+ completed = exec();
+ } catch (Throwable rex) {
+ s = trySetException(rex);
+ completed = false;
}
- s = abnormalCompletion(DONE | ABNORMAL | THROWN);
+ if (completed)
+ s = setDone();
}
return s;
}
/**
- * Records exception and possibly propagates.
+ * Helps and/or waits for completion from join, get, or invoke;
+ * called from either internal or external threads.
*
- * @return status on exit
- */
- private int setExceptionalCompletion(Throwable ex) {
- int s = recordExceptionalCompletion(ex);
- if ((s & THROWN) != 0)
- internalPropagateException(ex);
- return s;
+ * @param ran true if task known to have been exec'd
+ * @param interruptible true if park interruptibly when external
+ * @param timed true if use timed wait
+ * @param nanos if timed, timeout value
+ * @return ABNORMAL if interrupted, else status on exit
+ */
+ private int awaitJoin(boolean ran, boolean interruptible, boolean timed,
+ long nanos) {
+ boolean internal; ForkJoinPool p; ForkJoinPool.WorkQueue q; int s;
+ Thread t; ForkJoinWorkerThread wt;
+ if (internal = ((t = Thread.currentThread())
+ instanceof ForkJoinWorkerThread)) {
+ p = (wt = (ForkJoinWorkerThread)t).pool;
+ q = wt.workQueue;
}
-
- /**
- * Hook for exception propagation support for tasks with completers.
- */
- void internalPropagateException(Throwable ex) {
+ else {
+ p = ForkJoinPool.common;
+ q = ForkJoinPool.commonQueue();
+ if (interruptible && Thread.interrupted())
+ return ABNORMAL;
}
-
- /**
- * Cancels, ignoring any exceptions thrown by cancel. Used during
- * worker and pool shutdown. Cancel is spec'ed not to throw any
- * exceptions, but if it does anyway, we have no recourse during
- * shutdown, so guard against this case.
- */
- static final void cancelIgnoringExceptions(ForkJoinTask<?> t) {
- if (t != null && t.status >= 0) {
- try {
- t.cancel(false);
- } catch (Throwable ignore) {
+ if ((s = status) < 0)
+ return s;
+ long deadline = 0L;
+ if (timed) {
+ if (nanos <= 0L)
+ return 0;
+ else if ((deadline = nanos + System.nanoTime()) == 0L)
+ deadline = 1L;
+ }
+ ForkJoinPool uncompensate = null;
+ if (q != null && p != null) { // try helping
+ if ((!timed || p.isSaturated()) &&
+ ((this instanceof CountedCompleter) ?
+ (s = p.helpComplete(this, q, internal)) < 0 :
+ (q.tryRemove(this, internal) && (s = doExec()) < 0)))
+ return s;
+ if (internal) {
+ if ((s = p.helpJoin(this, q)) < 0)
+ return s;
+ if (s == UNCOMPENSATE)
+ uncompensate = p;
+ interruptible = false;
}
}
+ return awaitDone(interruptible, deadline, uncompensate);
}
/**
- * Removes exception node and clears status.
+ * Cancels, ignoring any exceptions thrown by cancel. Cancel is
+ * spec'ed not to throw any exceptions, but if it does anyway, we
+ * have no recourse, so guard against this case.
*/
- private void clearExceptionalCompletion() {
- int h = System.identityHashCode(this);
- final ReentrantLock lock = exceptionTableLock;
- lock.lock();
+ static final void cancelIgnoringExceptions(Future<?> t) {
+ if (t != null) {
try {
- ExceptionNode[] t = exceptionTable;
- int i = h & (t.length - 1);
- ExceptionNode e = t[i];
- ExceptionNode pred = null;
- while (e != null) {
- ExceptionNode next = e.next;
- if (e.get() == this) {
- if (pred == null)
- t[i] = next;
- else
- pred.next = next;
- break;
+ t.cancel(true);
+ } catch (Throwable ignore) {
}
- pred = e;
- e = next;
- }
- expungeStaleExceptions();
- status = 0;
- } finally {
- lock.unlock();
}
}
/**
* Returns a rethrowable exception for this task, if available.
@@ -575,115 +538,92 @@
* trace.
*
* @return the exception, or null if none
*/
private Throwable getThrowableException() {
- int h = System.identityHashCode(this);
- ExceptionNode e;
- final ReentrantLock lock = exceptionTableLock;
- lock.lock();
+ Throwable ex; Aux a;
+ if ((a = aux) == null)
+ ex = null;
+ else if ((ex = a.ex) != null && a.thread != Thread.currentThread()) {
try {
- expungeStaleExceptions();
- ExceptionNode[] t = exceptionTable;
- e = t[h & (t.length - 1)];
- while (e != null && e.get() != this)
- e = e.next;
- } finally {
- lock.unlock();
- }
- Throwable ex;
- if (e == null || (ex = e.ex) == null)
- return null;
- if (e.thrower != Thread.currentThread().getId()) {
- try {
- Constructor<?> noArgCtor = null;
- // public ctors only
+ Constructor<?> noArgCtor = null, oneArgCtor = null;
for (Constructor<?> c : ex.getClass().getConstructors()) {
Class<?>[] ps = c.getParameterTypes();
if (ps.length == 0)
noArgCtor = c;
- else if (ps.length == 1 && ps[0] == Throwable.class)
- return (Throwable)c.newInstance(ex);
+ else if (ps.length == 1 && ps[0] == Throwable.class) {
+ oneArgCtor = c;
+ break;
}
- if (noArgCtor != null) {
- Throwable wx = (Throwable)noArgCtor.newInstance();
- wx.initCause(ex);
- return wx;
+ }
+ if (oneArgCtor != null)
+ ex = (Throwable)oneArgCtor.newInstance(ex);
+ else if (noArgCtor != null) {
+ Throwable rx = (Throwable)noArgCtor.newInstance();
+ rx.initCause(ex);
+ ex = rx;
}
} catch (Exception ignore) {
}
}
return ex;
}
/**
- * Polls stale refs and removes them. Call only while holding lock.
+ * Returns exception associated with the given status, or null if none.
*/
- private static void expungeStaleExceptions() {
- for (Object x; (x = exceptionTableRefQueue.poll()) != null;) {
- if (x instanceof ExceptionNode) {
- ExceptionNode[] t = exceptionTable;
- int i = ((ExceptionNode)x).hashCode & (t.length - 1);
- ExceptionNode e = t[i];
- ExceptionNode pred = null;
- while (e != null) {
- ExceptionNode next = e.next;
- if (e == x) {
- if (pred == null)
- t[i] = next;
- else
- pred.next = next;
- break;
- }
- pred = e;
- e = next;
- }
- }
- }
+ private Throwable getException(int s) {
+ Throwable ex = null;
+ if ((s & ABNORMAL) != 0 &&
+ ((s & THROWN) == 0 || (ex = getThrowableException()) == null))
+ ex = new CancellationException();
+ return ex;
}
/**
- * If lock is available, polls stale refs and removes them.
- * Called from ForkJoinPool when pools become quiescent.
+ * Throws exception associated with the given status, or
+ * CancellationException if none recorded.
*/
- static final void helpExpungeStaleExceptions() {
- final ReentrantLock lock = exceptionTableLock;
- if (lock.tryLock()) {
- try {
- expungeStaleExceptions();
- } finally {
- lock.unlock();
- }
+ private void reportException(int s) {
+ ForkJoinTask.<RuntimeException>uncheckedThrow(
+ (s & THROWN) != 0 ? getThrowableException() : null);
}
+
+ /**
+ * Throws exception for (timed or untimed) get, wrapping if
+ * necessary in an ExecutionException.
+ */
+ private void reportExecutionException(int s) {
+ Throwable ex = null;
+ if (s == ABNORMAL)
+ ex = new InterruptedException();
+ else if (s >= 0)
+ ex = new TimeoutException();
+ else if ((s & THROWN) != 0 && (ex = getThrowableException()) != null)
+ ex = new ExecutionException(ex);
+ ForkJoinTask.<RuntimeException>uncheckedThrow(ex);
}
/**
- * A version of "sneaky throw" to relay exceptions.
+ * A version of "sneaky throw" to relay exceptions in other
+ * contexts.
*/
static void rethrow(Throwable ex) {
ForkJoinTask.<RuntimeException>uncheckedThrow(ex);
}
/**
* The sneaky part of sneaky throw, relying on generics
* limitations to evade compiler complaints about rethrowing
- * unchecked exceptions.
+ * unchecked exceptions. If argument null, throws
+ * CancellationException.
*/
@SuppressWarnings("unchecked") static <T extends Throwable>
void uncheckedThrow(Throwable t) throws T {
- if (t != null)
+ if (t == null)
+ t = new CancellationException();
throw (T)t; // rely on vacuous cast
- else
- throw new Error("Unknown Exception");
- }
-
- /**
- * Throws exception, if any, associated with the given status.
- */
- private void reportException(int s) {
- rethrow((s & THROWN) != 0 ? getThrowableException() :
- new CancellationException());
}
// public methods
/**
@@ -700,13 +640,13 @@
* true}.
*
* @return {@code this}, to simplify usage
*/
public final ForkJoinTask<V> fork() {
- Thread t;
+ Thread t; ForkJoinWorkerThread w;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
- ((ForkJoinWorkerThread)t).workQueue.push(this);
+ (w = (ForkJoinWorkerThread)t).workQueue.push(this, w.pool);
else
ForkJoinPool.common.externalPush(this);
return this;
}
@@ -721,11 +661,13 @@
*
* @return the computed result
*/
public final V join() {
int s;
- if (((s = doJoin()) & ABNORMAL) != 0)
+ if ((s = status) >= 0)
+ s = awaitJoin(false, false, false, 0L);
+ if ((s & ABNORMAL) != 0)
reportException(s);
return getRawResult();
}
/**
@@ -736,11 +678,13 @@
*
* @return the computed result
*/
public final V invoke() {
int s;
- if (((s = doInvoke()) & ABNORMAL) != 0)
+ if ((s = doExec()) >= 0)
+ s = awaitJoin(true, false, false, 0L);
+ if ((s & ABNORMAL) != 0)
reportException(s);
return getRawResult();
}
/**
@@ -760,14 +704,20 @@
* @param t2 the second task
* @throws NullPointerException if any task is null
*/
public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
int s1, s2;
+ if (t1 == null || t2 == null)
+ throw new NullPointerException();
t2.fork();
- if (((s1 = t1.doInvoke()) & ABNORMAL) != 0)
+ if ((s1 = t1.doExec()) >= 0)
+ s1 = t1.awaitJoin(true, false, false, 0L);
+ if ((s1 & ABNORMAL) != 0) {
+ cancelIgnoringExceptions(t2);
t1.reportException(s1);
- if (((s2 = t2.doJoin()) & ABNORMAL) != 0)
+ }
+ else if (((s2 = t2.awaitJoin(false, false, false, 0L)) & ABNORMAL) != 0)
t2.reportException(s2);
}
/**
* Forks the given tasks, returning when {@code isDone} holds for
@@ -786,32 +736,43 @@
*/
public static void invokeAll(ForkJoinTask<?>... tasks) {
Throwable ex = null;
int last = tasks.length - 1;
for (int i = last; i >= 0; --i) {
- ForkJoinTask<?> t = tasks[i];
- if (t == null) {
- if (ex == null)
+ ForkJoinTask<?> t;
+ if ((t = tasks[i]) == null) {
ex = new NullPointerException();
+ break;
+ }
+ if (i == 0) {
+ int s;
+ if ((s = t.doExec()) >= 0)
+ s = t.awaitJoin(true, false, false, 0L);
+ if ((s & ABNORMAL) != 0)
+ ex = t.getException(s);
+ break;
}
- else if (i != 0)
t.fork();
- else if ((t.doInvoke() & ABNORMAL) != 0 && ex == null)
- ex = t.getException();
}
+ if (ex == null) {
for (int i = 1; i <= last; ++i) {
- ForkJoinTask<?> t = tasks[i];
- if (t != null) {
- if (ex != null)
- t.cancel(false);
- else if ((t.doJoin() & ABNORMAL) != 0)
- ex = t.getException();
+ ForkJoinTask<?> t;
+ if ((t = tasks[i]) != null) {
+ int s;
+ if ((s = t.status) >= 0)
+ s = t.awaitJoin(false, false, false, 0L);
+ if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null)
+ break;
}
}
- if (ex != null)
+ }
+ if (ex != null) {
+ for (int i = 1; i <= last; ++i)
+ cancelIgnoringExceptions(tasks[i]);
rethrow(ex);
}
+ }
/**
* Forks all tasks in the specified collection, returning when
* {@code isDone} holds for each task or an (unchecked) exception
* is encountered, in which case the exception is rethrown. If
@@ -836,33 +797,44 @@
}
@SuppressWarnings("unchecked")
List<? extends ForkJoinTask<?>> ts =
(List<? extends ForkJoinTask<?>>) tasks;
Throwable ex = null;
- int last = ts.size() - 1;
+ int last = ts.size() - 1; // nearly same as array version
for (int i = last; i >= 0; --i) {
- ForkJoinTask<?> t = ts.get(i);
- if (t == null) {
- if (ex == null)
+ ForkJoinTask<?> t;
+ if ((t = ts.get(i)) == null) {
ex = new NullPointerException();
+ break;
+ }
+ if (i == 0) {
+ int s;
+ if ((s = t.doExec()) >= 0)
+ s = t.awaitJoin(true, false, false, 0L);
+ if ((s & ABNORMAL) != 0)
+ ex = t.getException(s);
+ break;
}
- else if (i != 0)
t.fork();
- else if ((t.doInvoke() & ABNORMAL) != 0 && ex == null)
- ex = t.getException();
}
+ if (ex == null) {
for (int i = 1; i <= last; ++i) {
- ForkJoinTask<?> t = ts.get(i);
- if (t != null) {
- if (ex != null)
- t.cancel(false);
- else if ((t.doJoin() & ABNORMAL) != 0)
- ex = t.getException();
+ ForkJoinTask<?> t;
+ if ((t = ts.get(i)) != null) {
+ int s;
+ if ((s = t.status) >= 0)
+ s = t.awaitJoin(false, false, false, 0L);
+ if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null)
+ break;
}
}
- if (ex != null)
+ }
+ if (ex != null) {
+ for (int i = 1; i <= last; ++i)
+ cancelIgnoringExceptions(ts.get(i));
rethrow(ex);
+ }
return tasks;
}
/**
* Attempts to cancel execution of this task. This attempt will
@@ -890,12 +862,11 @@
* control cancellation.
*
* @return {@code true} if this task is now cancelled
*/
public boolean cancel(boolean mayInterruptIfRunning) {
- int s = abnormalCompletion(DONE | ABNORMAL);
- return (s & (ABNORMAL | THROWN)) == ABNORMAL;
+ return (trySetCancelled() & (ABNORMAL | THROWN)) == ABNORMAL;
}
public final boolean isDone() {
return status < 0;
}
@@ -930,14 +901,11 @@
* none or if the method has not yet completed.
*
* @return the exception, or {@code null} if none
*/
public final Throwable getException() {
- int s = status;
- return ((s & ABNORMAL) == 0 ? null :
- (s & THROWN) == 0 ? new CancellationException() :
- getThrowableException());
+ return getException(status);
}
/**
* Completes this task abnormally, and if not already aborted or
* cancelled, causes it to throw the given exception upon
@@ -951,11 +919,11 @@
* @param ex the exception to throw. If this exception is not a
* {@code RuntimeException} or {@code Error}, the actual exception
* thrown will be a {@code RuntimeException} with cause {@code ex}.
*/
public void completeExceptionally(Throwable ex) {
- setExceptionalCompletion((ex instanceof RuntimeException) ||
+ trySetException((ex instanceof RuntimeException) ||
(ex instanceof Error) ? ex :
new RuntimeException(ex));
}
/**
@@ -973,11 +941,11 @@
*/
public void complete(V value) {
try {
setRawResult(value);
} catch (Throwable rex) {
- setExceptionalCompletion(rex);
+ trySetException(rex);
return;
}
setDone();
}
@@ -1003,17 +971,13 @@
* exception
* @throws InterruptedException if the current thread is not a
* member of a ForkJoinPool and was interrupted while waiting
*/
public final V get() throws InterruptedException, ExecutionException {
- int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
- doJoin() : externalInterruptibleAwaitDone();
- if ((s & THROWN) != 0)
- throw new ExecutionException(getThrowableException());
- else if ((s & ABNORMAL) != 0)
- throw new CancellationException();
- else
+ int s;
+ if (((s = awaitJoin(false, true, false, 0L)) & ABNORMAL) != 0)
+ reportExecutionException(s);
return getRawResult();
}
/**
* Waits if necessary for at most the given time for the computation
@@ -1030,85 +994,51 @@
* @throws TimeoutException if the wait timed out
*/
public final V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
int s;
- long nanos = unit.toNanos(timeout);
- if (Thread.interrupted())
- throw new InterruptedException();
- if ((s = status) >= 0 && nanos > 0L) {
- long d = System.nanoTime() + nanos;
- long deadline = (d == 0L) ? 1L : d; // avoid 0
- Thread t = Thread.currentThread();
- if (t instanceof ForkJoinWorkerThread) {
- ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
- s = wt.pool.awaitJoin(wt.workQueue, this, deadline);
- }
- else if ((s = ((this instanceof CountedCompleter) ?
- ForkJoinPool.common.externalHelpComplete(
- (CountedCompleter<?>)this, 0) :
- ForkJoinPool.common.tryExternalUnpush(this) ?
- doExec() : 0)) >= 0) {
- long ns, ms; // measure in nanosecs, but wait in millisecs
- while ((s = status) >= 0 &&
- (ns = deadline - System.nanoTime()) > 0L) {
- if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L &&
- (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) {
- synchronized (this) {
- if (status >= 0)
- wait(ms); // OK to throw InterruptedException
- else
- notifyAll();
- }
- }
- }
- }
- }
- if (s >= 0)
- throw new TimeoutException();
- else if ((s & THROWN) != 0)
- throw new ExecutionException(getThrowableException());
- else if ((s & ABNORMAL) != 0)
- throw new CancellationException();
- else
+ if ((s = awaitJoin(false, true, true, unit.toNanos(timeout))) >= 0 ||
+ (s & ABNORMAL) != 0)
+ reportExecutionException(s);
return getRawResult();
}
/**
* Joins this task, without returning its result or throwing its
* exception. This method may be useful when processing
* collections of tasks when some have been cancelled or otherwise
* known to have aborted.
*/
public final void quietlyJoin() {
- doJoin();
+ if (status >= 0)
+ awaitJoin(false, false, false, 0L);
}
/**
* Commences performing this task and awaits its completion if
* necessary, without returning its result or throwing its
* exception.
*/
public final void quietlyInvoke() {
- doInvoke();
+ if (doExec() >= 0)
+ awaitJoin(true, false, false, 0L);
}
/**
* Possibly executes tasks until the pool hosting the current task
* {@linkplain ForkJoinPool#isQuiescent is quiescent}. This
* method may be of use in designs in which many tasks are forked,
* but none are explicitly joined, instead executing them until
* all are processed.
*/
public static void helpQuiesce() {
- Thread t;
- if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
- ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
- wt.pool.helpQuiescePool(wt.workQueue);
- }
+ Thread t; ForkJoinWorkerThread w; ForkJoinPool p;
+ if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread &&
+ (p = (w = (ForkJoinWorkerThread)t).pool) != null)
+ p.helpQuiescePool(w.workQueue, Long.MAX_VALUE, false);
else
- ForkJoinPool.quiesceCommonPool();
+ ForkJoinPool.common.externalHelpQuiescePool(Long.MAX_VALUE, false);
}
/**
* Resets the internal bookkeeping state of this task, allowing a
* subsequent {@code fork}. This method allows repeated reuse of
@@ -1124,13 +1054,11 @@
* null}. However, the value returned by {@code getRawResult} is
* unaffected. To clear this value, you can invoke {@code
* setRawResult(null)}.
*/
public void reinitialize() {
- if ((status & THROWN) != 0)
- clearExceptionalCompletion();
- else
+ aux = null;
status = 0;
}
/**
* Returns the pool hosting the current thread, or {@code null}
@@ -1140,13 +1068,13 @@
* #inForkJoinPool} returns {@code false}.
*
* @return the pool, or {@code null} if none
*/
public static ForkJoinPool getPool() {
- Thread t = Thread.currentThread();
- return (t instanceof ForkJoinWorkerThread) ?
- ((ForkJoinWorkerThread) t).pool : null;
+ Thread t;
+ return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
+ ((ForkJoinWorkerThread) t).pool : null);
}
/**
* Returns {@code true} if the current thread is a {@link
* ForkJoinWorkerThread} executing as a ForkJoinPool computation.
@@ -1168,14 +1096,16 @@
* that could have been, but were not, stolen.
*
* @return {@code true} if unforked
*/
public boolean tryUnfork() {
- Thread t;
- return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
- ((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) :
- ForkJoinPool.common.tryExternalUnpush(this));
+ Thread t; ForkJoinPool.WorkQueue q;
+ return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
+ ? (q = ((ForkJoinWorkerThread)t).workQueue) != null
+ && q.tryUnpush(this)
+ : (q = ForkJoinPool.commonQueue()) != null
+ && q.externalTryUnpush(this);
}
/**
* Returns an estimate of the number of tasks that have been
* forked by the current worker thread but not yet executed. This
@@ -1187,11 +1117,11 @@
public static int getQueuedTaskCount() {
Thread t; ForkJoinPool.WorkQueue q;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
q = ((ForkJoinWorkerThread)t).workQueue;
else
- q = ForkJoinPool.commonSubmitterQueue();
+ q = ForkJoinPool.commonQueue();
return (q == null) ? 0 : q.queueSize();
}
/**
* Returns an estimate of how many more locally queued tasks are
@@ -1262,11 +1192,11 @@
protected static ForkJoinTask<?> peekNextLocalTask() {
Thread t; ForkJoinPool.WorkQueue q;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
q = ((ForkJoinWorkerThread)t).workQueue;
else
- q = ForkJoinPool.commonSubmitterQueue();
+ q = ForkJoinPool.commonQueue();
return (q == null) ? null : q.peek();
}
/**
* Unschedules and returns, without executing, the next task
@@ -1277,13 +1207,12 @@
*
* @return the next task, or {@code null} if none are available
*/
protected static ForkJoinTask<?> pollNextLocalTask() {
Thread t;
- return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
- ((ForkJoinWorkerThread)t).workQueue.nextLocalTask() :
- null;
+ return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
+ ((ForkJoinWorkerThread)t).workQueue.nextLocalTask() : null);
}
/**
* If the current thread is operating in a ForkJoinPool,
* unschedules and returns, without executing, the next task
@@ -1296,14 +1225,14 @@
* otherwise.
*
* @return a task, or {@code null} if none are available
*/
protected static ForkJoinTask<?> pollTask() {
- Thread t; ForkJoinWorkerThread wt;
- return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
- (wt = (ForkJoinWorkerThread)t).pool.nextTaskFor(wt.workQueue) :
- null;
+ Thread t; ForkJoinWorkerThread w;
+ return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
+ (w = (ForkJoinWorkerThread)t).pool.nextTaskFor(w.workQueue) :
+ null);
}
/**
* If the current thread is operating in a ForkJoinPool,
* unschedules and returns, without executing, a task externally
@@ -1315,12 +1244,12 @@
* @return a task, or {@code null} if none are available
* @since 9
*/
protected static ForkJoinTask<?> pollSubmission() {
Thread t;
- return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
- ((ForkJoinWorkerThread)t).pool.pollSubmission() : null;
+ return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
+ ((ForkJoinWorkerThread)t).pool.pollSubmission() : null);
}
// tag operations
/**
@@ -1340,12 +1269,11 @@
* @return the previous value of the tag
* @since 1.8
*/
public final short setForkJoinTaskTag(short newValue) {
for (int s;;) {
- if (STATUS.weakCompareAndSet(this, s = status,
- (s & ~SMASK) | (newValue & SMASK)))
+ if (casStatus(s = status, (s & ~SMASK) | (newValue & SMASK)))
return (short)s;
}
}
/**
@@ -1364,12 +1292,11 @@
*/
public final boolean compareAndSetForkJoinTaskTag(short expect, short update) {
for (int s;;) {
if ((short)(s = status) != expect)
return false;
- if (STATUS.weakCompareAndSet(this, s,
- (s & ~SMASK) | (update & SMASK)))
+ if (casStatus(s, (s & ~SMASK) | (update & SMASK)))
return true;
}
}
/**
@@ -1430,12 +1357,21 @@
this.runnable = runnable;
}
public final Void getRawResult() { return null; }
public final void setRawResult(Void v) { }
public final boolean exec() { runnable.run(); return true; }
- void internalPropagateException(Throwable ex) {
- rethrow(ex); // rethrow outside exec() catches.
+ int trySetException(Throwable ex) { // if a handler, invoke it
+ int s; Thread t; java.lang.Thread.UncaughtExceptionHandler h;
+ if (isExceptionalStatus(s = trySetThrown(ex)) &&
+ (h = ((t = Thread.currentThread()).
+ getUncaughtExceptionHandler())) != null) {
+ try {
+ h.uncaughtException(t, ex);
+ } catch (Throwable ignore) {
+ }
+ }
+ return s;
}
private static final long serialVersionUID = 5232453952276885070L;
}
/**
@@ -1468,10 +1404,57 @@
return super.toString() + "[Wrapped task = " + callable + "]";
}
private static final long serialVersionUID = 2838392045355241008L;
}
+ static final class AdaptedInterruptibleCallable<T> extends ForkJoinTask<T>
+ implements RunnableFuture<T> {
+ @SuppressWarnings("serial") // Conditionally serializable
+ final Callable<? extends T> callable;
+ @SuppressWarnings("serial") // Conditionally serializable
+ transient volatile Thread runner;
+ T result;
+ AdaptedInterruptibleCallable(Callable<? extends T> callable) {
+ if (callable == null) throw new NullPointerException();
+ this.callable = callable;
+ }
+ public final T getRawResult() { return result; }
+ public final void setRawResult(T v) { result = v; }
+ public final boolean exec() {
+ Thread.interrupted();
+ runner = Thread.currentThread();
+ try {
+ if (!isDone()) // recheck
+ result = callable.call();
+ return true;
+ } catch (RuntimeException rex) {
+ throw rex;
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ } finally {
+ runner = null;
+ Thread.interrupted();
+ }
+ }
+ public final void run() { invoke(); }
+ public final boolean cancel(boolean mayInterruptIfRunning) {
+ Thread t;
+ boolean stat = super.cancel(false);
+ if (mayInterruptIfRunning && (t = runner) != null) {
+ try {
+ t.interrupt();
+ } catch (Throwable ignore) {
+ }
+ }
+ return stat;
+ }
+ public String toString() {
+ return super.toString() + "[Wrapped task = " + callable + "]";
+ }
+ private static final long serialVersionUID = 2838392045355241008L;
+ }
+
/**
* Returns a new {@code ForkJoinTask} that performs the {@code run}
* method of the given {@code Runnable} as its action, and returns
* a null result upon {@link #join}.
*
@@ -1508,10 +1491,30 @@
*/
public static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable) {
return new AdaptedCallable<T>(callable);
}
+ /**
+ * Returns a new {@code ForkJoinTask} that performs the {@code call}
+ * method of the given {@code Callable} as its action, and returns
+ * its result upon {@link #join}, translating any checked exceptions
+ * encountered into {@code RuntimeException}. Additionally,
+ * invocations of {@code cancel} with {@code mayInterruptIfRunning
+ * true} will attempt to interrupt the thread performing the task.
+ *
+ * @param callable the callable action
+ * @param <T> the type of the callable's result
+ * @return the task
+ *
+ * @since 17
+ */
+ // adaptInterruptible deferred to its own independent change
+ // https://bugs.openjdk.java.net/browse/JDK-8246587
+ /* TODO: public */ private static <T> ForkJoinTask<T> adaptInterruptible(Callable<? extends T> callable) {
+ return new AdaptedInterruptibleCallable<T>(callable);
+ }
+
// Serialization support
private static final long serialVersionUID = -7721805057305804111L;
/**
@@ -1522,12 +1525,13 @@
* @serialData the current run status and the exception thrown
* during execution, or {@code null} if none
*/
private void writeObject(java.io.ObjectOutputStream s)
throws java.io.IOException {
+ Aux a;
s.defaultWriteObject();
- s.writeObject(getException());
+ s.writeObject((a = aux) == null ? null : a.ex);
}
/**
* Reconstitutes this task from a stream (that is, deserializes it).
* @param s the stream
@@ -1538,19 +1542,18 @@
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
Object ex = s.readObject();
if (ex != null)
- setExceptionalCompletion((Throwable)ex);
+ trySetThrown((Throwable)ex);
}
- // VarHandle mechanics
- private static final VarHandle STATUS;
static {
try {
MethodHandles.Lookup l = MethodHandles.lookup();
STATUS = l.findVarHandle(ForkJoinTask.class, "status", int.class);
+ AUX = l.findVarHandle(ForkJoinTask.class, "aux", Aux.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
}
< prev index next >