diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java --- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java +++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java @@ -1046,13 +1046,22 @@ */ final boolean externalTryUnpush(ForkJoinTask task) { boolean taken = false; - int s = top, cap, k; ForkJoinTask[] a; - if ((a = array) != null && (cap = a.length) > 0 && - a[k = (cap - 1) & (s - 1)] == task && tryLock()) { - if (top == s && array == a && - (taken = casSlotToNull(a, k, task))) - top = s - 1; - source = 0; // release lock + for (;;) { + int s = top, cap, k; ForkJoinTask[] a; + if ((a = array) == null || (cap = a.length) <= 0 || + a[k = (cap - 1) & (s - 1)] != task) + break; + if (tryLock()) { + if (top == s && array == a) { + if (taken = casSlotToNull(a, k, task)) { + top = s - 1; + source = 0; + break; + } + } + source = 0; // release lock for retry + } + Thread.yield(); // trylock failure } return taken; } @@ -1194,15 +1203,16 @@ top = s; source = 0; } + if (taken) + t.doExec(); + else if (!owned) + Thread.yield(); // tryLock failure break; } else if ((f = f.completer) == null) break; } - if (!taken) - break; - t.doExec(); - if (limit != 0 && --limit == 0) + if (taken && limit != 0 && --limit == 0) break; } return status; @@ -1586,7 +1596,7 @@ * @param w caller's WorkQueue (may be null on failed initialization) */ final void runWorker(WorkQueue w) { - if (w != null) { // skip on failed init + if (mode >= 0 && w != null) { // skip on failed init w.config |= SRC; // mark as valid source int r = w.stackPred, src = 0; // use seed from registerWorker do { @@ -1711,22 +1721,6 @@ // Utilities used by ForkJoinTask /** - * Returns true if all workers are busy, possibly creating one if allowed - */ - final boolean isSaturated() { - int maxTotal = bounds >>> SWIDTH; - for (long c;;) { - if (((int)(c = ctl) & ~UNSIGNALLED) != 0) - return false; - if ((short)(c >>> TC_SHIFT) >= maxTotal) - return true; - long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); - if (compareAndSetCtl(c, nc)) - return !createWorker(); - } - } - - /** * Returns true if can start terminating if enabled, or already terminated */ final boolean canStop() { @@ -1765,13 +1759,16 @@ */ private int tryCompensate(long c) { Predicate sat; - int b = bounds; // counts are signed; centered at parallelism level == 0 + int md = mode, b = bounds; + // counts are signed; centered at parallelism level == 0 int minActive = (short)(b & SMASK), maxTotal = b >>> SWIDTH, active = (int)(c >> RC_SHIFT), total = (short)(c >>> TC_SHIFT), sp = (int)c & ~UNSIGNALLED; - if (total >= 0) { + if ((md & SMASK) == 0) + return 0; // cannot compensate if parallelism zero + else if (total >= 0) { if (sp != 0) { // activate idle worker WorkQueue[] qs; int n; WorkQueue v; if ((qs = queues) != null && (n = qs.length) > 0 && @@ -1819,9 +1816,10 @@ * * @param task the task * @param w caller's WorkQueue + * @param canHelp if false, compensate only * @return task status on exit, or UNCOMPENSATE for compensated blocking */ - final int helpJoin(ForkJoinTask task, WorkQueue w) { + final int helpJoin(ForkJoinTask task, WorkQueue w, boolean canHelp) { int s = 0; if (task != null && w != null) { int wsrc = w.source, wid = w.config & SMASK, r = wid + 2; @@ -1836,7 +1834,7 @@ else if (c == (c = ctl) && (s = tryCompensate(c)) >= 0) break; // block } - else { // scan for subtasks + else if (canHelp) { // scan for subtasks WorkQueue[] qs = queues; int n = (qs == null) ? 0 : qs.length, m = n - 1; for (int i = n; i > 0; i -= 2, r += 2) { @@ -2195,6 +2193,16 @@ } /** + * Returns queue for an external thread, if one exists + */ + final WorkQueue externalQueue() { + WorkQueue[] qs; + int r = ThreadLocalRandom.getProbe(), n; + return ((qs = queues) != null && (n = qs.length) > 0 && r != 0) ? + qs[(n - 1) & (r << 1)] : null; + } + + /** * If the given executor is a ForkJoinPool, poll and execute * AsynchronousCompletionTasks from worker's queue until none are * available or blocker is released. @@ -2205,8 +2213,8 @@ if ((wt = (ForkJoinWorkerThread)t).pool == e) w = wt.workQueue; } - else if (e == common) - w = commonQueue(); + else if (e instanceof ForkJoinPool) + w = ((ForkJoinPool)e).externalQueue(); if (w != null) w.helpAsyncBlocker(blocker); } @@ -2292,14 +2300,18 @@ return false; md = getAndBitwiseOrMode(STOP); } - for (int k = 0; k < 2; ++k) { // twice in case of lagging qs updates - for (ForkJoinTask t; (t = pollScan(false)) != null; ) + for (boolean rescan = true;;) { // repeat until no changes + boolean changed = false; + for (ForkJoinTask t; (t = pollScan(false)) != null; ) { + changed = true; ForkJoinTask.cancelIgnoringExceptions(t); // help cancel + } WorkQueue[] qs; int n; WorkQueue q; Thread thread; if ((qs = queues) != null && (n = qs.length) > 0) { for (int j = 1; j < n; j += 2) { // unblock other workers if ((q = qs[j]) != null && (thread = q.owner) != null && !thread.isInterrupted()) { + changed = true; try { thread.interrupt(); } catch (Throwable ignore) { @@ -2317,6 +2329,12 @@ cond.signalAll(); lock.unlock(); } + if (changed) + rescan = true; + else if (rescan) + rescan = false; + else + break; } return true; } @@ -2540,6 +2558,8 @@ } catch (Exception ignore) { } int p = this.mode = Math.min(Math.max(parallelism, 0), MAX_CAP); + int maxSpares = (p == 0) ? 0 : COMMON_MAX_SPARES; + int bnds = ((1 - p) & SMASK) | (maxSpares << SWIDTH); int size = 1 << (33 - Integer.numberOfLeadingZeros(p > 0 ? p - 1 : 1)); this.factory = (fac != null) ? fac : new DefaultCommonPoolForkJoinWorkerThreadFactory(); @@ -2547,7 +2567,7 @@ this.keepAlive = DEFAULT_KEEPALIVE; this.saturate = null; this.workerNamePrefix = null; - this.bounds = ((1 - p) & SMASK) | (COMMON_MAX_SPARES << SWIDTH); + this.bounds = bnds; this.ctl = ((((long)(-p) << TC_SHIFT) & TC_MASK) | (((long)(-p) << RC_SHIFT) & RC_MASK)); this.queues = new WorkQueue[size]; @@ -2593,7 +2613,7 @@ */ public T invoke(ForkJoinTask task) { externalSubmit(task); - return task.join(); + return task.joinForPoolInvoke(this); } /** @@ -2685,7 +2705,7 @@ externalSubmit(f); } for (int i = futures.size() - 1; i >= 0; --i) - ((ForkJoinTask)futures.get(i)).quietlyJoin(); + ((ForkJoinTask)futures.get(i)).awaitPoolInvoke(this); return futures; } catch (Throwable t) { for (Future e : futures) @@ -2715,11 +2735,7 @@ if (timedOut) ForkJoinTask.cancelIgnoringExceptions(f); else { - try { - f.get(ns, TimeUnit.NANOSECONDS); - } catch (CancellationException | TimeoutException | - ExecutionException ok) { - } + ((ForkJoinTask)f).awaitPoolInvoke(this, ns); if ((ns = nanos - (System.nanoTime() - startTime)) < 0L) timedOut = true; } @@ -2746,11 +2762,16 @@ } final void tryComplete(Callable c) { // called by InvokeAnyTasks Throwable ex = null; - boolean failed = (c == null || isCancelled() || - (pool != null && pool.mode < 0)); - if (!failed && !isDone()) { + boolean failed; + if (c == null || Thread.interrupted() || + (pool != null && pool.mode < 0)) + failed = true; + else if (isDone()) + failed = false; + else { try { complete(c.call()); + failed = false; } catch (Throwable tx) { ex = tx; failed = true; @@ -2817,7 +2838,7 @@ if (root.isDone()) break; } - return root.get(); + return root.getForPoolInvoke(this); } finally { for (InvokeAnyTask f : fs) ForkJoinTask.cancelIgnoringExceptions(f); @@ -2844,7 +2865,7 @@ if (root.isDone()) break; } - return root.get(nanos, TimeUnit.NANOSECONDS); + return root.getForPoolInvoke(this, nanos); } finally { for (InvokeAnyTask f : fs) ForkJoinTask.cancelIgnoringExceptions(f); diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java --- a/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java +++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java @@ -276,7 +276,7 @@ return (int)STATUS.getAndBitwiseOr(this, v); } private boolean casStatus(int c, int v) { - return STATUS.weakCompareAndSet(this, c, v); + return STATUS.compareAndSet(this, c, v); } private boolean casAux(Aux c, Aux v) { return AUX.compareAndSet(this, c, v); @@ -296,84 +296,6 @@ } /** - * Possibly blocks until task is done or interrupted or timed out. - * - * @param interruptible true if wait can be cancelled by interrupt - * @param deadline if non-zero use timed waits and possibly timeout - * @param pool if nonnull pool to uncompensate after unblocking - * @return status on exit, or ABNORMAL if interrupted while waiting - */ - private int awaitDone(boolean interruptible, long deadline, - ForkJoinPool pool) { - int s; - boolean interrupted = false, queued = false, parked = false; - Aux node = null; - while ((s = status) >= 0) { - Aux a; long ns; - if (parked && Thread.interrupted()) { - if (interruptible) { - s = ABNORMAL; - break; - } - interrupted = true; - } - else if (queued) { - if (deadline != 0L) { - if ((ns = deadline - System.nanoTime()) <= 0L) - break; - LockSupport.parkNanos(ns); - } - else - LockSupport.park(); - parked = true; - } - else if (node != null) { - if ((a = aux) != null && a.ex != null) - Thread.onSpinWait(); // exception in progress - else if (queued = casAux(node.next = a, node)) - LockSupport.setCurrentBlocker(this); - } - else { - try { - node = new Aux(Thread.currentThread(), null); - } catch (Throwable ex) { // try to cancel if cannot create - casStatus(s, s | (DONE | ABNORMAL)); - } - } - } - if (pool != null) - pool.uncompensate(); - - if (queued) { - LockSupport.setCurrentBlocker(null); - if (s >= 0) { // cancellation similar to AbstractQueuedSynchronizer - outer: for (Aux a; (a = aux) != null && a.ex == null; ) { - for (Aux trail = null;;) { - Aux next = a.next; - if (a == node) { - if (trail != null) - trail.casNext(trail, next); - else if (casAux(a, next)) - break outer; // cannot be re-encountered - break; // restart - } else { - trail = a; - if ((a = next) == null) - break outer; - } - } - } - } - else { - signalWaiters(); // help clean or signal - if (interrupted) - Thread.currentThread().interrupt(); - } - } - return s; - } - - /** * Sets DONE status and wakes up threads waiting to join this task. * @return status on exit */ @@ -463,27 +385,36 @@ * Helps and/or waits for completion from join, get, or invoke; * called from either internal or external threads. * + * @param pool if nonnull, known submitted pool, else assumes current pool * @param ran true if task known to have been exec'd * @param interruptible true if park interruptibly when external * @param timed true if use timed wait * @param nanos if timed, timeout value * @return ABNORMAL if interrupted, else status on exit */ - private int awaitJoin(boolean ran, boolean interruptible, boolean timed, + private int awaitDone(ForkJoinPool pool, boolean ran, + boolean interruptible, boolean timed, long nanos) { - boolean internal; ForkJoinPool p; ForkJoinPool.WorkQueue q; int s; - Thread t; ForkJoinWorkerThread wt; - if (internal = ((t = Thread.currentThread()) - instanceof ForkJoinWorkerThread)) { - p = (wt = (ForkJoinWorkerThread)t).pool; - q = wt.workQueue; + ForkJoinPool p; boolean internal; int s; Thread t; + ForkJoinPool.WorkQueue q = null; + if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) { + ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t; + p = wt.pool; + if (pool == null) + pool = p; + if (internal = (pool == p)) + q = wt.workQueue; } else { + internal = false; p = ForkJoinPool.common; - q = ForkJoinPool.commonQueue(); - if (interruptible && Thread.interrupted()) - return ABNORMAL; + if (pool == null) + pool = p; + if (pool == p && p != null) + q = p.externalQueue(); } + if (interruptible && Thread.interrupted()) + return ABNORMAL; if ((s = status) < 0) return s; long deadline = 0L; @@ -493,22 +424,94 @@ else if ((deadline = nanos + System.nanoTime()) == 0L) deadline = 1L; } - ForkJoinPool uncompensate = null; - if (q != null && p != null) { // try helping - if ((!timed || p.isSaturated()) && - ((this instanceof CountedCompleter) ? - (s = p.helpComplete(this, q, internal)) < 0 : - (q.tryRemove(this, internal) && (s = doExec()) < 0))) - return s; + boolean uncompensate = false; + if (q != null && p != null) { // try helping + // help even in timed mode if pool has no parallelism + boolean canHelp = !timed || (p.mode & SMASK) == 0; + if (canHelp) { + if ((this instanceof CountedCompleter) && + (s = p.helpComplete(this, q, internal)) < 0) + return s; + if (!ran && ((!internal && q.externalTryUnpush(this)) || + q.tryRemove(this, internal)) && (s = doExec()) < 0) + return s; + } if (internal) { - if ((s = p.helpJoin(this, q)) < 0) + if ((s = p.helpJoin(this, q, canHelp)) < 0) return s; if (s == UNCOMPENSATE) - uncompensate = p; - interruptible = false; + uncompensate = true; } } - return awaitDone(interruptible, deadline, uncompensate); + // block until done or cancelled wait + boolean interrupted = false, queued = false; + boolean parked = false, fail = false; + Aux node = null; + while ((s = status) >= 0) { + Aux a; long ns; + if (fail || (fail = (pool != null && pool.mode < 0))) + casStatus(s, s | (DONE | ABNORMAL)); // try to cancel + else if (parked && Thread.interrupted()) { + if (interruptible) { + s = ABNORMAL; + break; + } + interrupted = true; + } + else if (queued) { + if (deadline != 0L) { + if ((ns = deadline - System.nanoTime()) <= 0L) + break; + LockSupport.parkNanos(ns); + } + else + LockSupport.park(); + parked = true; + } + else if (node != null) { + if ((a = aux) != null && a.ex != null) + Thread.onSpinWait(); // exception in progress + else if (queued = casAux(node.next = a, node)) + LockSupport.setCurrentBlocker(this); + } + else { + try { + node = new Aux(Thread.currentThread(), null); + } catch (Throwable ex) { // cannot create + fail = true; + } + } + } + if (pool != null && uncompensate) + pool.uncompensate(); + + if (queued) { + LockSupport.setCurrentBlocker(null); + if (s >= 0) { // cancellation similar to AbstractQueuedSynchronizer + outer: for (Aux a; (a = aux) != null && a.ex == null; ) { + for (Aux trail = null;;) { + Aux next = a.next; + if (a == node) { + if (trail != null) + trail.casNext(trail, next); + else if (casAux(a, next)) + break outer; // cannot be re-encountered + break; // restart + } else { + trail = a; + if ((a = next) == null) + break outer; + } + } + } + } + else { + signalWaiters(); // help clean or signal + if (interrupted) + Thread.currentThread().interrupt(); + } + } + return s; } /** @@ -664,7 +667,7 @@ public final V join() { int s; if ((s = status) >= 0) - s = awaitJoin(false, false, false, 0L); + s = awaitDone(null, false, false, false, 0L); if ((s & ABNORMAL) != 0) reportException(s); return getRawResult(); @@ -681,7 +684,7 @@ public final V invoke() { int s; if ((s = doExec()) >= 0) - s = awaitJoin(true, false, false, 0L); + s = awaitDone(null, true, false, false, 0L); if ((s & ABNORMAL) != 0) reportException(s); return getRawResult(); @@ -710,12 +713,12 @@ throw new NullPointerException(); t2.fork(); if ((s1 = t1.doExec()) >= 0) - s1 = t1.awaitJoin(true, false, false, 0L); + s1 = t1.awaitDone(null, true, false, false, 0L); if ((s1 & ABNORMAL) != 0) { cancelIgnoringExceptions(t2); t1.reportException(s1); } - else if (((s2 = t2.awaitJoin(false, false, false, 0L)) & ABNORMAL) != 0) + else if (((s2 = t2.awaitDone(null, false, false, false, 0L)) & ABNORMAL) != 0) t2.reportException(s2); } @@ -746,7 +749,7 @@ if (i == 0) { int s; if ((s = t.doExec()) >= 0) - s = t.awaitJoin(true, false, false, 0L); + s = t.awaitDone(null, true, false, false, 0L); if ((s & ABNORMAL) != 0) ex = t.getException(s); break; @@ -759,7 +762,7 @@ if ((t = tasks[i]) != null) { int s; if ((s = t.status) >= 0) - s = t.awaitJoin(false, false, false, 0L); + s = t.awaitDone(null, false, false, false, 0L); if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null) break; } @@ -809,7 +812,7 @@ if (i == 0) { int s; if ((s = t.doExec()) >= 0) - s = t.awaitJoin(true, false, false, 0L); + s = t.awaitDone(null, true, false, false, 0L); if ((s & ABNORMAL) != 0) ex = t.getException(s); break; @@ -822,7 +825,7 @@ if ((t = ts.get(i)) != null) { int s; if ((s = t.status) >= 0) - s = t.awaitJoin(false, false, false, 0L); + s = t.awaitDone(null, false, false, false, 0L); if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null) break; } @@ -973,8 +976,8 @@ * member of a ForkJoinPool and was interrupted while waiting */ public final V get() throws InterruptedException, ExecutionException { - int s; - if (((s = awaitJoin(false, true, false, 0L)) & ABNORMAL) != 0) + int s = awaitDone(null, false, true, false, 0L); + if ((s & ABNORMAL) != 0) reportExecutionException(s); return getRawResult(); } @@ -995,9 +998,9 @@ */ public final V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - int s; - if ((s = awaitJoin(false, true, true, unit.toNanos(timeout))) >= 0 || - (s & ABNORMAL) != 0) + long nanos = unit.toNanos(timeout); + int s = awaitDone(null, false, true, true, nanos); + if (s >= 0 || (s & ABNORMAL) != 0) reportExecutionException(s); return getRawResult(); } @@ -1010,9 +1013,10 @@ */ public final void quietlyJoin() { if (status >= 0) - awaitJoin(false, false, false, 0L); + awaitDone(null, false, false, false, 0L); } + /** * Commences performing this task and awaits its completion if * necessary, without returning its result or throwing its @@ -1020,7 +1024,37 @@ */ public final void quietlyInvoke() { if (doExec() >= 0) - awaitJoin(true, false, false, 0L); + awaitDone(null, true, false, false, 0L); + } + + // Versions of join/get for pool.invoke* methods that use external, + // possibly-non-commonPool submits + + final void awaitPoolInvoke(ForkJoinPool pool) { + awaitDone(pool, false, false, false, 0L); + } + final void awaitPoolInvoke(ForkJoinPool pool, long nanos) { + awaitDone(pool, false, true, true, nanos); + } + final V joinForPoolInvoke(ForkJoinPool pool) { + int s = awaitDone(pool, false, false, false, 0L); + if ((s & ABNORMAL) != 0) + reportException(s); + return getRawResult(); + } + final V getForPoolInvoke(ForkJoinPool pool) + throws InterruptedException, ExecutionException { + int s = awaitDone(pool, false, true, false, 0L); + if ((s & ABNORMAL) != 0) + reportExecutionException(s); + return getRawResult(); + } + final V getForPoolInvoke(ForkJoinPool pool, long nanos) + throws InterruptedException, ExecutionException, TimeoutException { + int s = awaitDone(pool, false, true, true, nanos); + if (s >= 0 || (s & ABNORMAL) != 0) + reportExecutionException(s); + return getRawResult(); } /** diff --git a/src/java.base/share/classes/java/util/concurrent/Phaser.java b/src/java.base/share/classes/java/util/concurrent/Phaser.java --- a/src/java.base/share/classes/java/util/concurrent/Phaser.java +++ b/src/java.base/share/classes/java/util/concurrent/Phaser.java @@ -148,6 +148,12 @@ * returns snapshots of these state queries in a form convenient for * informal monitoring. * + *

Memory consistency effects: Actions prior to any form of arrive + * method + * happen-before a corresponding phase advance and + * onAdvance actions (if present), which in turn happen-before + * actions following the phase advance. + * *

Sample usages: * *

A {@code Phaser} may be used instead of a {@code CountDownLatch}