< prev index next >
src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
Print this page
8192944: Miscellaneous changes imported from jsr166 CVS 2017-12-08
Reviewed-by: martin, psandoz, chegar
*** 217,272 ****
* (3) user-level methods that additionally report results.
* This is sometimes hard to see because this file orders exported
* methods in a way that flows well in javadocs.
*/
! /*
* The status field holds run control status bits packed into a
! * single int to minimize footprint and to ensure atomicity (via
! * CAS). Status is initially zero, and takes on nonnegative
! * values until completed, upon which status (anded with
! * DONE_MASK) holds value NORMAL, CANCELLED, or EXCEPTIONAL. Tasks
! * undergoing blocking waits by other threads have the SIGNAL bit
! * set. Completion of a stolen task with SIGNAL set awakens any
! * waiters via notifyAll. Even though suboptimal for some
! * purposes, we use basic builtin wait/notify to take advantage of
! * "monitor inflation" in JVMs that we would otherwise need to
! * emulate to avoid adding further per-task bookkeeping overhead.
! * We want these monitors to be "fat", i.e., not use biasing or
! * thin-lock techniques, so use some odd coding idioms that tend
! * to avoid them, mainly by arranging that every synchronized
! * block performs a wait, notifyAll or both.
*
* These control bits occupy only (some of) the upper half (16
* bits) of status field. The lower bits are used for user-defined
* tags.
*/
-
- /** The run status of this task */
volatile int status; // accessed directly by pool and workers
! static final int DONE_MASK = 0xf0000000; // mask out non-completion bits
! static final int NORMAL = 0xf0000000; // must be negative
! static final int CANCELLED = 0xc0000000; // must be < NORMAL
! static final int EXCEPTIONAL = 0x80000000; // must be < CANCELLED
! static final int SIGNAL = 0x00010000; // must be >= 1 << 16
! static final int SMASK = 0x0000ffff; // short bits for tags
/**
! * Marks completion and wakes up threads waiting to join this
! * task.
*
! * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
! * @return completion status on exit
*/
! private int setCompletion(int completion) {
! for (int s;;) {
if ((s = status) < 0)
return s;
! if (STATUS.compareAndSet(this, s, s | completion)) {
! if ((s >>> 16) != 0)
synchronized (this) { notifyAll(); }
! return completion;
}
}
}
/**
--- 217,279 ----
* (3) user-level methods that additionally report results.
* This is sometimes hard to see because this file orders exported
* methods in a way that flows well in javadocs.
*/
! /**
* The status field holds run control status bits packed into a
! * single int to ensure atomicity. Status is initially zero, and
! * takes on nonnegative values until completed, upon which it
! * holds (sign bit) DONE, possibly with ABNORMAL (cancelled or
! * exceptional) and THROWN (in which case an exception has been
! * stored). Tasks with dependent blocked waiting joiners have the
! * SIGNAL bit set. Completion of a task with SIGNAL set awakens
! * any waiters via notifyAll. (Waiters also help signal others
! * upon completion.)
*
* These control bits occupy only (some of) the upper half (16
* bits) of status field. The lower bits are used for user-defined
* tags.
*/
volatile int status; // accessed directly by pool and workers
!
! private static final int DONE = 1 << 31; // must be negative
! private static final int ABNORMAL = 1 << 18; // set atomically with DONE
! private static final int THROWN = 1 << 17; // set atomically with ABNORMAL
! private static final int SIGNAL = 1 << 16; // true if joiner waiting
! private static final int SMASK = 0xffff; // short bits for tags
!
// Reports whether status value s has the THROWN bit set, i.e. the task
// finished with a stored exception rather than by plain cancellation.
! static boolean isExceptionalStatus(int s) { // needed by subclasses
! return (s & THROWN) != 0;
! }
/**
! * Sets DONE status and wakes up threads waiting to join this task.
* Only the DONE bit is ORed in, and the update is unconditional, so
* any bits already present in status are preserved.
*
! * @return status on exit
* (the value observed before the update, with DONE ORed in)
*/
! private int setDone() {
! int s;
// getAndBitwiseOr yields the previous status; notify only if SIGNAL was set in it.
! if (((s = (int)STATUS.getAndBitwiseOr(this, DONE)) & SIGNAL) != 0)
! synchronized (this) { notifyAll(); }
! return s | DONE;
! }
!
! /**
! * Marks cancelled or exceptional completion unless already done.
* If status is already negative (done), it is returned unchanged and
* the requested completion bits are not applied.
! *
! * @param completion must be DONE | ABNORMAL, ORed with THROWN if exceptional
! * @return status on exit
! */
! private int abnormalCompletion(int completion) {
! for (int s, ns;;) {
// Already done: the first completion wins, report it as-is.
if ((s = status) < 0)
return s;
// weakCompareAndSet may fail (including spuriously); the loop re-reads status and retries.
! else if (STATUS.weakCompareAndSet(this, s, ns = s | completion)) {
// Wake joiners only if one had set SIGNAL before this update.
! if ((s & SIGNAL) != 0)
synchronized (this) { notifyAll(); }
! return ns;
}
}
}
/**
*** 280,293 ****
int s; boolean completed;
if ((s = status) >= 0) {
try {
completed = exec();
} catch (Throwable rex) {
! return setExceptionalCompletion(rex);
}
if (completed)
! s = setCompletion(NORMAL);
}
return s;
}
/**
--- 287,301 ----
int s; boolean completed;
if ((s = status) >= 0) {
try {
completed = exec();
} catch (Throwable rex) {
! completed = false;
! s = setExceptionalCompletion(rex);
}
if (completed)
! s = setDone();
}
return s;
}
/**
*** 295,307 ****
* This task may or may not be done on exit. Ignores interrupts.
*
* @param timeout using Object.wait conventions.
*/
final void internalWait(long timeout) {
! int s;
! if ((s = status) >= 0 && // force completer to issue notify
! STATUS.compareAndSet(this, s, s | SIGNAL)) {
synchronized (this) {
if (status >= 0)
try { wait(timeout); } catch (InterruptedException ie) { }
else
notifyAll();
--- 303,313 ----
* This task may or may not be done on exit. Ignores interrupts.
*
* @param timeout using Object.wait conventions.
*/
final void internalWait(long timeout) {
! if ((int)STATUS.getAndBitwiseOr(this, SIGNAL) >= 0) {
synchronized (this) {
if (status >= 0)
try { wait(timeout); } catch (InterruptedException ie) { }
else
notifyAll();
*** 312,376 ****
/**
* Blocks a non-worker-thread until completion.
* @return status upon completion
*/
private int externalAwaitDone() {
! int s = ((this instanceof CountedCompleter) ? // try helping
! ForkJoinPool.common.externalHelpComplete(
! (CountedCompleter<?>)this, 0) :
! ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
! if (s >= 0 && (s = status) >= 0) {
boolean interrupted = false;
- do {
- if (STATUS.compareAndSet(this, s, s | SIGNAL)) {
synchronized (this) {
! if (status >= 0) {
try {
wait(0L);
} catch (InterruptedException ie) {
interrupted = true;
}
}
! else
notifyAll();
}
}
- } while ((s = status) >= 0);
if (interrupted)
Thread.currentThread().interrupt();
}
return s;
}
/**
* Blocks a non-worker-thread until completion or interruption.
*/
private int externalInterruptibleAwaitDone() throws InterruptedException {
! int s;
! if (Thread.interrupted())
! throw new InterruptedException();
! if ((s = status) >= 0 &&
! (s = ((this instanceof CountedCompleter) ?
! ForkJoinPool.common.externalHelpComplete(
! (CountedCompleter<?>)this, 0) :
! ForkJoinPool.common.tryExternalUnpush(this) ? doExec() :
! 0)) >= 0) {
! while ((s = status) >= 0) {
! if (STATUS.compareAndSet(this, s, s | SIGNAL)) {
synchronized (this) {
! if (status >= 0)
wait(0L);
! else
notifyAll();
}
}
}
}
return s;
}
/**
* Implementation for join, get, quietlyJoin. Directly handles
* only cases of already-completed, external wait, and
* unfork+exec. Others are relayed to ForkJoinPool.awaitJoin.
*
* @return status upon completion
--- 318,389 ----
/**
* Blocks a non-worker-thread until completion.
* Interrupts received while waiting do not end the wait; they are
* remembered and the thread's interrupt status is re-asserted on exit.
* @return status upon completion
*/
private int externalAwaitDone() {
// First try to help or run the task directly instead of blocking.
! int s = tryExternalHelp();
// Set SIGNAL so the completer issues notifyAll; skip waiting if the previous status was already done.
! if (s >= 0 && (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) {
boolean interrupted = false;
synchronized (this) {
! for (;;) {
// Re-check status under the monitor on every pass before waiting again.
! if ((s = status) >= 0) {
try {
wait(0L);
} catch (InterruptedException ie) {
interrupted = true;
}
}
! else {
// Done: pass the wake-up on to any other waiters, then leave the loop.
notifyAll();
+ break;
+ }
}
}
if (interrupted)
Thread.currentThread().interrupt();
}
return s;
}
/**
* Blocks a non-worker-thread until completion or interruption.
*
* @return status upon completion
* @throws InterruptedException if interrupted while waiting, or if no
* wait was needed but the calling thread's interrupt status was set
*/
private int externalInterruptibleAwaitDone() throws InterruptedException {
// First try to help or run the task directly instead of blocking.
! int s = tryExternalHelp();
// Set SIGNAL so the completer issues notifyAll; skip waiting if the previous status was already done.
! if (s >= 0 && (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) {
synchronized (this) {
! for (;;) {
// wait() is not wrapped in try/catch here, so an InterruptedException propagates to the caller.
! if ((s = status) >= 0)
wait(0L);
! else {
// Done: pass the wake-up on to any other waiters, then leave the loop.
notifyAll();
+ break;
}
}
}
}
// No wait took place: still report (and clear) a pending interrupt.
+ else if (Thread.interrupted())
+ throw new InterruptedException();
return s;
}
/**
+ * Tries to help with tasks allowed for external callers.
* An already-done task is reported immediately. Otherwise a
* CountedCompleter is handed to the common pool's externalHelpComplete,
* and any other task is run via doExec() if tryExternalUnpush succeeds.
+ *
+ * @return current status
* (the literal 0 when the task is not a CountedCompleter and could not be unpushed)
+ */
+ private int tryExternalHelp() {
+ int s;
+ return ((s = status) < 0 ? s:
+ (this instanceof CountedCompleter) ?
+ ForkJoinPool.common.externalHelpComplete(
+ (CountedCompleter<?>)this, 0) :
+ ForkJoinPool.common.tryExternalUnpush(this) ?
+ doExec() : 0);
+ }
+
+ /**
* Implementation for join, get, quietlyJoin. Directly handles
* only cases of already-completed, external wait, and
* unfork+exec. Others are relayed to ForkJoinPool.awaitJoin.
*
* @return status upon completion
*** 473,483 ****
break;
}
} finally {
lock.unlock();
}
! s = setCompletion(EXCEPTIONAL);
}
return s;
}
/**
--- 486,496 ----
break;
}
} finally {
lock.unlock();
}
! s = abnormalCompletion(DONE | ABNORMAL | THROWN);
}
return s;
}
/**
*** 485,495 ****
*
* @return status on exit
*/
private int setExceptionalCompletion(Throwable ex) {
int s = recordExceptionalCompletion(ex);
! if ((s & DONE_MASK) == EXCEPTIONAL)
internalPropagateException(ex);
return s;
}
/**
--- 498,508 ----
*
* @return status on exit
*/
private int setExceptionalCompletion(Throwable ex) {
// Record the exception first; the returned status shows which outcome is in effect.
int s = recordExceptionalCompletion(ex);
// Propagate only when the resulting status carries THROWN.
! if ((s & THROWN) != 0)
internalPropagateException(ex);
return s;
}
/**
*** 660,673 ****
/**
* Throws exception, if any, associated with the given status.
*/
private void reportException(int s) {
! if (s == CANCELLED)
! throw new CancellationException();
! if (s == EXCEPTIONAL)
! rethrow(getThrowableException());
}
// public methods
/**
--- 673,684 ----
/**
* Throws exception, if any, associated with the given status.
* A status with THROWN set yields the stored exception; any other
* status yields a new CancellationException. The callers visible in
* this diff invoke it only after finding ABNORMAL set in the status.
*/
private void reportException(int s) {
! rethrow((s & THROWN) != 0 ? getThrowableException() :
! new CancellationException());
}
// public methods
/**
*** 705,715 ****
*
* @return the computed result
*/
public final V join() {
int s;
! if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
/**
--- 716,726 ----
*
* @return the computed result
*/
public final V join() {
int s;
// ABNORMAL covers both cancellation and exceptional completion; reportException picks the exception.
! if (((s = doJoin()) & ABNORMAL) != 0)
reportException(s);
return getRawResult();
}
/**
*** 720,730 ****
*
* @return the computed result
*/
public final V invoke() {
int s;
! if ((s = doInvoke() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
/**
--- 731,741 ----
*
* @return the computed result
*/
public final V invoke() {
int s;
// ABNORMAL covers both cancellation and exceptional completion; reportException picks the exception.
! if (((s = doInvoke()) & ABNORMAL) != 0)
reportException(s);
return getRawResult();
}
/**
*** 745,757 ****
* @throws NullPointerException if any task is null
*/
public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
int s1, s2;
t2.fork();
! if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL)
t1.reportException(s1);
! if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL)
t2.reportException(s2);
}
/**
* Forks the given tasks, returning when {@code isDone} holds for
--- 756,768 ----
* @throws NullPointerException if any task is null
*/
public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
int s1, s2;
// Fork t2 first, invoke t1 via doInvoke(), then join t2.
t2.fork();
// An abnormal t1 is reported before t2 is joined.
! if (((s1 = t1.doInvoke()) & ABNORMAL) != 0)
t1.reportException(s1);
! if (((s2 = t2.doJoin()) & ABNORMAL) != 0)
t2.reportException(s2);
}
/**
* Forks the given tasks, returning when {@code isDone} holds for
*** 777,795 ****
if (ex == null)
ex = new NullPointerException();
}
else if (i != 0)
t.fork();
! else if (t.doInvoke() < NORMAL && ex == null)
ex = t.getException();
}
for (int i = 1; i <= last; ++i) {
ForkJoinTask<?> t = tasks[i];
if (t != null) {
if (ex != null)
t.cancel(false);
! else if (t.doJoin() < NORMAL)
ex = t.getException();
}
}
if (ex != null)
rethrow(ex);
--- 788,806 ----
if (ex == null)
ex = new NullPointerException();
}
else if (i != 0)
t.fork();
! else if ((t.doInvoke() & ABNORMAL) != 0 && ex == null)
ex = t.getException();
}
for (int i = 1; i <= last; ++i) {
ForkJoinTask<?> t = tasks[i];
if (t != null) {
if (ex != null)
t.cancel(false);
! else if ((t.doJoin() & ABNORMAL) != 0)
ex = t.getException();
}
}
if (ex != null)
rethrow(ex);
*** 829,847 ****
if (ex == null)
ex = new NullPointerException();
}
else if (i != 0)
t.fork();
! else if (t.doInvoke() < NORMAL && ex == null)
ex = t.getException();
}
for (int i = 1; i <= last; ++i) {
ForkJoinTask<?> t = ts.get(i);
if (t != null) {
if (ex != null)
t.cancel(false);
! else if (t.doJoin() < NORMAL)
ex = t.getException();
}
}
if (ex != null)
rethrow(ex);
--- 840,858 ----
if (ex == null)
ex = new NullPointerException();
}
else if (i != 0)
t.fork();
! else if ((t.doInvoke() & ABNORMAL) != 0 && ex == null)
ex = t.getException();
}
for (int i = 1; i <= last; ++i) {
ForkJoinTask<?> t = ts.get(i);
if (t != null) {
if (ex != null)
t.cancel(false);
! else if ((t.doJoin() & ABNORMAL) != 0)
ex = t.getException();
}
}
if (ex != null)
rethrow(ex);
*** 874,925 ****
* control cancellation.
*
* @return {@code true} if this task is now cancelled
*/
public boolean cancel(boolean mayInterruptIfRunning) {
! return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED;
}
public final boolean isDone() {
return status < 0;
}
public final boolean isCancelled() {
! return (status & DONE_MASK) == CANCELLED;
}
/**
* Returns {@code true} if this task threw an exception or was cancelled.
*
* @return {@code true} if this task threw an exception or was cancelled
*/
public final boolean isCompletedAbnormally() {
! return status < NORMAL;
}
/**
* Returns {@code true} if this task completed without throwing an
* exception and was not cancelled.
*
* @return {@code true} if this task completed without throwing an
* exception and was not cancelled
*/
public final boolean isCompletedNormally() {
! return (status & DONE_MASK) == NORMAL;
}
/**
* Returns the exception thrown by the base computation, or a
* {@code CancellationException} if cancelled, or {@code null} if
* none or if the method has not yet completed.
*
* @return the exception, or {@code null} if none
*/
public final Throwable getException() {
! int s = status & DONE_MASK;
! return ((s >= NORMAL) ? null :
! (s == CANCELLED) ? new CancellationException() :
getThrowableException());
}
/**
* Completes this task abnormally, and if not already aborted or
--- 885,937 ----
* control cancellation.
*
* @return {@code true} if this task is now cancelled
*/
public boolean cancel(boolean mayInterruptIfRunning) {
// mayInterruptIfRunning is not consulted in this body.
// abnormalCompletion leaves an already-done status untouched and returns it.
! int s = abnormalCompletion(DONE | ABNORMAL);
// Cancelled means ABNORMAL without THROWN; normal or exceptional completion gives false.
! return (s & (ABNORMAL | THROWN)) == ABNORMAL;
}
public final boolean isDone() {
// DONE is the sign bit (1 << 31), so any completed status is negative.
return status < 0;
}
public final boolean isCancelled() {
// Cancelled means ABNORMAL is set and THROWN is not.
! return (status & (ABNORMAL | THROWN)) == ABNORMAL;
}
/**
* Returns {@code true} if this task threw an exception or was cancelled.
*
* @return {@code true} if this task threw an exception or was cancelled
*/
public final boolean isCompletedAbnormally() {
// ABNORMAL is set for both the cancelled and the exceptional outcome.
! return (status & ABNORMAL) != 0;
}
/**
* Returns {@code true} if this task completed without throwing an
* exception and was not cancelled.
*
* @return {@code true} if this task completed without throwing an
* exception and was not cancelled
*/
public final boolean isCompletedNormally() {
// Requires DONE to be set and ABNORMAL to be clear.
! return (status & (DONE | ABNORMAL)) == DONE;
}
/**
* Returns the exception thrown by the base computation, or a
* {@code CancellationException} if cancelled, or {@code null} if
* none or if the method has not yet completed.
*
* @return the exception, or {@code null} if none
*/
public final Throwable getException() {
// Read status once so both bit tests see the same value.
! int s = status;
// No ABNORMAL bit: null. ABNORMAL without THROWN: cancellation. Otherwise the stored exception.
! return ((s & ABNORMAL) == 0 ? null :
! (s & THROWN) == 0 ? new CancellationException() :
getThrowableException());
}
/**
* Completes this task abnormally, and if not already aborted or
*** 959,969 ****
setRawResult(value);
} catch (Throwable rex) {
setExceptionalCompletion(rex);
return;
}
! setCompletion(NORMAL);
}
/**
* Completes this task normally without setting a value. The most
* recent value established by {@link #setRawResult} (or {@code
--- 971,981 ----
setRawResult(value);
} catch (Throwable rex) {
setExceptionalCompletion(rex);
return;
}
! setDone();
}
/**
* Completes this task normally without setting a value. The most
* recent value established by {@link #setRawResult} (or {@code
*** 971,981 ****
* invocations of {@code join} and related operations.
*
* @since 1.8
*/
public final void quietlyComplete() {
! setCompletion(NORMAL);
}
/**
* Waits if necessary for the computation to complete, and then
* retrieves its result.
--- 983,993 ----
* invocations of {@code join} and related operations.
*
* @since 1.8
*/
public final void quietlyComplete() {
// Sets the DONE bit and wakes any waiting joiners; the returned status is ignored.
! setDone();
}
/**
* Waits if necessary for the computation to complete, and then
* retrieves its result.
*** 988,1001 ****
* member of a ForkJoinPool and was interrupted while waiting
*/
public final V get() throws InterruptedException, ExecutionException {
int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
doJoin() : externalInterruptibleAwaitDone();
! if ((s &= DONE_MASK) == CANCELLED)
! throw new CancellationException();
! if (s == EXCEPTIONAL)
throw new ExecutionException(getThrowableException());
return getRawResult();
}
/**
* Waits if necessary for at most the given time for the computation
--- 1000,1014 ----
* member of a ForkJoinPool and was interrupted while waiting
*/
public final V get() throws InterruptedException, ExecutionException {
// Worker threads join via doJoin(); other threads wait interruptibly.
int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
doJoin() : externalInterruptibleAwaitDone();
// THROWN is tested before ABNORMAL so a stored exception is wrapped in ExecutionException.
! if ((s & THROWN) != 0)
throw new ExecutionException(getThrowableException());
// ABNORMAL without THROWN means the task was cancelled.
+ else if ((s & ABNORMAL) != 0)
+ throw new CancellationException();
+ else
return getRawResult();
}
/**
* Waits if necessary for at most the given time for the computation
*** 1032,1042 ****
doExec() : 0)) >= 0) {
long ns, ms; // measure in nanosecs, but wait in millisecs
while ((s = status) >= 0 &&
(ns = deadline - System.nanoTime()) > 0L) {
if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L &&
! STATUS.compareAndSet(this, s, s | SIGNAL)) {
synchronized (this) {
if (status >= 0)
wait(ms); // OK to throw InterruptedException
else
notifyAll();
--- 1045,1055 ----
doExec() : 0)) >= 0) {
long ns, ms; // measure in nanosecs, but wait in millisecs
while ((s = status) >= 0 &&
(ns = deadline - System.nanoTime()) > 0L) {
if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L &&
! (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) {
synchronized (this) {
if (status >= 0)
wait(ms); // OK to throw InterruptedException
else
notifyAll();
*** 1044,1061 ****
}
}
}
}
if (s >= 0)
- s = status;
- if ((s &= DONE_MASK) != NORMAL) {
- if (s == CANCELLED)
- throw new CancellationException();
- if (s != EXCEPTIONAL)
throw new TimeoutException();
throw new ExecutionException(getThrowableException());
! }
return getRawResult();
}
/**
* Joins this task, without returning its result or throwing its
--- 1057,1072 ----
}
}
}
}
if (s >= 0)
throw new TimeoutException();
+ else if ((s & THROWN) != 0)
throw new ExecutionException(getThrowableException());
! else if ((s & ABNORMAL) != 0)
! throw new CancellationException();
! else
return getRawResult();
}
/**
* Joins this task, without returning its result or throwing its
*** 1108,1118 ****
* null}. However, the value returned by {@code getRawResult} is
* unaffected. To clear this value, you can invoke {@code
* setRawResult(null)}.
*/
public void reinitialize() {
! if ((status & DONE_MASK) == EXCEPTIONAL)
clearExceptionalCompletion();
else
status = 0;
}
--- 1119,1129 ----
* null}. However, the value returned by {@code getRawResult} is
* unaffected. To clear this value, you can invoke {@code
* setRawResult(null)}.
*/
public void reinitialize() {
// A stored exception (THROWN) is handled by clearExceptionalCompletion().
// NOTE(review): presumably that helper also resets status; it is not visible in this diff.
! if ((status & THROWN) != 0)
clearExceptionalCompletion();
else
status = 0;
}
*** 1325,1335 ****
* @return the previous value of the tag
* @since 1.8
*/
public final short setForkJoinTaskTag(short newValue) {
for (int s;;) {
! if (STATUS.compareAndSet(this, s = status,
(s & ~SMASK) | (newValue & SMASK)))
return (short)s;
}
}
--- 1336,1346 ----
* @return the previous value of the tag
* @since 1.8
*/
public final short setForkJoinTaskTag(short newValue) {
for (int s;;) {
// Replace only the low 16 tag bits (SMASK) and keep the control bits.
// A failed weakCompareAndSet (including a spurious one) is retried by the loop.
! if (STATUS.weakCompareAndSet(this, s = status,
(s & ~SMASK) | (newValue & SMASK)))
// The cast keeps the low 16 bits of the prior status, i.e. the previous tag.
return (short)s;
}
}
*** 1349,1359 ****
*/
public final boolean compareAndSetForkJoinTaskTag(short expect, short update) {
for (int s;;) {
if ((short)(s = status) != expect)
return false;
! if (STATUS.compareAndSet(this, s,
(s & ~SMASK) | (update & SMASK)))
return true;
}
}
--- 1360,1370 ----
*/
public final boolean compareAndSetForkJoinTaskTag(short expect, short update) {
for (int s;;) {
// Compare only the tag (low 16 bits of status) against expect.
if ((short)(s = status) != expect)
return false;
// Tag matched: swap in the new tag and keep the control bits.
// A failed weakCompareAndSet (including a spurious one) loops and re-checks the tag.
! if (STATUS.weakCompareAndSet(this, s,
(s & ~SMASK) | (update & SMASK)))
return true;
}
}
< prev index next >