< prev index next >

src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java

Print this page
8259800: timeout in tck test testForkJoin(ForkJoinPool8Test)
Reviewed-by: martin

*** 1044,1060 **** /** * Locking version of tryUnpush. */ final boolean externalTryUnpush(ForkJoinTask<?> task) { boolean taken = false; int s = top, cap, k; ForkJoinTask<?>[] a; ! if ((a = array) != null && (cap = a.length) > 0 && ! a[k = (cap - 1) & (s - 1)] == task && tryLock()) { ! if (top == s && array == a && ! (taken = casSlotToNull(a, k, task))) top = s - 1; ! source = 0; // release lock } return taken; } /** --- 1044,1069 ---- /** * Locking version of tryUnpush. */ final boolean externalTryUnpush(ForkJoinTask<?> task) { boolean taken = false; + for (;;) { int s = top, cap, k; ForkJoinTask<?>[] a; ! if ((a = array) == null || (cap = a.length) <= 0 || ! a[k = (cap - 1) & (s - 1)] != task) ! break; ! if (tryLock()) { ! if (top == s && array == a) { ! if (taken = casSlotToNull(a, k, task)) { top = s - 1; ! source = 0; ! break; ! } ! } ! source = 0; // release lock for retry ! } ! Thread.yield(); // trylock failure } return taken; } /**
*** 1192,1210 **** if (top == p && array == a && (taken = casSlotToNull(a, k, t))) top = s; source = 0; } break; } else if ((f = f.completer) == null) break; } ! if (!taken) ! break; ! t.doExec(); ! if (limit != 0 && --limit == 0) break; } return status; } --- 1201,1220 ---- if (top == p && array == a && (taken = casSlotToNull(a, k, t))) top = s; source = 0; } + if (taken) + t.doExec(); + else if (!owned) + Thread.yield(); // tryLock failure break; } else if ((f = f.completer) == null) break; } ! if (taken && limit != 0 && --limit == 0) break; } return status; }
*** 1584,1594 **** * See above for explanation. * * @param w caller's WorkQueue (may be null on failed initialization) */ final void runWorker(WorkQueue w) { ! if (w != null) { // skip on failed init w.config |= SRC; // mark as valid source int r = w.stackPred, src = 0; // use seed from registerWorker do { r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift } while ((src = scan(w, src, r)) >= 0 || --- 1594,1604 ---- * See above for explanation. * * @param w caller's WorkQueue (may be null on failed initialization) */ final void runWorker(WorkQueue w) { ! if (mode >= 0 && w != null) { // skip on failed init w.config |= SRC; // mark as valid source int r = w.stackPred, src = 0; // use seed from registerWorker do { r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift } while ((src = scan(w, src, r)) >= 0 ||
*** 1709,1734 **** } // Utilities used by ForkJoinTask /** - * Returns true if all workers are busy, possibly creating one if allowed - */ - final boolean isSaturated() { - int maxTotal = bounds >>> SWIDTH; - for (long c;;) { - if (((int)(c = ctl) & ~UNSIGNALLED) != 0) - return false; - if ((short)(c >>> TC_SHIFT) >= maxTotal) - return true; - long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); - if (compareAndSetCtl(c, nc)) - return !createWorker(); - } - } - - /** * Returns true if can start terminating if enabled, or already terminated */ final boolean canStop() { outer: for (long oldSum = 0L;;) { // repeat until stable int md; WorkQueue[] qs; long c; --- 1719,1728 ----
*** 1763,1779 **** * @param c incoming ctl value * @return UNCOMPENSATE: block then adjust, 0: block, -1 : retry */ private int tryCompensate(long c) { Predicate<? super ForkJoinPool> sat; ! int b = bounds; // counts are signed; centered at parallelism level == 0 int minActive = (short)(b & SMASK), maxTotal = b >>> SWIDTH, active = (int)(c >> RC_SHIFT), total = (short)(c >>> TC_SHIFT), sp = (int)c & ~UNSIGNALLED; ! if (total >= 0) { if (sp != 0) { // activate idle worker WorkQueue[] qs; int n; WorkQueue v; if ((qs = queues) != null && (n = qs.length) > 0 && (v = qs[sp & (n - 1)]) != null) { Thread vt = v.owner; --- 1757,1776 ---- * @param c incoming ctl value * @return UNCOMPENSATE: block then adjust, 0: block, -1 : retry */ private int tryCompensate(long c) { Predicate<? super ForkJoinPool> sat; ! int md = mode, b = bounds; ! // counts are signed; centered at parallelism level == 0 int minActive = (short)(b & SMASK), maxTotal = b >>> SWIDTH, active = (int)(c >> RC_SHIFT), total = (short)(c >>> TC_SHIFT), sp = (int)c & ~UNSIGNALLED; ! if ((md & SMASK) == 0) ! return 0; // cannot compensate if parallelism zero ! else if (total >= 0) { if (sp != 0) { // activate idle worker WorkQueue[] qs; int n; WorkQueue v; if ((qs = queues) != null && (n = qs.length) > 0 && (v = qs[sp & (n - 1)]) != null) { Thread vt = v.owner;
*** 1817,1829 **** * queues for a task produced by one of w's stealers; returning * compensated blocking sentinel if none are found. * * @param task the task * @param w caller's WorkQueue * @return task status on exit, or UNCOMPENSATE for compensated blocking */ ! final int helpJoin(ForkJoinTask<?> task, WorkQueue w) { int s = 0; if (task != null && w != null) { int wsrc = w.source, wid = w.config & SMASK, r = wid + 2; boolean scan = true; long c = 0L; // track ctl stability --- 1814,1827 ---- * queues for a task produced by one of w's stealers; returning * compensated blocking sentinel if none are found. * * @param task the task * @param w caller's WorkQueue + * @param canHelp if false, compensate only * @return task status on exit, or UNCOMPENSATE for compensated blocking */ ! final int helpJoin(ForkJoinTask<?> task, WorkQueue w, boolean canHelp) { int s = 0; if (task != null && w != null) { int wsrc = w.source, wid = w.config & SMASK, r = wid + 2; boolean scan = true; long c = 0L; // track ctl stability
*** 1834,1844 **** if (mode < 0) ForkJoinTask.cancelIgnoringExceptions(task); else if (c == (c = ctl) && (s = tryCompensate(c)) >= 0) break; // block } ! else { // scan for subtasks WorkQueue[] qs = queues; int n = (qs == null) ? 0 : qs.length, m = n - 1; for (int i = n; i > 0; i -= 2, r += 2) { int j; WorkQueue q, x, y; ForkJoinTask<?>[] a; if ((q = qs[j = r & m]) != null) { --- 1832,1842 ---- if (mode < 0) ForkJoinTask.cancelIgnoringExceptions(task); else if (c == (c = ctl) && (s = tryCompensate(c)) >= 0) break; // block } ! else if (canHelp) { // scan for subtasks WorkQueue[] qs = queues; int n = (qs == null) ? 0 : qs.length, m = n - 1; for (int i = n; i > 0; i -= 2, r += 2) { int j; WorkQueue q, x, y; ForkJoinTask<?>[] a; if ((q = qs[j = r & m]) != null) {
*** 2193,2214 **** (n = qs.length) > 0 && r != 0) ? qs[(n - 1) & (r << 1)] : null; } /** * If the given executor is a ForkJoinPool, poll and execute * AsynchronousCompletionTasks from worker's queue until none are * available or blocker is released. */ static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) { WorkQueue w = null; Thread t; ForkJoinWorkerThread wt; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) { if ((wt = (ForkJoinWorkerThread)t).pool == e) w = wt.workQueue; } ! else if (e == common) ! w = commonQueue(); if (w != null) w.helpAsyncBlocker(blocker); } /** --- 2191,2222 ---- (n = qs.length) > 0 && r != 0) ? qs[(n - 1) & (r << 1)] : null; } /** + * Returns queue for an external thread, if one exists + */ + final WorkQueue externalQueue() { + WorkQueue[] qs; + int r = ThreadLocalRandom.getProbe(), n; + return ((qs = queues) != null && (n = qs.length) > 0 && r != 0) ? + qs[(n - 1) & (r << 1)] : null; + } + + /** * If the given executor is a ForkJoinPool, poll and execute * AsynchronousCompletionTasks from worker's queue until none are * available or blocker is released. */ static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) { WorkQueue w = null; Thread t; ForkJoinWorkerThread wt; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) { if ((wt = (ForkJoinWorkerThread)t).pool == e) w = wt.workQueue; } ! else if (e instanceof ForkJoinPool) ! w = ((ForkJoinPool)e).externalQueue(); if (w != null) w.helpAsyncBlocker(blocker); } /**
*** 2290,2307 **** if ((md & STOP) == 0) { if (!now && !canStop()) return false; md = getAndBitwiseOrMode(STOP); } ! for (int k = 0; k < 2; ++k) { // twice in case of lagging qs updates ! for (ForkJoinTask<?> t; (t = pollScan(false)) != null; ) ForkJoinTask.cancelIgnoringExceptions(t); // help cancel WorkQueue[] qs; int n; WorkQueue q; Thread thread; if ((qs = queues) != null && (n = qs.length) > 0) { for (int j = 1; j < n; j += 2) { // unblock other workers if ((q = qs[j]) != null && (thread = q.owner) != null && !thread.isInterrupted()) { try { thread.interrupt(); } catch (Throwable ignore) { } } --- 2298,2319 ---- if ((md & STOP) == 0) { if (!now && !canStop()) return false; md = getAndBitwiseOrMode(STOP); } ! for (boolean rescan = true;;) { // repeat until no changes ! boolean changed = false; ! for (ForkJoinTask<?> t; (t = pollScan(false)) != null; ) { ! changed = true; ForkJoinTask.cancelIgnoringExceptions(t); // help cancel + } WorkQueue[] qs; int n; WorkQueue q; Thread thread; if ((qs = queues) != null && (n = qs.length) > 0) { for (int j = 1; j < n; j += 2) { // unblock other workers if ((q = qs[j]) != null && (thread = q.owner) != null && !thread.isInterrupted()) { + changed = true; try { thread.interrupt(); } catch (Throwable ignore) { } }
*** 2315,2324 **** --- 2327,2342 ---- lock.lock(); if ((cond = termination) != null) cond.signalAll(); lock.unlock(); } + if (changed) + rescan = true; + else if (rescan) + rescan = false; + else + break; } return true; } // Exported methods
*** 2538,2555 **** if (pp != null) parallelism = Integer.parseInt(pp); } catch (Exception ignore) { } int p = this.mode = Math.min(Math.max(parallelism, 0), MAX_CAP); int size = 1 << (33 - Integer.numberOfLeadingZeros(p > 0 ? p - 1 : 1)); this.factory = (fac != null) ? fac : new DefaultCommonPoolForkJoinWorkerThreadFactory(); this.ueh = handler; this.keepAlive = DEFAULT_KEEPALIVE; this.saturate = null; this.workerNamePrefix = null; ! this.bounds = ((1 - p) & SMASK) | (COMMON_MAX_SPARES << SWIDTH); this.ctl = ((((long)(-p) << TC_SHIFT) & TC_MASK) | (((long)(-p) << RC_SHIFT) & RC_MASK)); this.queues = new WorkQueue[size]; this.registrationLock = new ReentrantLock(); } --- 2556,2575 ---- if (pp != null) parallelism = Integer.parseInt(pp); } catch (Exception ignore) { } int p = this.mode = Math.min(Math.max(parallelism, 0), MAX_CAP); + int maxSpares = (p == 0) ? 0 : COMMON_MAX_SPARES; + int bnds = ((1 - p) & SMASK) | (maxSpares << SWIDTH); int size = 1 << (33 - Integer.numberOfLeadingZeros(p > 0 ? p - 1 : 1)); this.factory = (fac != null) ? fac : new DefaultCommonPoolForkJoinWorkerThreadFactory(); this.ueh = handler; this.keepAlive = DEFAULT_KEEPALIVE; this.saturate = null; this.workerNamePrefix = null; ! this.bounds = bnds; this.ctl = ((((long)(-p) << TC_SHIFT) & TC_MASK) | (((long)(-p) << RC_SHIFT) & RC_MASK)); this.queues = new WorkQueue[size]; this.registrationLock = new ReentrantLock(); }
*** 2591,2601 **** * @throws RejectedExecutionException if the task cannot be * scheduled for execution */ public <T> T invoke(ForkJoinTask<T> task) { externalSubmit(task); ! return task.join(); } /** * Arranges for (asynchronous) execution of the given task. * --- 2611,2621 ---- * @throws RejectedExecutionException if the task cannot be * scheduled for execution */ public <T> T invoke(ForkJoinTask<T> task) { externalSubmit(task); ! return task.joinForPoolInvoke(this); } /** * Arranges for (asynchronous) execution of the given task. *
*** 2683,2693 **** new ForkJoinTask.AdaptedInterruptibleCallable<T>(t); futures.add(f); externalSubmit(f); } for (int i = futures.size() - 1; i >= 0; --i) ! ((ForkJoinTask<?>)futures.get(i)).quietlyJoin(); return futures; } catch (Throwable t) { for (Future<T> e : futures) ForkJoinTask.cancelIgnoringExceptions(e); throw t; --- 2703,2713 ---- new ForkJoinTask.AdaptedInterruptibleCallable<T>(t); futures.add(f); externalSubmit(f); } for (int i = futures.size() - 1; i >= 0; --i) ! ((ForkJoinTask<?>)futures.get(i)).awaitPoolInvoke(this); return futures; } catch (Throwable t) { for (Future<T> e : futures) ForkJoinTask.cancelIgnoringExceptions(e); throw t;
*** 2713,2727 **** Future<T> f = futures.get(i); if (!f.isDone()) { if (timedOut) ForkJoinTask.cancelIgnoringExceptions(f); else { ! try { ! f.get(ns, TimeUnit.NANOSECONDS); ! } catch (CancellationException | TimeoutException | ! ExecutionException ok) { ! } if ((ns = nanos - (System.nanoTime() - startTime)) < 0L) timedOut = true; } } } --- 2733,2743 ---- Future<T> f = futures.get(i); if (!f.isDone()) { if (timedOut) ForkJoinTask.cancelIgnoringExceptions(f); else { ! ((ForkJoinTask<T>)f).awaitPoolInvoke(this, ns); if ((ns = nanos - (System.nanoTime() - startTime)) < 0L) timedOut = true; } } }
*** 2744,2758 **** pool = p; count = new AtomicInteger(n); } final void tryComplete(Callable<E> c) { // called by InvokeAnyTasks Throwable ex = null; ! boolean failed = (c == null || isCancelled() || ! (pool != null && pool.mode < 0)); ! if (!failed && !isDone()) { try { complete(c.call()); } catch (Throwable tx) { ex = tx; failed = true; } } --- 2760,2779 ---- pool = p; count = new AtomicInteger(n); } final void tryComplete(Callable<E> c) { // called by InvokeAnyTasks Throwable ex = null; ! boolean failed; ! if (c == null || Thread.interrupted() || ! (pool != null && pool.mode < 0)) ! failed = true; ! else if (isDone()) ! failed = false; ! else { try { complete(c.call()); + failed = false; } catch (Throwable tx) { ex = tx; failed = true; } }
*** 2815,2825 **** fs.add(f); externalSubmit(f); if (root.isDone()) break; } ! return root.get(); } finally { for (InvokeAnyTask<T> f : fs) ForkJoinTask.cancelIgnoringExceptions(f); } } --- 2836,2846 ---- fs.add(f); externalSubmit(f); if (root.isDone()) break; } ! return root.getForPoolInvoke(this); } finally { for (InvokeAnyTask<T> f : fs) ForkJoinTask.cancelIgnoringExceptions(f); } }
*** 2842,2852 **** fs.add(f); externalSubmit(f); if (root.isDone()) break; } ! return root.get(nanos, TimeUnit.NANOSECONDS); } finally { for (InvokeAnyTask<T> f : fs) ForkJoinTask.cancelIgnoringExceptions(f); } } --- 2863,2873 ---- fs.add(f); externalSubmit(f); if (root.isDone()) break; } ! return root.getForPoolInvoke(this, nanos); } finally { for (InvokeAnyTask<T> f : fs) ForkJoinTask.cancelIgnoringExceptions(f); } }
< prev index next >