--- old/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java 2021-02-09 06:27:02.079706462 -0800 +++ new/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java 2021-02-09 06:27:01.707709321 -0800 @@ -1046,13 +1046,22 @@ */ final boolean externalTryUnpush(ForkJoinTask task) { boolean taken = false; - int s = top, cap, k; ForkJoinTask[] a; - if ((a = array) != null && (cap = a.length) > 0 && - a[k = (cap - 1) & (s - 1)] == task && tryLock()) { - if (top == s && array == a && - (taken = casSlotToNull(a, k, task))) - top = s - 1; - source = 0; // release lock + for (;;) { + int s = top, cap, k; ForkJoinTask[] a; + if ((a = array) == null || (cap = a.length) <= 0 || + a[k = (cap - 1) & (s - 1)] != task) + break; + if (tryLock()) { + if (top == s && array == a) { + if (taken = casSlotToNull(a, k, task)) { + top = s - 1; + source = 0; + break; + } + } + source = 0; // release lock for retry + } + Thread.yield(); // trylock failure } return taken; } @@ -1194,15 +1203,16 @@ top = s; source = 0; } + if (taken) + t.doExec(); + else if (!owned) + Thread.yield(); // tryLock failure break; } else if ((f = f.completer) == null) break; } - if (!taken) - break; - t.doExec(); - if (limit != 0 && --limit == 0) + if (taken && limit != 0 && --limit == 0) break; } return status; @@ -1586,7 +1596,7 @@ * @param w caller's WorkQueue (may be null on failed initialization) */ final void runWorker(WorkQueue w) { - if (w != null) { // skip on failed init + if (mode >= 0 && w != null) { // skip on failed init w.config |= SRC; // mark as valid source int r = w.stackPred, src = 0; // use seed from registerWorker do { @@ -1711,22 +1721,6 @@ // Utilities used by ForkJoinTask /** - * Returns true if all workers are busy, possibly creating one if allowed - */ - final boolean isSaturated() { - int maxTotal = bounds >>> SWIDTH; - for (long c;;) { - if (((int)(c = ctl) & ~UNSIGNALLED) != 0) - return false; - if ((short)(c >>> TC_SHIFT) >= maxTotal) - return true; - long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); - if (compareAndSetCtl(c, nc)) - return !createWorker(); - } - } - - /** * Returns true if can start terminating if enabled, or already terminated */ final boolean canStop() { @@ -1765,13 +1759,16 @@ */ private int tryCompensate(long c) { Predicate sat; - int b = bounds; // counts are signed; centered at parallelism level == 0 + int md = mode, b = bounds; + // counts are signed; centered at parallelism level == 0 int minActive = (short)(b & SMASK), maxTotal = b >>> SWIDTH, active = (int)(c >> RC_SHIFT), total = (short)(c >>> TC_SHIFT), sp = (int)c & ~UNSIGNALLED; - if (total >= 0) { + if ((md & SMASK) == 0) + return 0; // cannot compensate if parallelism zero + else if (total >= 0) { if (sp != 0) { // activate idle worker WorkQueue[] qs; int n; WorkQueue v; if ((qs = queues) != null && (n = qs.length) > 0 && @@ -1819,9 +1816,10 @@ * * @param task the task * @param w caller's WorkQueue + * @param canHelp if false, compensate only * @return task status on exit, or UNCOMPENSATE for compensated blocking */ - final int helpJoin(ForkJoinTask task, WorkQueue w) { + final int helpJoin(ForkJoinTask task, WorkQueue w, boolean canHelp) { int s = 0; if (task != null && w != null) { int wsrc = w.source, wid = w.config & SMASK, r = wid + 2; @@ -1836,7 +1834,7 @@ else if (c == (c = ctl) && (s = tryCompensate(c)) >= 0) break; // block } - else { // scan for subtasks + else if (canHelp) { // scan for subtasks WorkQueue[] qs = queues; int n = (qs == null) ? 0 : qs.length, m = n - 1; for (int i = n; i > 0; i -= 2, r += 2) { @@ -2195,6 +2193,16 @@ } /** + * Returns queue for an external thread, if one exists + */ + final WorkQueue externalQueue() { + WorkQueue[] qs; + int r = ThreadLocalRandom.getProbe(), n; + return ((qs = queues) != null && (n = qs.length) > 0 && r != 0) ? + qs[(n - 1) & (r << 1)] : null; + } + + /** * If the given executor is a ForkJoinPool, poll and execute * AsynchronousCompletionTasks from worker's queue until none are * available or blocker is released. @@ -2205,8 +2213,8 @@ if ((wt = (ForkJoinWorkerThread)t).pool == e) w = wt.workQueue; } - else if (e == common) - w = commonQueue(); + else if (e instanceof ForkJoinPool) + w = ((ForkJoinPool)e).externalQueue(); if (w != null) w.helpAsyncBlocker(blocker); } @@ -2292,14 +2300,18 @@ return false; md = getAndBitwiseOrMode(STOP); } - for (int k = 0; k < 2; ++k) { // twice in case of lagging qs updates - for (ForkJoinTask t; (t = pollScan(false)) != null; ) + for (boolean rescan = true;;) { // repeat until no changes + boolean changed = false; + for (ForkJoinTask t; (t = pollScan(false)) != null; ) { + changed = true; ForkJoinTask.cancelIgnoringExceptions(t); // help cancel + } WorkQueue[] qs; int n; WorkQueue q; Thread thread; if ((qs = queues) != null && (n = qs.length) > 0) { for (int j = 1; j < n; j += 2) { // unblock other workers if ((q = qs[j]) != null && (thread = q.owner) != null && !thread.isInterrupted()) { + changed = true; try { thread.interrupt(); } catch (Throwable ignore) { @@ -2317,6 +2329,12 @@ cond.signalAll(); lock.unlock(); } + if (changed) + rescan = true; + else if (rescan) + rescan = false; + else + break; } return true; } @@ -2540,6 +2558,8 @@ } catch (Exception ignore) { } int p = this.mode = Math.min(Math.max(parallelism, 0), MAX_CAP); + int maxSpares = (p == 0) ? 0 : COMMON_MAX_SPARES; + int bnds = ((1 - p) & SMASK) | (maxSpares << SWIDTH); int size = 1 << (33 - Integer.numberOfLeadingZeros(p > 0 ? p - 1 : 1)); this.factory = (fac != null) ? fac : new DefaultCommonPoolForkJoinWorkerThreadFactory(); @@ -2547,7 +2567,7 @@ this.keepAlive = DEFAULT_KEEPALIVE; this.saturate = null; this.workerNamePrefix = null; - this.bounds = ((1 - p) & SMASK) | (COMMON_MAX_SPARES << SWIDTH); + this.bounds = bnds; this.ctl = ((((long)(-p) << TC_SHIFT) & TC_MASK) | (((long)(-p) << RC_SHIFT) & RC_MASK)); this.queues = new WorkQueue[size]; @@ -2593,7 +2613,7 @@ */ public T invoke(ForkJoinTask task) { externalSubmit(task); - return task.join(); + return task.joinForPoolInvoke(this); } /** @@ -2685,7 +2705,7 @@ externalSubmit(f); } for (int i = futures.size() - 1; i >= 0; --i) - ((ForkJoinTask)futures.get(i)).quietlyJoin(); + ((ForkJoinTask)futures.get(i)).awaitPoolInvoke(this); return futures; } catch (Throwable t) { for (Future e : futures) @@ -2715,11 +2735,7 @@ if (timedOut) ForkJoinTask.cancelIgnoringExceptions(f); else { - try { - f.get(ns, TimeUnit.NANOSECONDS); - } catch (CancellationException | TimeoutException | - ExecutionException ok) { - } + ((ForkJoinTask)f).awaitPoolInvoke(this, ns); if ((ns = nanos - (System.nanoTime() - startTime)) < 0L) timedOut = true; } @@ -2746,11 +2762,16 @@ } final void tryComplete(Callable c) { // called by InvokeAnyTasks Throwable ex = null; - boolean failed = (c == null || isCancelled() || - (pool != null && pool.mode < 0)); - if (!failed && !isDone()) { + boolean failed; + if (c == null || Thread.interrupted() || + (pool != null && pool.mode < 0)) + failed = true; + else if (isDone()) + failed = false; + else { try { complete(c.call()); + failed = false; } catch (Throwable tx) { ex = tx; failed = true; @@ -2817,7 +2838,7 @@ if (root.isDone()) break; } - return root.get(); + return root.getForPoolInvoke(this); } finally { for (InvokeAnyTask f : fs) ForkJoinTask.cancelIgnoringExceptions(f); @@ -2844,7 +2865,7 @@ if (root.isDone()) break; } - return root.get(nanos, TimeUnit.NANOSECONDS); + return root.getForPoolInvoke(this, nanos); } finally { for (InvokeAnyTask f : fs) ForkJoinTask.cancelIgnoringExceptions(f);