< prev index next >
src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
Print this page
8259800: timeout in tck test testForkJoin(ForkJoinPool8Test)
Reviewed-by: martin
@@ -274,11 +274,11 @@
private static final VarHandle AUX;
private int getAndBitwiseOrStatus(int v) {
return (int)STATUS.getAndBitwiseOr(this, v);
}
private boolean casStatus(int c, int v) {
- return STATUS.weakCompareAndSet(this, c, v);
+ return STATUS.compareAndSet(this, c, v);
}
private boolean casAux(Aux c, Aux v) {
return AUX.compareAndSet(this, c, v);
}
@@ -294,88 +294,10 @@
}
}
}
/**
- * Possibly blocks until task is done or interrupted or timed out.
- *
- * @param interruptible true if wait can be cancelled by interrupt
- * @param deadline if non-zero use timed waits and possibly timeout
- * @param pool if nonnull pool to uncompensate after unblocking
- * @return status on exit, or ABNORMAL if interrupted while waiting
- */
- private int awaitDone(boolean interruptible, long deadline,
- ForkJoinPool pool) {
- int s;
- boolean interrupted = false, queued = false, parked = false;
- Aux node = null;
- while ((s = status) >= 0) {
- Aux a; long ns;
- if (parked && Thread.interrupted()) {
- if (interruptible) {
- s = ABNORMAL;
- break;
- }
- interrupted = true;
- }
- else if (queued) {
- if (deadline != 0L) {
- if ((ns = deadline - System.nanoTime()) <= 0L)
- break;
- LockSupport.parkNanos(ns);
- }
- else
- LockSupport.park();
- parked = true;
- }
- else if (node != null) {
- if ((a = aux) != null && a.ex != null)
- Thread.onSpinWait(); // exception in progress
- else if (queued = casAux(node.next = a, node))
- LockSupport.setCurrentBlocker(this);
- }
- else {
- try {
- node = new Aux(Thread.currentThread(), null);
- } catch (Throwable ex) { // try to cancel if cannot create
- casStatus(s, s | (DONE | ABNORMAL));
- }
- }
- }
- if (pool != null)
- pool.uncompensate();
-
- if (queued) {
- LockSupport.setCurrentBlocker(null);
- if (s >= 0) { // cancellation similar to AbstractQueuedSynchronizer
- outer: for (Aux a; (a = aux) != null && a.ex == null; ) {
- for (Aux trail = null;;) {
- Aux next = a.next;
- if (a == node) {
- if (trail != null)
- trail.casNext(trail, next);
- else if (casAux(a, next))
- break outer; // cannot be re-encountered
- break; // restart
- } else {
- trail = a;
- if ((a = next) == null)
- break outer;
- }
- }
- }
- }
- else {
- signalWaiters(); // help clean or signal
- if (interrupted)
- Thread.currentThread().interrupt();
- }
- }
- return s;
- }
-
- /**
* Sets DONE status and wakes up threads waiting to join this task.
* @return status on exit
*/
private int setDone() {
int s = getAndBitwiseOrStatus(DONE) | DONE;
@@ -461,56 +383,137 @@
/**
* Helps and/or waits for completion from join, get, or invoke;
* called from either internal or external threads.
*
+ * @param pool if nonnull, known submitted pool, else assumes current pool
* @param ran true if task known to have been exec'd
* @param interruptible true if park interruptibly when external
* @param timed true if use timed wait
* @param nanos if timed, timeout value
* @return ABNORMAL if interrupted, else status on exit
*/
- private int awaitJoin(boolean ran, boolean interruptible, boolean timed,
+ private int awaitDone(ForkJoinPool pool, boolean ran,
+ boolean interruptible, boolean timed,
long nanos) {
- boolean internal; ForkJoinPool p; ForkJoinPool.WorkQueue q; int s;
- Thread t; ForkJoinWorkerThread wt;
- if (internal = ((t = Thread.currentThread())
- instanceof ForkJoinWorkerThread)) {
- p = (wt = (ForkJoinWorkerThread)t).pool;
+ ForkJoinPool p; boolean internal; int s; Thread t;
+ ForkJoinPool.WorkQueue q = null;
+ if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
+ ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
+ p = wt.pool;
+ if (pool == null)
+ pool = p;
+ if (internal = (pool == p))
q = wt.workQueue;
}
else {
+ internal = false;
p = ForkJoinPool.common;
- q = ForkJoinPool.commonQueue();
+ if (pool == null)
+ pool = p;
+ if (pool == p && p != null)
+ q = p.externalQueue();
+ }
if (interruptible && Thread.interrupted())
return ABNORMAL;
- }
if ((s = status) < 0)
return s;
long deadline = 0L;
if (timed) {
if (nanos <= 0L)
return 0;
else if ((deadline = nanos + System.nanoTime()) == 0L)
deadline = 1L;
}
- ForkJoinPool uncompensate = null;
+ boolean uncompensate = false;
if (q != null && p != null) { // try helping
- if ((!timed || p.isSaturated()) &&
- ((this instanceof CountedCompleter) ?
- (s = p.helpComplete(this, q, internal)) < 0 :
- (q.tryRemove(this, internal) && (s = doExec()) < 0)))
+ // help even in timed mode if pool has no parallelism
+ boolean canHelp = !timed || (p.mode & SMASK) == 0;
+ if (canHelp) {
+ if ((this instanceof CountedCompleter) &&
+ (s = p.helpComplete(this, q, internal)) < 0)
+ return s;
+ if (!ran && ((!internal && q.externalTryUnpush(this)) ||
+ q.tryRemove(this, internal)) && (s = doExec()) < 0)
return s;
+ }
if (internal) {
- if ((s = p.helpJoin(this, q)) < 0)
+ if ((s = p.helpJoin(this, q, canHelp)) < 0)
return s;
if (s == UNCOMPENSATE)
- uncompensate = p;
- interruptible = false;
+ uncompensate = true;
+ }
+ }
+ // block until done or cancelled wait
+ boolean interrupted = false, queued = false;
+ boolean parked = false, fail = false;
+ Aux node = null;
+ while ((s = status) >= 0) {
+ Aux a; long ns;
+ if (fail || (fail = (pool != null && pool.mode < 0)))
+ casStatus(s, s | (DONE | ABNORMAL)); // try to cancel
+ else if (parked && Thread.interrupted()) {
+ if (interruptible) {
+ s = ABNORMAL;
+ break;
+ }
+ interrupted = true;
+ }
+ else if (queued) {
+ if (deadline != 0L) {
+ if ((ns = deadline - System.nanoTime()) <= 0L)
+ break;
+ LockSupport.parkNanos(ns);
+ }
+ else
+ LockSupport.park();
+ parked = true;
+ }
+ else if (node != null) {
+ if ((a = aux) != null && a.ex != null)
+ Thread.onSpinWait(); // exception in progress
+ else if (queued = casAux(node.next = a, node))
+ LockSupport.setCurrentBlocker(this);
+ }
+ else {
+ try {
+ node = new Aux(Thread.currentThread(), null);
+ } catch (Throwable ex) { // cannot create
+ fail = true;
}
}
- return awaitDone(interruptible, deadline, uncompensate);
+ }
+ if (pool != null && uncompensate)
+ pool.uncompensate();
+
+ if (queued) {
+ LockSupport.setCurrentBlocker(null);
+ if (s >= 0) { // cancellation similar to AbstractQueuedSynchronizer
+ outer: for (Aux a; (a = aux) != null && a.ex == null; ) {
+ for (Aux trail = null;;) {
+ Aux next = a.next;
+ if (a == node) {
+ if (trail != null)
+ trail.casNext(trail, next);
+ else if (casAux(a, next))
+ break outer; // cannot be re-encountered
+ break; // restart
+ } else {
+ trail = a;
+ if ((a = next) == null)
+ break outer;
+ }
+ }
+ }
+ }
+ else {
+ signalWaiters(); // help clean or signal
+ if (interrupted)
+ Thread.currentThread().interrupt();
+ }
+ }
+ return s;
}
/**
* Cancels, ignoring any exceptions thrown by cancel. Cancel is
* spec'ed not to throw any exceptions, but if it does anyway, we
@@ -662,11 +665,11 @@
* @return the computed result
*/
public final V join() {
int s;
if ((s = status) >= 0)
- s = awaitJoin(false, false, false, 0L);
+ s = awaitDone(null, false, false, false, 0L);
if ((s & ABNORMAL) != 0)
reportException(s);
return getRawResult();
}
@@ -679,11 +682,11 @@
* @return the computed result
*/
public final V invoke() {
int s;
if ((s = doExec()) >= 0)
- s = awaitJoin(true, false, false, 0L);
+ s = awaitDone(null, true, false, false, 0L);
if ((s & ABNORMAL) != 0)
reportException(s);
return getRawResult();
}
@@ -708,16 +711,16 @@
int s1, s2;
if (t1 == null || t2 == null)
throw new NullPointerException();
t2.fork();
if ((s1 = t1.doExec()) >= 0)
- s1 = t1.awaitJoin(true, false, false, 0L);
+ s1 = t1.awaitDone(null, true, false, false, 0L);
if ((s1 & ABNORMAL) != 0) {
cancelIgnoringExceptions(t2);
t1.reportException(s1);
}
- else if (((s2 = t2.awaitJoin(false, false, false, 0L)) & ABNORMAL) != 0)
+ else if (((s2 = t2.awaitDone(null, false, false, false, 0L)) & ABNORMAL) != 0)
t2.reportException(s2);
}
/**
* Forks the given tasks, returning when {@code isDone} holds for
@@ -744,11 +747,11 @@
break;
}
if (i == 0) {
int s;
if ((s = t.doExec()) >= 0)
- s = t.awaitJoin(true, false, false, 0L);
+ s = t.awaitDone(null, true, false, false, 0L);
if ((s & ABNORMAL) != 0)
ex = t.getException(s);
break;
}
t.fork();
@@ -757,11 +760,11 @@
for (int i = 1; i <= last; ++i) {
ForkJoinTask<?> t;
if ((t = tasks[i]) != null) {
int s;
if ((s = t.status) >= 0)
- s = t.awaitJoin(false, false, false, 0L);
+ s = t.awaitDone(null, false, false, false, 0L);
if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null)
break;
}
}
}
@@ -807,11 +810,11 @@
break;
}
if (i == 0) {
int s;
if ((s = t.doExec()) >= 0)
- s = t.awaitJoin(true, false, false, 0L);
+ s = t.awaitDone(null, true, false, false, 0L);
if ((s & ABNORMAL) != 0)
ex = t.getException(s);
break;
}
t.fork();
@@ -820,11 +823,11 @@
for (int i = 1; i <= last; ++i) {
ForkJoinTask<?> t;
if ((t = ts.get(i)) != null) {
int s;
if ((s = t.status) >= 0)
- s = t.awaitJoin(false, false, false, 0L);
+ s = t.awaitDone(null, false, false, false, 0L);
if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null)
break;
}
}
}
@@ -971,12 +974,12 @@
* exception
* @throws InterruptedException if the current thread is not a
* member of a ForkJoinPool and was interrupted while waiting
*/
public final V get() throws InterruptedException, ExecutionException {
- int s;
- if (((s = awaitJoin(false, true, false, 0L)) & ABNORMAL) != 0)
+ int s = awaitDone(null, false, true, false, 0L);
+ if ((s & ABNORMAL) != 0)
reportExecutionException(s);
return getRawResult();
}
/**
@@ -993,13 +996,13 @@
* member of a ForkJoinPool and was interrupted while waiting
* @throws TimeoutException if the wait timed out
*/
public final V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
- int s;
- if ((s = awaitJoin(false, true, true, unit.toNanos(timeout))) >= 0 ||
- (s & ABNORMAL) != 0)
+ long nanos = unit.toNanos(timeout);
+ int s = awaitDone(null, false, true, true, nanos);
+ if (s >= 0 || (s & ABNORMAL) != 0)
reportExecutionException(s);
return getRawResult();
}
/**
@@ -1008,21 +1011,52 @@
* collections of tasks when some have been cancelled or otherwise
* known to have aborted.
*/
public final void quietlyJoin() {
if (status >= 0)
- awaitJoin(false, false, false, 0L);
+ awaitDone(null, false, false, false, 0L);
}
+
/**
* Commences performing this task and awaits its completion if
* necessary, without returning its result or throwing its
* exception.
*/
public final void quietlyInvoke() {
if (doExec() >= 0)
- awaitJoin(true, false, false, 0L);
+ awaitDone(null, true, false, false, 0L);
+ }
+
+ // Versions of join/get for pool.invoke* methods that use external,
+ // possibly-non-commonPool submits
+
+ final void awaitPoolInvoke(ForkJoinPool pool) {
+ awaitDone(pool, false, false, false, 0L);
+ }
+ final void awaitPoolInvoke(ForkJoinPool pool, long nanos) {
+ awaitDone(pool, false, true, true, nanos);
+ }
+ final V joinForPoolInvoke(ForkJoinPool pool) {
+ int s = awaitDone(pool, false, false, false, 0L);
+ if ((s & ABNORMAL) != 0)
+ reportException(s);
+ return getRawResult();
+ }
+ final V getForPoolInvoke(ForkJoinPool pool)
+ throws InterruptedException, ExecutionException {
+ int s = awaitDone(pool, false, true, false, 0L);
+ if ((s & ABNORMAL) != 0)
+ reportExecutionException(s);
+ return getRawResult();
+ }
+ final V getForPoolInvoke(ForkJoinPool pool, long nanos)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ int s = awaitDone(pool, false, true, true, nanos);
+ if (s >= 0 || (s & ABNORMAL) != 0)
+ reportExecutionException(s);
+ return getRawResult();
}
/**
* Possibly executes tasks until the pool hosting the current task
* {@linkplain ForkJoinPool#isQuiescent is quiescent}. This
< prev index next >