--- old/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java 2021-02-09 06:27:03.831693002 -0800 +++ new/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java 2021-02-09 06:27:03.447695953 -0800 @@ -276,7 +276,7 @@ return (int)STATUS.getAndBitwiseOr(this, v); } private boolean casStatus(int c, int v) { - return STATUS.weakCompareAndSet(this, c, v); + return STATUS.compareAndSet(this, c, v); } private boolean casAux(Aux c, Aux v) { return AUX.compareAndSet(this, c, v); @@ -296,84 +296,6 @@ } /** - * Possibly blocks until task is done or interrupted or timed out. - * - * @param interruptible true if wait can be cancelled by interrupt - * @param deadline if non-zero use timed waits and possibly timeout - * @param pool if nonnull pool to uncompensate after unblocking - * @return status on exit, or ABNORMAL if interrupted while waiting - */ - private int awaitDone(boolean interruptible, long deadline, - ForkJoinPool pool) { - int s; - boolean interrupted = false, queued = false, parked = false; - Aux node = null; - while ((s = status) >= 0) { - Aux a; long ns; - if (parked && Thread.interrupted()) { - if (interruptible) { - s = ABNORMAL; - break; - } - interrupted = true; - } - else if (queued) { - if (deadline != 0L) { - if ((ns = deadline - System.nanoTime()) <= 0L) - break; - LockSupport.parkNanos(ns); - } - else - LockSupport.park(); - parked = true; - } - else if (node != null) { - if ((a = aux) != null && a.ex != null) - Thread.onSpinWait(); // exception in progress - else if (queued = casAux(node.next = a, node)) - LockSupport.setCurrentBlocker(this); - } - else { - try { - node = new Aux(Thread.currentThread(), null); - } catch (Throwable ex) { // try to cancel if cannot create - casStatus(s, s | (DONE | ABNORMAL)); - } - } - } - if (pool != null) - pool.uncompensate(); - - if (queued) { - LockSupport.setCurrentBlocker(null); - if (s >= 0) { // cancellation similar to AbstractQueuedSynchronizer - outer: for (Aux a; (a = aux) != null && a.ex == null; ) { - for (Aux trail = null;;) { - Aux next = a.next; - if (a == node) { - if (trail != null) - trail.casNext(trail, next); - else if (casAux(a, next)) - break outer; // cannot be re-encountered - break; // restart - } else { - trail = a; - if ((a = next) == null) - break outer; - } - } - } - } - else { - signalWaiters(); // help clean or signal - if (interrupted) - Thread.currentThread().interrupt(); - } - } - return s; - } - - /** * Sets DONE status and wakes up threads waiting to join this task. * @return status on exit */ @@ -463,27 +385,36 @@ * Helps and/or waits for completion from join, get, or invoke; * called from either internal or external threads. * + * @param pool if nonnull, known submitted pool, else assumes current pool * @param ran true if task known to have been exec'd * @param interruptible true if park interruptibly when external * @param timed true if use timed wait * @param nanos if timed, timeout value * @return ABNORMAL if interrupted, else status on exit */ - private int awaitJoin(boolean ran, boolean interruptible, boolean timed, + private int awaitDone(ForkJoinPool pool, boolean ran, + boolean interruptible, boolean timed, long nanos) { - boolean internal; ForkJoinPool p; ForkJoinPool.WorkQueue q; int s; - Thread t; ForkJoinWorkerThread wt; - if (internal = ((t = Thread.currentThread()) - instanceof ForkJoinWorkerThread)) { - p = (wt = (ForkJoinWorkerThread)t).pool; - q = wt.workQueue; + ForkJoinPool p; boolean internal; int s; Thread t; + ForkJoinPool.WorkQueue q = null; + if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) { + ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t; + p = wt.pool; + if (pool == null) + pool = p; + if (internal = (pool == p)) + q = wt.workQueue; } else { + internal = false; p = ForkJoinPool.common; - q = ForkJoinPool.commonQueue(); - if (interruptible && Thread.interrupted()) - return ABNORMAL; + if (pool == null) + pool = p; + if (pool == p && p != null) + q = p.externalQueue(); } + if (interruptible && Thread.interrupted()) + return ABNORMAL; if ((s = status) < 0) return s; long deadline = 0L; @@ -493,22 +424,94 @@ else if ((deadline = nanos + System.nanoTime()) == 0L) deadline = 1L; } - ForkJoinPool uncompensate = null; - if (q != null && p != null) { // try helping - if ((!timed || p.isSaturated()) && - ((this instanceof CountedCompleter) ? - (s = p.helpComplete(this, q, internal)) < 0 : - (q.tryRemove(this, internal) && (s = doExec()) < 0))) - return s; + boolean uncompensate = false; + if (q != null && p != null) { // try helping + // help even in timed mode if pool has no parallelism + boolean canHelp = !timed || (p.mode & SMASK) == 0; + if (canHelp) { + if ((this instanceof CountedCompleter) && + (s = p.helpComplete(this, q, internal)) < 0) + return s; + if (!ran && ((!internal && q.externalTryUnpush(this)) || + q.tryRemove(this, internal)) && (s = doExec()) < 0) + return s; + } if (internal) { - if ((s = p.helpJoin(this, q)) < 0) + if ((s = p.helpJoin(this, q, canHelp)) < 0) return s; if (s == UNCOMPENSATE) - uncompensate = p; - interruptible = false; + uncompensate = true; } } - return awaitDone(interruptible, deadline, uncompensate); + // block until done or cancelled wait + boolean interrupted = false, queued = false; + boolean parked = false, fail = false; + Aux node = null; + while ((s = status) >= 0) { + Aux a; long ns; + if (fail || (fail = (pool != null && pool.mode < 0))) + casStatus(s, s | (DONE | ABNORMAL)); // try to cancel + else if (parked && Thread.interrupted()) { + if (interruptible) { + s = ABNORMAL; + break; + } + interrupted = true; + } + else if (queued) { + if (deadline != 0L) { + if ((ns = deadline - System.nanoTime()) <= 0L) + break; + LockSupport.parkNanos(ns); + } + else + LockSupport.park(); + parked = true; + } + else if (node != null) { + if ((a = aux) != null && a.ex != null) + Thread.onSpinWait(); // exception in progress + else if (queued = casAux(node.next = a, node)) + LockSupport.setCurrentBlocker(this); + } + else { + try { + node = new Aux(Thread.currentThread(), null); + } catch (Throwable ex) { // cannot create + fail = true; + } + } + } + if (pool != null && uncompensate) + pool.uncompensate(); + + if (queued) { + LockSupport.setCurrentBlocker(null); + if (s >= 0) { // cancellation similar to AbstractQueuedSynchronizer + outer: for (Aux a; (a = aux) != null && a.ex == null; ) { + for (Aux trail = null;;) { + Aux next = a.next; + if (a == node) { + if (trail != null) + trail.casNext(trail, next); + else if (casAux(a, next)) + break outer; // cannot be re-encountered + break; // restart + } else { + trail = a; + if ((a = next) == null) + break outer; + } + } + } + } + else { + signalWaiters(); // help clean or signal + if (interrupted) + Thread.currentThread().interrupt(); + } + } + return s; } /** @@ -664,7 +667,7 @@ public final V join() { int s; if ((s = status) >= 0) - s = awaitJoin(false, false, false, 0L); + s = awaitDone(null, false, false, false, 0L); if ((s & ABNORMAL) != 0) reportException(s); return getRawResult(); @@ -681,7 +684,7 @@ public final V invoke() { int s; if ((s = doExec()) >= 0) - s = awaitJoin(true, false, false, 0L); + s = awaitDone(null, true, false, false, 0L); if ((s & ABNORMAL) != 0) reportException(s); return getRawResult(); @@ -710,12 +713,12 @@ throw new NullPointerException(); t2.fork(); if ((s1 = t1.doExec()) >= 0) - s1 = t1.awaitJoin(true, false, false, 0L); + s1 = t1.awaitDone(null, true, false, false, 0L); if ((s1 & ABNORMAL) != 0) { cancelIgnoringExceptions(t2); t1.reportException(s1); } - else if (((s2 = t2.awaitJoin(false, false, false, 0L)) & ABNORMAL) != 0) + else if (((s2 = t2.awaitDone(null, false, false, false, 0L)) & ABNORMAL) != 0) t2.reportException(s2); } @@ -746,7 +749,7 @@ if (i == 0) { int s; if ((s = t.doExec()) >= 0) - s = t.awaitJoin(true, false, false, 0L); + s = t.awaitDone(null, true, false, false, 0L); if ((s & ABNORMAL) != 0) ex = t.getException(s); break; @@ -759,7 +762,7 @@ if ((t = tasks[i]) != null) { int s; if ((s = t.status) >= 0) - s = t.awaitJoin(false, false, false, 0L); + s = t.awaitDone(null, false, false, false, 0L); if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null) break; } @@ -809,7 +812,7 @@ if (i == 0) { int s; if ((s = t.doExec()) >= 0) - s = t.awaitJoin(true, false, false, 0L); + s = t.awaitDone(null, true, false, false, 0L); if ((s & ABNORMAL) != 0) ex = t.getException(s); break; @@ -822,7 +825,7 @@ if ((t = ts.get(i)) != null) { int s; if ((s = t.status) >= 0) - s = t.awaitJoin(false, false, false, 0L); + s = t.awaitDone(null, false, false, false, 0L); if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null) break; } @@ -973,8 +976,8 @@ * member of a ForkJoinPool and was interrupted while waiting */ public final V get() throws InterruptedException, ExecutionException { - int s; - if (((s = awaitJoin(false, true, false, 0L)) & ABNORMAL) != 0) + int s = awaitDone(null, false, true, false, 0L); + if ((s & ABNORMAL) != 0) reportExecutionException(s); return getRawResult(); } @@ -995,9 +998,9 @@ */ public final V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - int s; - if ((s = awaitJoin(false, true, true, unit.toNanos(timeout))) >= 0 || - (s & ABNORMAL) != 0) + long nanos = unit.toNanos(timeout); + int s = awaitDone(null, false, true, true, nanos); + if (s >= 0 || (s & ABNORMAL) != 0) reportExecutionException(s); return getRawResult(); } @@ -1010,9 +1013,10 @@ */ public final void quietlyJoin() { if (status >= 0) - awaitJoin(false, false, false, 0L); + awaitDone(null, false, false, false, 0L); } + /** * Commences performing this task and awaits its completion if * necessary, without returning its result or throwing its @@ -1020,7 +1024,37 @@ */ public final void quietlyInvoke() { if (doExec() >= 0) - awaitJoin(true, false, false, 0L); + awaitDone(null, true, false, false, 0L); + } + + // Versions of join/get for pool.invoke* methods that use external, + // possibly-non-commonPool submits + + final void awaitPoolInvoke(ForkJoinPool pool) { + awaitDone(pool, false, false, false, 0L); + } + final void awaitPoolInvoke(ForkJoinPool pool, long nanos) { + awaitDone(pool, false, true, true, nanos); + } + final V joinForPoolInvoke(ForkJoinPool pool) { + int s = awaitDone(pool, false, false, false, 0L); + if ((s & ABNORMAL) != 0) + reportException(s); + return getRawResult(); + } + final V getForPoolInvoke(ForkJoinPool pool) + throws InterruptedException, ExecutionException { + int s = awaitDone(pool, false, true, false, 0L); + if ((s & ABNORMAL) != 0) + reportExecutionException(s); + return getRawResult(); + } + final V getForPoolInvoke(ForkJoinPool pool, long nanos) + throws InterruptedException, ExecutionException, TimeoutException { + int s = awaitDone(pool, false, true, true, nanos); + if (s >= 0 || (s & ABNORMAL) != 0) + reportExecutionException(s); + return getRawResult(); } /**