--- old/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java 2021-01-05 09:10:03.305686847 -0800 +++ new/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java 2021-01-05 09:10:02.805690765 -0800 @@ -38,13 +38,11 @@ import java.io.Serializable; import java.lang.invoke.MethodHandles; import java.lang.invoke.VarHandle; -import java.lang.ref.ReferenceQueue; -import java.lang.ref.WeakReference; import java.lang.reflect.Constructor; import java.util.Collection; import java.util.List; import java.util.RandomAccess; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.LockSupport; /** * Abstract base class for tasks that run within a {@link ForkJoinPool}. @@ -217,348 +215,313 @@ * (3) user-level methods that additionally report results. * This is sometimes hard to see because this file orders exported * methods in a way that flows well in javadocs. - */ - - /** - * The status field holds run control status bits packed into a - * single int to ensure atomicity. Status is initially zero, and - * takes on nonnegative values until completed, upon which it - * holds (sign bit) DONE, possibly with ABNORMAL (cancelled or - * exceptional) and THROWN (in which case an exception has been - * stored). Tasks with dependent blocked waiting joiners have the - * SIGNAL bit set. Completion of a task with SIGNAL set awakens - * any waiters via notifyAll. (Waiters also help signal others - * upon completion.) - * - * These control bits occupy only (some of) the upper half (16 - * bits) of status field. The lower bits are used for user-defined - * tags. - */ - volatile int status; // accessed directly by pool and workers - - private static final int DONE = 1 << 31; // must be negative - private static final int ABNORMAL = 1 << 18; // set atomically with DONE - private static final int THROWN = 1 << 17; // set atomically with ABNORMAL - private static final int SIGNAL = 1 << 16; // true if joiner waiting - private static final int SMASK = 0xffff; // short bits for tags - - /** - * Constructor for subclasses to call. - */ - public ForkJoinTask() {} - - static boolean isExceptionalStatus(int s) { // needed by subclasses - return (s & THROWN) != 0; - } - - /** - * Sets DONE status and wakes up threads waiting to join this task. * - * @return status on exit + * Revision notes: The use of "Aux" field replaces previous + * reliance on a table to hold exceptions and synchronized blocks + * and monitors to wait for completion. */ - private int setDone() { - int s; - if (((s = (int)STATUS.getAndBitwiseOr(this, DONE)) & SIGNAL) != 0) - synchronized (this) { notifyAll(); } - return s | DONE; - } /** - * Marks cancelled or exceptional completion unless already done. - * - * @param completion must be DONE | ABNORMAL, ORed with THROWN if exceptional - * @return status on exit - */ - private int abnormalCompletion(int completion) { - for (int s, ns;;) { - if ((s = status) < 0) - return s; - else if (STATUS.weakCompareAndSet(this, s, ns = s | completion)) { - if ((s & SIGNAL) != 0) - synchronized (this) { notifyAll(); } - return ns; + * Nodes for threads waiting for completion, or holding a thrown + * exception (never both). Waiting threads prepend nodes + * Treiber-stack-style. Signallers detach and unpark + * waiters. Cancelled waiters try to unsplice. + */ + static final class Aux { + final Thread thread; + final Throwable ex; // null if a waiter + Aux next; // accessed only via memory-acquire chains + Aux(Thread thread, Throwable ex) { + this.thread = thread; + this.ex = ex; + } + final boolean casNext(Aux c, Aux v) { // used only in cancellation + return NEXT.compareAndSet(this, c, v); + } + private static final VarHandle NEXT; + static { + try { + NEXT = MethodHandles.lookup() + .findVarHandle(Aux.class, "next", Aux.class); + } catch (ReflectiveOperationException e) { + throw new ExceptionInInitializerError(e); } } } - /** - * Primary execution method for stolen tasks. Unless done, calls - * exec and records status if completed, but doesn't wait for - * completion otherwise. - * - * @return status on exit from this method - */ - final int doExec() { - int s; boolean completed; - if ((s = status) >= 0) { - try { - completed = exec(); - } catch (Throwable rex) { - completed = false; - s = setExceptionalCompletion(rex); + /* + * The status field holds bits packed into a single int to ensure + * atomicity. Status is initially zero, and takes on nonnegative + * values until completed, upon which it holds (sign bit) DONE, + * possibly with ABNORMAL (cancelled or exceptional) and THROWN + * (in which case an exception has been stored). A value of + * ABNORMAL without DONE signifies an interrupted wait. These + * control bits occupy only (some of) the upper half (16 bits) of + * status field. The lower bits are used for user-defined tags. + */ + private static final int DONE = 1 << 31; // must be negative + private static final int ABNORMAL = 1 << 16; + private static final int THROWN = 1 << 17; + private static final int SMASK = 0xffff; // short bits for tags + private static final int UNCOMPENSATE = 1 << 16; // helpJoin return sentinel + + // Fields + volatile int status; // accessed directly by pool and workers + private transient volatile Aux aux; // either waiters or thrown Exception + + // Support for atomic operations + private static final VarHandle STATUS; + private static final VarHandle AUX; + private int getAndBitwiseOrStatus(int v) { + return (int)STATUS.getAndBitwiseOr(this, v); + } + private boolean casStatus(int c, int v) { + return STATUS.weakCompareAndSet(this, c, v); + } + private boolean casAux(Aux c, Aux v) { + return AUX.compareAndSet(this, c, v); + } + + /** Removes and unparks waiters */ + private void signalWaiters() { + for (Aux a; (a = aux) != null && a.ex == null; ) { + if (casAux(a, null)) { // detach entire list + for (Thread t; a != null; a = a.next) { + if ((t = a.thread) != Thread.currentThread() && t != null) + LockSupport.unpark(t); // don't self-signal + } + break; } - if (completed) - s = setDone(); } - return s; } /** - * If not done, sets SIGNAL status and performs Object.wait(timeout). - * This task may or may not be done on exit. Ignores interrupts. + * Possibly blocks until task is done or interrupted or timed out. * - * @param timeout using Object.wait conventions. + * @param interruptible true if wait can be cancelled by interrupt + * @param deadline if non-zero use timed waits and possibly timeout + * @param pool if nonnull pool to uncompensate after unblocking + * @return status on exit, or ABNORMAL if interrupted while waiting */ - final void internalWait(long timeout) { - if ((int)STATUS.getAndBitwiseOr(this, SIGNAL) >= 0) { - synchronized (this) { - if (status >= 0) - try { wait(timeout); } catch (InterruptedException ie) { } + private int awaitDone(boolean interruptible, long deadline, + ForkJoinPool pool) { + int s; + boolean interrupted = false, queued = false, parked = false; + Aux node = null; + while ((s = status) >= 0) { + Aux a; long ns; + if (parked && Thread.interrupted()) { + if (interruptible) { + s = ABNORMAL; + break; + } + interrupted = true; + } + else if (queued) { + if (deadline != 0L) { + if ((ns = deadline - System.nanoTime()) <= 0L) + break; + LockSupport.parkNanos(ns); + } else - notifyAll(); + LockSupport.park(); + parked = true; + } + else if (node != null) { + if ((a = aux) != null && a.ex != null) + Thread.onSpinWait(); // exception in progress + else if (queued = casAux(node.next = a, node)) + LockSupport.setCurrentBlocker(this); + } + else { + try { + node = new Aux(Thread.currentThread(), null); + } catch (Throwable ex) { // try to cancel if cannot create + casStatus(s, s | (DONE | ABNORMAL)); + } } } - } + if (pool != null) + pool.uncompensate(); - /** - * Blocks a non-worker-thread until completion. - * @return status upon completion - */ - private int externalAwaitDone() { - int s = tryExternalHelp(); - if (s >= 0 && (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) { - boolean interrupted = false; - synchronized (this) { - for (;;) { - if ((s = status) >= 0) { - try { - wait(0L); - } catch (InterruptedException ie) { - interrupted = true; + if (queued) { + LockSupport.setCurrentBlocker(null); + if (s >= 0) { // cancellation similar to AbstractQueuedSynchronizer + outer: for (Aux a; (a = aux) != null && a.ex == null; ) { + for (Aux trail = null;;) { + Aux next = a.next; + if (a == node) { + if (trail != null) + trail.casNext(trail, next); + else if (casAux(a, next)) + break outer; // cannot be re-encountered + break; // restart + } else { + trail = a; + if ((a = next) == null) + break outer; } } - else { - notifyAll(); - break; - } } } - if (interrupted) - Thread.currentThread().interrupt(); + else { + signalWaiters(); // help clean or signal + if (interrupted) + Thread.currentThread().interrupt(); + } } return s; } /** - * Blocks a non-worker-thread until completion or interruption. + * Sets DONE status and wakes up threads waiting to join this task. + * @return status on exit */ - private int externalInterruptibleAwaitDone() throws InterruptedException { - int s = tryExternalHelp(); - if (s >= 0 && (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) { - synchronized (this) { - for (;;) { - if ((s = status) >= 0) - wait(0L); - else { - notifyAll(); - break; - } - } - } - } - else if (Thread.interrupted()) - throw new InterruptedException(); + private int setDone() { + int s = getAndBitwiseOrStatus(DONE) | DONE; + signalWaiters(); return s; } /** - * Tries to help with tasks allowed for external callers. - * - * @return current status + * Sets ABNORMAL DONE status unless already done, and wakes up threads + * waiting to join this task. + * @return status on exit */ - private int tryExternalHelp() { + private int trySetCancelled() { int s; - return ((s = status) < 0 ? s: - (this instanceof CountedCompleter) ? - ForkJoinPool.common.externalHelpComplete( - (CountedCompleter)this, 0) : - ForkJoinPool.common.tryExternalUnpush(this) ? - doExec() : 0); + do {} while ((s = status) >= 0 && !casStatus(s, s |= (DONE | ABNORMAL))); + signalWaiters(); + return s; } /** - * Implementation for join, get, quietlyJoin. Directly handles - * only cases of already-completed, external wait, and - * unfork+exec. Others are relayed to ForkJoinPool.awaitJoin. + * Records exception and sets ABNORMAL THROWN DONE status unless + * already done, and wakes up threads waiting to join this task. + * If losing a race with setDone or trySetCancelled, the exception + * may be recorded but not reported. * - * @return status upon completion + * @return status on exit */ - private int doJoin() { - int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; - return (s = status) < 0 ? s : - ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? - (w = (wt = (ForkJoinWorkerThread)t).workQueue). - tryUnpush(this) && (s = doExec()) < 0 ? s : - wt.pool.awaitJoin(w, this, 0L) : - externalAwaitDone(); + final int trySetThrown(Throwable ex) { + Aux h = new Aux(Thread.currentThread(), ex), p = null; + boolean installed = false; + int s; + while ((s = status) >= 0) { + Aux a; + if (!installed && ((a = aux) == null || a.ex == null) && + (installed = casAux(a, h))) + p = a; // list of waiters replaced by h + if (installed && casStatus(s, s |= (DONE | ABNORMAL | THROWN))) + break; + } + for (; p != null; p = p.next) + LockSupport.unpark(p.thread); + return s; } /** - * Implementation for invoke, quietlyInvoke. + * Records exception unless already done. Overridable in subclasses. * - * @return status upon completion + * @return status on exit */ - private int doInvoke() { - int s; Thread t; ForkJoinWorkerThread wt; - return (s = doExec()) < 0 ? s : - ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? - (wt = (ForkJoinWorkerThread)t).pool. - awaitJoin(wt.workQueue, this, 0L) : - externalAwaitDone(); + int trySetException(Throwable ex) { + return trySetThrown(ex); } - // Exception table support - /** - * Hash table of exceptions thrown by tasks, to enable reporting - * by callers. Because exceptions are rare, we don't directly keep - * them with task objects, but instead use a weak ref table. Note - * that cancellation exceptions don't appear in the table, but are - * instead recorded as status values. - * - * The exception table has a fixed capacity. + * Constructor for subclasses to call. */ - private static final ExceptionNode[] exceptionTable - = new ExceptionNode[32]; - - /** Lock protecting access to exceptionTable. */ - private static final ReentrantLock exceptionTableLock - = new ReentrantLock(); - - /** Reference queue of stale exceptionally completed tasks. */ - private static final ReferenceQueue> exceptionTableRefQueue - = new ReferenceQueue<>(); + public ForkJoinTask() {} - /** - * Key-value nodes for exception table. The chained hash table - * uses identity comparisons, full locking, and weak references - * for keys. The table has a fixed capacity because it only - * maintains task exceptions long enough for joiners to access - * them, so should never become very large for sustained - * periods. However, since we do not know when the last joiner - * completes, we must use weak references and expunge them. We do - * so on each operation (hence full locking). Also, some thread in - * any ForkJoinPool will call helpExpungeStaleExceptions when its - * pool becomes isQuiescent. - */ - static final class ExceptionNode extends WeakReference> { - final Throwable ex; - ExceptionNode next; - final long thrower; // use id not ref to avoid weak cycles - final int hashCode; // store task hashCode before weak ref disappears - ExceptionNode(ForkJoinTask task, Throwable ex, ExceptionNode next, - ReferenceQueue> exceptionTableRefQueue) { - super(task, exceptionTableRefQueue); - this.ex = ex; - this.next = next; - this.thrower = Thread.currentThread().getId(); - this.hashCode = System.identityHashCode(task); - } + static boolean isExceptionalStatus(int s) { // needed by subclasses + return (s & THROWN) != 0; } /** - * Records exception and sets status. + * Unless done, calls exec and records status if completed, but + * doesn't wait for completion otherwise. * - * @return status on exit + * @return status on exit from this method */ - final int recordExceptionalCompletion(Throwable ex) { - int s; + final int doExec() { + int s; boolean completed; if ((s = status) >= 0) { - int h = System.identityHashCode(this); - final ReentrantLock lock = exceptionTableLock; - lock.lock(); try { - expungeStaleExceptions(); - ExceptionNode[] t = exceptionTable; - int i = h & (t.length - 1); - for (ExceptionNode e = t[i]; ; e = e.next) { - if (e == null) { - t[i] = new ExceptionNode(this, ex, t[i], - exceptionTableRefQueue); - break; - } - if (e.get() == this) // already present - break; - } - } finally { - lock.unlock(); + completed = exec(); + } catch (Throwable rex) { + s = trySetException(rex); + completed = false; } - s = abnormalCompletion(DONE | ABNORMAL | THROWN); + if (completed) + s = setDone(); } return s; } /** - * Records exception and possibly propagates. + * Helps and/or waits for completion from join, get, or invoke; + * called from either internal or external threads. * - * @return status on exit - */ - private int setExceptionalCompletion(Throwable ex) { - int s = recordExceptionalCompletion(ex); - if ((s & THROWN) != 0) - internalPropagateException(ex); - return s; - } - - /** - * Hook for exception propagation support for tasks with completers. - */ - void internalPropagateException(Throwable ex) { - } - - /** - * Cancels, ignoring any exceptions thrown by cancel. Used during - * worker and pool shutdown. Cancel is spec'ed not to throw any - * exceptions, but if it does anyway, we have no recourse during - * shutdown, so guard against this case. - */ - static final void cancelIgnoringExceptions(ForkJoinTask t) { - if (t != null && t.status >= 0) { - try { - t.cancel(false); - } catch (Throwable ignore) { + * @param ran true if task known to have been exec'd + * @param interruptible true if park interruptibly when external + * @param timed true if use timed wait + * @param nanos if timed, timeout value + * @return ABNORMAL if interrupted, else status on exit + */ + private int awaitJoin(boolean ran, boolean interruptible, boolean timed, + long nanos) { + boolean internal; ForkJoinPool p; ForkJoinPool.WorkQueue q; int s; + Thread t; ForkJoinWorkerThread wt; + if (internal = ((t = Thread.currentThread()) + instanceof ForkJoinWorkerThread)) { + p = (wt = (ForkJoinWorkerThread)t).pool; + q = wt.workQueue; + } + else { + p = ForkJoinPool.common; + q = ForkJoinPool.commonQueue(); + if (interruptible && Thread.interrupted()) + return ABNORMAL; + } + if ((s = status) < 0) + return s; + long deadline = 0L; + if (timed) { + if (nanos <= 0L) + return 0; + else if ((deadline = nanos + System.nanoTime()) == 0L) + deadline = 1L; + } + ForkJoinPool uncompensate = null; + if (q != null && p != null) { // try helping + if ((!timed || p.isSaturated()) && + ((this instanceof CountedCompleter) ? + (s = p.helpComplete(this, q, internal)) < 0 : + (q.tryRemove(this, internal) && (s = doExec()) < 0))) + return s; + if (internal) { + if ((s = p.helpJoin(this, q)) < 0) + return s; + if (s == UNCOMPENSATE) + uncompensate = p; + interruptible = false; } } + return awaitDone(interruptible, deadline, uncompensate); } /** - * Removes exception node and clears status. + * Cancels, ignoring any exceptions thrown by cancel. Cancel is + * spec'ed not to throw any exceptions, but if it does anyway, we + * have no recourse, so guard against this case. */ - private void clearExceptionalCompletion() { - int h = System.identityHashCode(this); - final ReentrantLock lock = exceptionTableLock; - lock.lock(); - try { - ExceptionNode[] t = exceptionTable; - int i = h & (t.length - 1); - ExceptionNode e = t[i]; - ExceptionNode pred = null; - while (e != null) { - ExceptionNode next = e.next; - if (e.get() == this) { - if (pred == null) - t[i] = next; - else - pred.next = next; - break; - } - pred = e; - e = next; + static final void cancelIgnoringExceptions(Future t) { + if (t != null) { + try { + t.cancel(true); + } catch (Throwable ignore) { } - expungeStaleExceptions(); - status = 0; - } finally { - lock.unlock(); } } @@ -577,37 +540,27 @@ * @return the exception, or null if none */ private Throwable getThrowableException() { - int h = System.identityHashCode(this); - ExceptionNode e; - final ReentrantLock lock = exceptionTableLock; - lock.lock(); - try { - expungeStaleExceptions(); - ExceptionNode[] t = exceptionTable; - e = t[h & (t.length - 1)]; - while (e != null && e.get() != this) - e = e.next; - } finally { - lock.unlock(); - } - Throwable ex; - if (e == null || (ex = e.ex) == null) - return null; - if (e.thrower != Thread.currentThread().getId()) { + Throwable ex; Aux a; + if ((a = aux) == null) + ex = null; + else if ((ex = a.ex) != null && a.thread != Thread.currentThread()) { try { - Constructor noArgCtor = null; - // public ctors only + Constructor noArgCtor = null, oneArgCtor = null; for (Constructor c : ex.getClass().getConstructors()) { Class[] ps = c.getParameterTypes(); if (ps.length == 0) noArgCtor = c; - else if (ps.length == 1 && ps[0] == Throwable.class) - return (Throwable)c.newInstance(ex); + else if (ps.length == 1 && ps[0] == Throwable.class) { + oneArgCtor = c; + break; + } } - if (noArgCtor != null) { - Throwable wx = (Throwable)noArgCtor.newInstance(); - wx.initCause(ex); - return wx; + if (oneArgCtor != null) + ex = (Throwable)oneArgCtor.newInstance(ex); + else if (noArgCtor != null) { + Throwable rx = (Throwable)noArgCtor.newInstance(); + rx.initCause(ex); + ex = rx; } } catch (Exception ignore) { } @@ -616,48 +569,43 @@ } /** - * Polls stale refs and removes them. Call only while holding lock. + * Returns exception associated with the given status, or null if none. */ - private static void expungeStaleExceptions() { - for (Object x; (x = exceptionTableRefQueue.poll()) != null;) { - if (x instanceof ExceptionNode) { - ExceptionNode[] t = exceptionTable; - int i = ((ExceptionNode)x).hashCode & (t.length - 1); - ExceptionNode e = t[i]; - ExceptionNode pred = null; - while (e != null) { - ExceptionNode next = e.next; - if (e == x) { - if (pred == null) - t[i] = next; - else - pred.next = next; - break; - } - pred = e; - e = next; - } - } - } + private Throwable getException(int s) { + Throwable ex = null; + if ((s & ABNORMAL) != 0 && + ((s & THROWN) == 0 || (ex = getThrowableException()) == null)) + ex = new CancellationException(); + return ex; } /** - * If lock is available, polls stale refs and removes them. - * Called from ForkJoinPool when pools become quiescent. + * Throws exception associated with the given status, or + * CancellationException if none recorded. */ - static final void helpExpungeStaleExceptions() { - final ReentrantLock lock = exceptionTableLock; - if (lock.tryLock()) { - try { - expungeStaleExceptions(); - } finally { - lock.unlock(); - } - } + private void reportException(int s) { + ForkJoinTask.uncheckedThrow( + (s & THROWN) != 0 ? getThrowableException() : null); + } + + /** + * Throws exception for (timed or untimed) get, wrapping if + * necessary in an ExecutionException. + */ + private void reportExecutionException(int s) { + Throwable ex = null; + if (s == ABNORMAL) + ex = new InterruptedException(); + else if (s >= 0) + ex = new TimeoutException(); + else if ((s & THROWN) != 0 && (ex = getThrowableException()) != null) + ex = new ExecutionException(ex); + ForkJoinTask.uncheckedThrow(ex); } /** - * A version of "sneaky throw" to relay exceptions. + * A version of "sneaky throw" to relay exceptions in other + * contexts. */ static void rethrow(Throwable ex) { ForkJoinTask.uncheckedThrow(ex); @@ -666,22 +614,14 @@ /** * The sneaky part of sneaky throw, relying on generics * limitations to evade compiler complaints about rethrowing - * unchecked exceptions. + * unchecked exceptions. If argument null, throws + * CancellationException. */ @SuppressWarnings("unchecked") static void uncheckedThrow(Throwable t) throws T { - if (t != null) - throw (T)t; // rely on vacuous cast - else - throw new Error("Unknown Exception"); - } - - /** - * Throws exception, if any, associated with the given status. - */ - private void reportException(int s) { - rethrow((s & THROWN) != 0 ? getThrowableException() : - new CancellationException()); + if (t == null) + t = new CancellationException(); + throw (T)t; // rely on vacuous cast } // public methods @@ -702,9 +642,9 @@ * @return {@code this}, to simplify usage */ public final ForkJoinTask fork() { - Thread t; + Thread t; ForkJoinWorkerThread w; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) - ((ForkJoinWorkerThread)t).workQueue.push(this); + (w = (ForkJoinWorkerThread)t).workQueue.push(this, w.pool); else ForkJoinPool.common.externalPush(this); return this; @@ -723,7 +663,9 @@ */ public final V join() { int s; - if (((s = doJoin()) & ABNORMAL) != 0) + if ((s = status) >= 0) + s = awaitJoin(false, false, false, 0L); + if ((s & ABNORMAL) != 0) reportException(s); return getRawResult(); } @@ -738,7 +680,9 @@ */ public final V invoke() { int s; - if (((s = doInvoke()) & ABNORMAL) != 0) + if ((s = doExec()) >= 0) + s = awaitJoin(true, false, false, 0L); + if ((s & ABNORMAL) != 0) reportException(s); return getRawResult(); } @@ -762,10 +706,16 @@ */ public static void invokeAll(ForkJoinTask t1, ForkJoinTask t2) { int s1, s2; + if (t1 == null || t2 == null) + throw new NullPointerException(); t2.fork(); - if (((s1 = t1.doInvoke()) & ABNORMAL) != 0) + if ((s1 = t1.doExec()) >= 0) + s1 = t1.awaitJoin(true, false, false, 0L); + if ((s1 & ABNORMAL) != 0) { + cancelIgnoringExceptions(t2); t1.reportException(s1); - if (((s2 = t2.doJoin()) & ABNORMAL) != 0) + } + else if (((s2 = t2.awaitJoin(false, false, false, 0L)) & ABNORMAL) != 0) t2.reportException(s2); } @@ -788,27 +738,38 @@ Throwable ex = null; int last = tasks.length - 1; for (int i = last; i >= 0; --i) { - ForkJoinTask t = tasks[i]; - if (t == null) { - if (ex == null) - ex = new NullPointerException(); - } - else if (i != 0) - t.fork(); - else if ((t.doInvoke() & ABNORMAL) != 0 && ex == null) - ex = t.getException(); - } - for (int i = 1; i <= last; ++i) { - ForkJoinTask t = tasks[i]; - if (t != null) { - if (ex != null) - t.cancel(false); - else if ((t.doJoin() & ABNORMAL) != 0) - ex = t.getException(); + ForkJoinTask t; + if ((t = tasks[i]) == null) { + ex = new NullPointerException(); + break; } + if (i == 0) { + int s; + if ((s = t.doExec()) >= 0) + s = t.awaitJoin(true, false, false, 0L); + if ((s & ABNORMAL) != 0) + ex = t.getException(s); + break; + } + t.fork(); } - if (ex != null) + if (ex == null) { + for (int i = 1; i <= last; ++i) { + ForkJoinTask t; + if ((t = tasks[i]) != null) { + int s; + if ((s = t.status) >= 0) + s = t.awaitJoin(false, false, false, 0L); + if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null) + break; + } + } + } + if (ex != null) { + for (int i = 1; i <= last; ++i) + cancelIgnoringExceptions(tasks[i]); rethrow(ex); + } } /** @@ -838,29 +799,40 @@ List> ts = (List>) tasks; Throwable ex = null; - int last = ts.size() - 1; + int last = ts.size() - 1; // nearly same as array version for (int i = last; i >= 0; --i) { - ForkJoinTask t = ts.get(i); - if (t == null) { - if (ex == null) - ex = new NullPointerException(); - } - else if (i != 0) - t.fork(); - else if ((t.doInvoke() & ABNORMAL) != 0 && ex == null) - ex = t.getException(); - } - for (int i = 1; i <= last; ++i) { - ForkJoinTask t = ts.get(i); - if (t != null) { - if (ex != null) - t.cancel(false); - else if ((t.doJoin() & ABNORMAL) != 0) - ex = t.getException(); + ForkJoinTask t; + if ((t = ts.get(i)) == null) { + ex = new NullPointerException(); + break; + } + if (i == 0) { + int s; + if ((s = t.doExec()) >= 0) + s = t.awaitJoin(true, false, false, 0L); + if ((s & ABNORMAL) != 0) + ex = t.getException(s); + break; } + t.fork(); } - if (ex != null) + if (ex == null) { + for (int i = 1; i <= last; ++i) { + ForkJoinTask t; + if ((t = ts.get(i)) != null) { + int s; + if ((s = t.status) >= 0) + s = t.awaitJoin(false, false, false, 0L); + if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null) + break; + } + } + } + if (ex != null) { + for (int i = 1; i <= last; ++i) + cancelIgnoringExceptions(ts.get(i)); rethrow(ex); + } return tasks; } @@ -892,8 +864,7 @@ * @return {@code true} if this task is now cancelled */ public boolean cancel(boolean mayInterruptIfRunning) { - int s = abnormalCompletion(DONE | ABNORMAL); - return (s & (ABNORMAL | THROWN)) == ABNORMAL; + return (trySetCancelled() & (ABNORMAL | THROWN)) == ABNORMAL; } public final boolean isDone() { @@ -932,10 +903,7 @@ * @return the exception, or {@code null} if none */ public final Throwable getException() { - int s = status; - return ((s & ABNORMAL) == 0 ? null : - (s & THROWN) == 0 ? new CancellationException() : - getThrowableException()); + return getException(status); } /** @@ -953,9 +921,9 @@ * thrown will be a {@code RuntimeException} with cause {@code ex}. */ public void completeExceptionally(Throwable ex) { - setExceptionalCompletion((ex instanceof RuntimeException) || - (ex instanceof Error) ? ex : - new RuntimeException(ex)); + trySetException((ex instanceof RuntimeException) || + (ex instanceof Error) ? ex : + new RuntimeException(ex)); } /** @@ -975,7 +943,7 @@ try { setRawResult(value); } catch (Throwable rex) { - setExceptionalCompletion(rex); + trySetException(rex); return; } setDone(); @@ -1005,14 +973,10 @@ * member of a ForkJoinPool and was interrupted while waiting */ public final V get() throws InterruptedException, ExecutionException { - int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ? - doJoin() : externalInterruptibleAwaitDone(); - if ((s & THROWN) != 0) - throw new ExecutionException(getThrowableException()); - else if ((s & ABNORMAL) != 0) - throw new CancellationException(); - else - return getRawResult(); + int s; + if (((s = awaitJoin(false, true, false, 0L)) & ABNORMAL) != 0) + reportExecutionException(s); + return getRawResult(); } /** @@ -1032,45 +996,10 @@ public final V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { int s; - long nanos = unit.toNanos(timeout); - if (Thread.interrupted()) - throw new InterruptedException(); - if ((s = status) >= 0 && nanos > 0L) { - long d = System.nanoTime() + nanos; - long deadline = (d == 0L) ? 1L : d; // avoid 0 - Thread t = Thread.currentThread(); - if (t instanceof ForkJoinWorkerThread) { - ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t; - s = wt.pool.awaitJoin(wt.workQueue, this, deadline); - } - else if ((s = ((this instanceof CountedCompleter) ? - ForkJoinPool.common.externalHelpComplete( - (CountedCompleter)this, 0) : - ForkJoinPool.common.tryExternalUnpush(this) ? - doExec() : 0)) >= 0) { - long ns, ms; // measure in nanosecs, but wait in millisecs - while ((s = status) >= 0 && - (ns = deadline - System.nanoTime()) > 0L) { - if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L && - (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) { - synchronized (this) { - if (status >= 0) - wait(ms); // OK to throw InterruptedException - else - notifyAll(); - } - } - } - } - } - if (s >= 0) - throw new TimeoutException(); - else if ((s & THROWN) != 0) - throw new ExecutionException(getThrowableException()); - else if ((s & ABNORMAL) != 0) - throw new CancellationException(); - else - return getRawResult(); + if ((s = awaitJoin(false, true, true, unit.toNanos(timeout))) >= 0 || + (s & ABNORMAL) != 0) + reportExecutionException(s); + return getRawResult(); } /** @@ -1080,7 +1009,8 @@ * known to have aborted. */ public final void quietlyJoin() { - doJoin(); + if (status >= 0) + awaitJoin(false, false, false, 0L); } /** @@ -1089,7 +1019,8 @@ * exception. */ public final void quietlyInvoke() { - doInvoke(); + if (doExec() >= 0) + awaitJoin(true, false, false, 0L); } /** @@ -1100,13 +1031,12 @@ * all are processed. */ public static void helpQuiesce() { - Thread t; - if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) { - ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t; - wt.pool.helpQuiescePool(wt.workQueue); - } + Thread t; ForkJoinWorkerThread w; ForkJoinPool p; + if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread && + (p = (w = (ForkJoinWorkerThread)t).pool) != null) + p.helpQuiescePool(w.workQueue, Long.MAX_VALUE, false); else - ForkJoinPool.quiesceCommonPool(); + ForkJoinPool.common.externalHelpQuiescePool(Long.MAX_VALUE, false); } /** @@ -1126,10 +1056,8 @@ * setRawResult(null)}. */ public void reinitialize() { - if ((status & THROWN) != 0) - clearExceptionalCompletion(); - else - status = 0; + aux = null; + status = 0; } /** @@ -1142,9 +1070,9 @@ * @return the pool, or {@code null} if none */ public static ForkJoinPool getPool() { - Thread t = Thread.currentThread(); - return (t instanceof ForkJoinWorkerThread) ? - ((ForkJoinWorkerThread) t).pool : null; + Thread t; + return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? + ((ForkJoinWorkerThread) t).pool : null); } /** @@ -1170,10 +1098,12 @@ * @return {@code true} if unforked */ public boolean tryUnfork() { - Thread t; - return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? - ((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) : - ForkJoinPool.common.tryExternalUnpush(this)); + Thread t; ForkJoinPool.WorkQueue q; + return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) + ? (q = ((ForkJoinWorkerThread)t).workQueue) != null + && q.tryUnpush(this) + : (q = ForkJoinPool.commonQueue()) != null + && q.externalTryUnpush(this); } /** @@ -1189,7 +1119,7 @@ if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) q = ((ForkJoinWorkerThread)t).workQueue; else - q = ForkJoinPool.commonSubmitterQueue(); + q = ForkJoinPool.commonQueue(); return (q == null) ? 0 : q.queueSize(); } @@ -1264,7 +1194,7 @@ if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) q = ((ForkJoinWorkerThread)t).workQueue; else - q = ForkJoinPool.commonSubmitterQueue(); + q = ForkJoinPool.commonQueue(); return (q == null) ? null : q.peek(); } @@ -1279,9 +1209,8 @@ */ protected static ForkJoinTask pollNextLocalTask() { Thread t; - return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? - ((ForkJoinWorkerThread)t).workQueue.nextLocalTask() : - null; + return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? + ((ForkJoinWorkerThread)t).workQueue.nextLocalTask() : null); } /** @@ -1298,10 +1227,10 @@ * @return a task, or {@code null} if none are available */ protected static ForkJoinTask pollTask() { - Thread t; ForkJoinWorkerThread wt; - return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? - (wt = (ForkJoinWorkerThread)t).pool.nextTaskFor(wt.workQueue) : - null; + Thread t; ForkJoinWorkerThread w; + return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? + (w = (ForkJoinWorkerThread)t).pool.nextTaskFor(w.workQueue) : + null); } /** @@ -1317,8 +1246,8 @@ */ protected static ForkJoinTask pollSubmission() { Thread t; - return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? - ((ForkJoinWorkerThread)t).pool.pollSubmission() : null; + return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? + ((ForkJoinWorkerThread)t).pool.pollSubmission() : null); } // tag operations @@ -1342,8 +1271,7 @@ */ public final short setForkJoinTaskTag(short newValue) { for (int s;;) { - if (STATUS.weakCompareAndSet(this, s = status, - (s & ~SMASK) | (newValue & SMASK))) + if (casStatus(s = status, (s & ~SMASK) | (newValue & SMASK))) return (short)s; } } @@ -1366,8 +1294,7 @@ for (int s;;) { if ((short)(s = status) != expect) return false; - if (STATUS.weakCompareAndSet(this, s, - (s & ~SMASK) | (update & SMASK))) + if (casStatus(s, (s & ~SMASK) | (update & SMASK))) return true; } } @@ -1432,8 +1359,17 @@ public final Void getRawResult() { return null; } public final void setRawResult(Void v) { } public final boolean exec() { runnable.run(); return true; } - void internalPropagateException(Throwable ex) { - rethrow(ex); // rethrow outside exec() catches. + int trySetException(Throwable ex) { // if a handler, invoke it + int s; Thread t; java.lang.Thread.UncaughtExceptionHandler h; + if (isExceptionalStatus(s = trySetThrown(ex)) && + (h = ((t = Thread.currentThread()). + getUncaughtExceptionHandler())) != null) { + try { + h.uncaughtException(t, ex); + } catch (Throwable ignore) { + } + } + return s; } private static final long serialVersionUID = 5232453952276885070L; } @@ -1470,6 +1406,53 @@ private static final long serialVersionUID = 2838392045355241008L; } + static final class AdaptedInterruptibleCallable extends ForkJoinTask + implements RunnableFuture { + @SuppressWarnings("serial") // Conditionally serializable + final Callable callable; + @SuppressWarnings("serial") // Conditionally serializable + transient volatile Thread runner; + T result; + AdaptedInterruptibleCallable(Callable callable) { + if (callable == null) throw new NullPointerException(); + this.callable = callable; + } + public final T getRawResult() { return result; } + public final void setRawResult(T v) { result = v; } + public final boolean exec() { + Thread.interrupted(); + runner = Thread.currentThread(); + try { + if (!isDone()) // recheck + result = callable.call(); + return true; + } catch (RuntimeException rex) { + throw rex; + } catch (Exception ex) { + throw new RuntimeException(ex); + } finally { + runner = null; + Thread.interrupted(); + } + } + public final void run() { invoke(); } + public final boolean cancel(boolean mayInterruptIfRunning) { + Thread t; + boolean stat = super.cancel(false); + if (mayInterruptIfRunning && (t = runner) != null) { + try { + t.interrupt(); + } catch (Throwable ignore) { + } + } + return stat; + } + public String toString() { + return super.toString() + "[Wrapped task = " + callable + "]"; + } + private static final long serialVersionUID = 2838392045355241008L; + } + /** * Returns a new {@code ForkJoinTask} that performs the {@code run} * method of the given {@code Runnable} as its action, and returns @@ -1510,6 +1493,26 @@ return new AdaptedCallable(callable); } + /** + * Returns a new {@code ForkJoinTask} that performs the {@code call} + * method of the given {@code Callable} as its action, and returns + * its result upon {@link #join}, translating any checked exceptions + * encountered into {@code RuntimeException}. Additionally, + * invocations of {@code cancel} with {@code mayInterruptIfRunning + * true} will attempt to interrupt the thread performing the task. + * + * @param callable the callable action + * @param the type of the callable's result + * @return the task + * + * @since 17 + */ + // adaptInterruptible deferred to its own independent change + // https://bugs.openjdk.java.net/browse/JDK-8246587 + /* TODO: public */ private static ForkJoinTask adaptInterruptible(Callable callable) { + return new AdaptedInterruptibleCallable(callable); + } + // Serialization support private static final long serialVersionUID = -7721805057305804111L; @@ -1524,8 +1527,9 @@ */ private void writeObject(java.io.ObjectOutputStream s) throws java.io.IOException { + Aux a; s.defaultWriteObject(); - s.writeObject(getException()); + s.writeObject((a = aux) == null ? null : a.ex); } /** @@ -1540,15 +1544,14 @@ s.defaultReadObject(); Object ex = s.readObject(); if (ex != null) - setExceptionalCompletion((Throwable)ex); + trySetThrown((Throwable)ex); } - // VarHandle mechanics - private static final VarHandle STATUS; static { try { MethodHandles.Lookup l = MethodHandles.lookup(); STATUS = l.findVarHandle(ForkJoinTask.class, "status", int.class); + AUX = l.findVarHandle(ForkJoinTask.class, "aux", Aux.class); } catch (ReflectiveOperationException e) { throw new ExceptionInInitializerError(e); }