< prev index next >

src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java

Print this page
8192944: Miscellaneous changes imported from jsr166 CVS 2017-12-08
Reviewed-by: martin, psandoz, chegar

*** 217,272 **** * (3) user-level methods that additionally report results. * This is sometimes hard to see because this file orders exported * methods in a way that flows well in javadocs. */ ! /* * The status field holds run control status bits packed into a ! * single int to minimize footprint and to ensure atomicity (via ! * CAS). Status is initially zero, and takes on nonnegative ! * values until completed, upon which status (anded with ! * DONE_MASK) holds value NORMAL, CANCELLED, or EXCEPTIONAL. Tasks ! * undergoing blocking waits by other threads have the SIGNAL bit ! * set. Completion of a stolen task with SIGNAL set awakens any ! * waiters via notifyAll. Even though suboptimal for some ! * purposes, we use basic builtin wait/notify to take advantage of ! * "monitor inflation" in JVMs that we would otherwise need to ! * emulate to avoid adding further per-task bookkeeping overhead. ! * We want these monitors to be "fat", i.e., not use biasing or ! * thin-lock techniques, so use some odd coding idioms that tend ! * to avoid them, mainly by arranging that every synchronized ! * block performs a wait, notifyAll or both. * * These control bits occupy only (some of) the upper half (16 * bits) of status field. The lower bits are used for user-defined * tags. */ - - /** The run status of this task */ volatile int status; // accessed directly by pool and workers ! static final int DONE_MASK = 0xf0000000; // mask out non-completion bits ! static final int NORMAL = 0xf0000000; // must be negative ! static final int CANCELLED = 0xc0000000; // must be < NORMAL ! static final int EXCEPTIONAL = 0x80000000; // must be < CANCELLED ! static final int SIGNAL = 0x00010000; // must be >= 1 << 16 ! static final int SMASK = 0x0000ffff; // short bits for tags /** ! * Marks completion and wakes up threads waiting to join this ! * task. * ! * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL ! * @return completion status on exit */ ! private int setCompletion(int completion) { ! for (int s;;) { if ((s = status) < 0) return s; ! if (STATUS.compareAndSet(this, s, s | completion)) { ! if ((s >>> 16) != 0) synchronized (this) { notifyAll(); } ! return completion; } } } /** --- 217,279 ---- * (3) user-level methods that additionally report results. * This is sometimes hard to see because this file orders exported * methods in a way that flows well in javadocs. */ ! /** * The status field holds run control status bits packed into a ! * single int to ensure atomicity. Status is initially zero, and ! * takes on nonnegative values until completed, upon which it ! * holds (sign bit) DONE, possibly with ABNORMAL (cancelled or ! * exceptional) and THROWN (in which case an exception has been ! * stored). Tasks with dependent blocked waiting joiners have the ! * SIGNAL bit set. Completion of a task with SIGNAL set awakens ! * any waiters via notifyAll. (Waiters also help signal others ! * upon completion.) * * These control bits occupy only (some of) the upper half (16 * bits) of status field. The lower bits are used for user-defined * tags. */ volatile int status; // accessed directly by pool and workers ! ! private static final int DONE = 1 << 31; // must be negative ! private static final int ABNORMAL = 1 << 18; // set atomically with DONE ! private static final int THROWN = 1 << 17; // set atomically with ABNORMAL ! private static final int SIGNAL = 1 << 16; // true if joiner waiting ! private static final int SMASK = 0xffff; // short bits for tags ! ! static boolean isExceptionalStatus(int s) { // needed by subclasses ! return (s & THROWN) != 0; ! } /** ! * Sets DONE status and wakes up threads waiting to join this task. * ! * @return status on exit */ ! private int setDone() { ! int s; ! if (((s = (int)STATUS.getAndBitwiseOr(this, DONE)) & SIGNAL) != 0) ! synchronized (this) { notifyAll(); } ! return s | DONE; ! } ! ! /** ! * Marks cancelled or exceptional completion unless already done. ! * ! * @param completion must be DONE | ABNORMAL, ORed with THROWN if exceptional ! * @return status on exit ! */ ! private int abnormalCompletion(int completion) { ! for (int s, ns;;) { if ((s = status) < 0) return s; ! else if (STATUS.weakCompareAndSet(this, s, ns = s | completion)) { ! if ((s & SIGNAL) != 0) synchronized (this) { notifyAll(); } ! return ns; } } } /**
*** 280,293 **** int s; boolean completed; if ((s = status) >= 0) { try { completed = exec(); } catch (Throwable rex) { ! return setExceptionalCompletion(rex); } if (completed) ! s = setCompletion(NORMAL); } return s; } /** --- 287,301 ---- int s; boolean completed; if ((s = status) >= 0) { try { completed = exec(); } catch (Throwable rex) { ! completed = false; ! s = setExceptionalCompletion(rex); } if (completed) ! s = setDone(); } return s; } /**
*** 295,307 **** * This task may or may not be done on exit. Ignores interrupts. * * @param timeout using Object.wait conventions. */ final void internalWait(long timeout) { ! int s; ! if ((s = status) >= 0 && // force completer to issue notify ! STATUS.compareAndSet(this, s, s | SIGNAL)) { synchronized (this) { if (status >= 0) try { wait(timeout); } catch (InterruptedException ie) { } else notifyAll(); --- 303,313 ---- * This task may or may not be done on exit. Ignores interrupts. * * @param timeout using Object.wait conventions. */ final void internalWait(long timeout) { ! if ((int)STATUS.getAndBitwiseOr(this, SIGNAL) >= 0) { synchronized (this) { if (status >= 0) try { wait(timeout); } catch (InterruptedException ie) { } else notifyAll();
*** 312,376 **** /** * Blocks a non-worker-thread until completion. * @return status upon completion */ private int externalAwaitDone() { ! int s = ((this instanceof CountedCompleter) ? // try helping ! ForkJoinPool.common.externalHelpComplete( ! (CountedCompleter<?>)this, 0) : ! ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0); ! if (s >= 0 && (s = status) >= 0) { boolean interrupted = false; - do { - if (STATUS.compareAndSet(this, s, s | SIGNAL)) { synchronized (this) { ! if (status >= 0) { try { wait(0L); } catch (InterruptedException ie) { interrupted = true; } } ! else notifyAll(); } } - } while ((s = status) >= 0); if (interrupted) Thread.currentThread().interrupt(); } return s; } /** * Blocks a non-worker-thread until completion or interruption. */ private int externalInterruptibleAwaitDone() throws InterruptedException { ! int s; ! if (Thread.interrupted()) ! throw new InterruptedException(); ! if ((s = status) >= 0 && ! (s = ((this instanceof CountedCompleter) ? ! ForkJoinPool.common.externalHelpComplete( ! (CountedCompleter<?>)this, 0) : ! ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : ! 0)) >= 0) { ! while ((s = status) >= 0) { ! if (STATUS.compareAndSet(this, s, s | SIGNAL)) { synchronized (this) { ! if (status >= 0) wait(0L); ! else notifyAll(); } } } } return s; } /** * Implementation for join, get, quietlyJoin. Directly handles * only cases of already-completed, external wait, and * unfork+exec. Others are relayed to ForkJoinPool.awaitJoin. * * @return status upon completion --- 318,389 ---- /** * Blocks a non-worker-thread until completion. * @return status upon completion */ private int externalAwaitDone() { ! int s = tryExternalHelp(); ! if (s >= 0 && (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) { boolean interrupted = false; synchronized (this) { ! for (;;) { ! if ((s = status) >= 0) { try { wait(0L); } catch (InterruptedException ie) { interrupted = true; } } ! else { notifyAll(); + break; + } } } if (interrupted) Thread.currentThread().interrupt(); } return s; } /** * Blocks a non-worker-thread until completion or interruption. */ private int externalInterruptibleAwaitDone() throws InterruptedException { ! int s = tryExternalHelp(); ! if (s >= 0 && (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) { synchronized (this) { ! for (;;) { ! if ((s = status) >= 0) wait(0L); ! else { notifyAll(); + break; } } } } + else if (Thread.interrupted()) + throw new InterruptedException(); return s; } /** + * Tries to help with tasks allowed for external callers. + * + * @return current status + */ + private int tryExternalHelp() { + int s; + return ((s = status) < 0 ? s: + (this instanceof CountedCompleter) ? + ForkJoinPool.common.externalHelpComplete( + (CountedCompleter<?>)this, 0) : + ForkJoinPool.common.tryExternalUnpush(this) ? + doExec() : 0); + } + + /** * Implementation for join, get, quietlyJoin. Directly handles * only cases of already-completed, external wait, and * unfork+exec. Others are relayed to ForkJoinPool.awaitJoin. * * @return status upon completion
*** 473,483 **** break; } } finally { lock.unlock(); } ! s = setCompletion(EXCEPTIONAL); } return s; } /** --- 486,496 ---- break; } } finally { lock.unlock(); } ! s = abnormalCompletion(DONE | ABNORMAL | THROWN); } return s; } /**
*** 485,495 **** * * @return status on exit */ private int setExceptionalCompletion(Throwable ex) { int s = recordExceptionalCompletion(ex); ! if ((s & DONE_MASK) == EXCEPTIONAL) internalPropagateException(ex); return s; } /** --- 498,508 ---- * * @return status on exit */ private int setExceptionalCompletion(Throwable ex) { int s = recordExceptionalCompletion(ex); ! if ((s & THROWN) != 0) internalPropagateException(ex); return s; } /**
*** 660,673 **** /** * Throws exception, if any, associated with the given status. */ private void reportException(int s) { ! if (s == CANCELLED) ! throw new CancellationException(); ! if (s == EXCEPTIONAL) ! rethrow(getThrowableException()); } // public methods /** --- 673,684 ---- /** * Throws exception, if any, associated with the given status. */ private void reportException(int s) { ! rethrow((s & THROWN) != 0 ? getThrowableException() : ! new CancellationException()); } // public methods /**
*** 705,715 **** * * @return the computed result */ public final V join() { int s; ! if ((s = doJoin() & DONE_MASK) != NORMAL) reportException(s); return getRawResult(); } /** --- 716,726 ---- * * @return the computed result */ public final V join() { int s; ! if (((s = doJoin()) & ABNORMAL) != 0) reportException(s); return getRawResult(); } /**
*** 720,730 **** * * @return the computed result */ public final V invoke() { int s; ! if ((s = doInvoke() & DONE_MASK) != NORMAL) reportException(s); return getRawResult(); } /** --- 731,741 ---- * * @return the computed result */ public final V invoke() { int s; ! if (((s = doInvoke()) & ABNORMAL) != 0) reportException(s); return getRawResult(); } /**
*** 745,757 **** * @throws NullPointerException if any task is null */ public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) { int s1, s2; t2.fork(); ! if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL) t1.reportException(s1); ! if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL) t2.reportException(s2); } /** * Forks the given tasks, returning when {@code isDone} holds for --- 756,768 ---- * @throws NullPointerException if any task is null */ public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) { int s1, s2; t2.fork(); ! if (((s1 = t1.doInvoke()) & ABNORMAL) != 0) t1.reportException(s1); ! if (((s2 = t2.doJoin()) & ABNORMAL) != 0) t2.reportException(s2); } /** * Forks the given tasks, returning when {@code isDone} holds for
*** 777,795 **** if (ex == null) ex = new NullPointerException(); } else if (i != 0) t.fork(); ! else if (t.doInvoke() < NORMAL && ex == null) ex = t.getException(); } for (int i = 1; i <= last; ++i) { ForkJoinTask<?> t = tasks[i]; if (t != null) { if (ex != null) t.cancel(false); ! else if (t.doJoin() < NORMAL) ex = t.getException(); } } if (ex != null) rethrow(ex); --- 788,806 ---- if (ex == null) ex = new NullPointerException(); } else if (i != 0) t.fork(); ! else if ((t.doInvoke() & ABNORMAL) != 0 && ex == null) ex = t.getException(); } for (int i = 1; i <= last; ++i) { ForkJoinTask<?> t = tasks[i]; if (t != null) { if (ex != null) t.cancel(false); ! else if ((t.doJoin() & ABNORMAL) != 0) ex = t.getException(); } } if (ex != null) rethrow(ex);
*** 829,847 **** if (ex == null) ex = new NullPointerException(); } else if (i != 0) t.fork(); ! else if (t.doInvoke() < NORMAL && ex == null) ex = t.getException(); } for (int i = 1; i <= last; ++i) { ForkJoinTask<?> t = ts.get(i); if (t != null) { if (ex != null) t.cancel(false); ! else if (t.doJoin() < NORMAL) ex = t.getException(); } } if (ex != null) rethrow(ex); --- 840,858 ---- if (ex == null) ex = new NullPointerException(); } else if (i != 0) t.fork(); ! else if ((t.doInvoke() & ABNORMAL) != 0 && ex == null) ex = t.getException(); } for (int i = 1; i <= last; ++i) { ForkJoinTask<?> t = ts.get(i); if (t != null) { if (ex != null) t.cancel(false); ! else if ((t.doJoin() & ABNORMAL) != 0) ex = t.getException(); } } if (ex != null) rethrow(ex);
*** 874,925 **** * control cancellation. * * @return {@code true} if this task is now cancelled */ public boolean cancel(boolean mayInterruptIfRunning) { ! return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED; } public final boolean isDone() { return status < 0; } public final boolean isCancelled() { ! return (status & DONE_MASK) == CANCELLED; } /** * Returns {@code true} if this task threw an exception or was cancelled. * * @return {@code true} if this task threw an exception or was cancelled */ public final boolean isCompletedAbnormally() { ! return status < NORMAL; } /** * Returns {@code true} if this task completed without throwing an * exception and was not cancelled. * * @return {@code true} if this task completed without throwing an * exception and was not cancelled */ public final boolean isCompletedNormally() { ! return (status & DONE_MASK) == NORMAL; } /** * Returns the exception thrown by the base computation, or a * {@code CancellationException} if cancelled, or {@code null} if * none or if the method has not yet completed. * * @return the exception, or {@code null} if none */ public final Throwable getException() { ! int s = status & DONE_MASK; ! return ((s >= NORMAL) ? null : ! (s == CANCELLED) ? new CancellationException() : getThrowableException()); } /** * Completes this task abnormally, and if not already aborted or --- 885,937 ---- * control cancellation. * * @return {@code true} if this task is now cancelled */ public boolean cancel(boolean mayInterruptIfRunning) { ! int s = abnormalCompletion(DONE | ABNORMAL); ! return (s & (ABNORMAL | THROWN)) == ABNORMAL; } public final boolean isDone() { return status < 0; } public final boolean isCancelled() { ! return (status & (ABNORMAL | THROWN)) == ABNORMAL; } /** * Returns {@code true} if this task threw an exception or was cancelled. * * @return {@code true} if this task threw an exception or was cancelled */ public final boolean isCompletedAbnormally() { ! return (status & ABNORMAL) != 0; } /** * Returns {@code true} if this task completed without throwing an * exception and was not cancelled. * * @return {@code true} if this task completed without throwing an * exception and was not cancelled */ public final boolean isCompletedNormally() { ! return (status & (DONE | ABNORMAL)) == DONE; } /** * Returns the exception thrown by the base computation, or a * {@code CancellationException} if cancelled, or {@code null} if * none or if the method has not yet completed. * * @return the exception, or {@code null} if none */ public final Throwable getException() { ! int s = status; ! return ((s & ABNORMAL) == 0 ? null : ! (s & THROWN) == 0 ? new CancellationException() : getThrowableException()); } /** * Completes this task abnormally, and if not already aborted or
*** 959,969 **** setRawResult(value); } catch (Throwable rex) { setExceptionalCompletion(rex); return; } ! setCompletion(NORMAL); } /** * Completes this task normally without setting a value. The most * recent value established by {@link #setRawResult} (or {@code --- 971,981 ---- setRawResult(value); } catch (Throwable rex) { setExceptionalCompletion(rex); return; } ! setDone(); } /** * Completes this task normally without setting a value. The most * recent value established by {@link #setRawResult} (or {@code
*** 971,981 **** * invocations of {@code join} and related operations. * * @since 1.8 */ public final void quietlyComplete() { ! setCompletion(NORMAL); } /** * Waits if necessary for the computation to complete, and then * retrieves its result. --- 983,993 ---- * invocations of {@code join} and related operations. * * @since 1.8 */ public final void quietlyComplete() { ! setDone(); } /** * Waits if necessary for the computation to complete, and then * retrieves its result.
*** 988,1001 **** * member of a ForkJoinPool and was interrupted while waiting */ public final V get() throws InterruptedException, ExecutionException { int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ? doJoin() : externalInterruptibleAwaitDone(); ! if ((s &= DONE_MASK) == CANCELLED) ! throw new CancellationException(); ! if (s == EXCEPTIONAL) throw new ExecutionException(getThrowableException()); return getRawResult(); } /** * Waits if necessary for at most the given time for the computation --- 1000,1014 ---- * member of a ForkJoinPool and was interrupted while waiting */ public final V get() throws InterruptedException, ExecutionException { int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ? doJoin() : externalInterruptibleAwaitDone(); ! if ((s & THROWN) != 0) throw new ExecutionException(getThrowableException()); + else if ((s & ABNORMAL) != 0) + throw new CancellationException(); + else return getRawResult(); } /** * Waits if necessary for at most the given time for the computation
*** 1032,1042 **** doExec() : 0)) >= 0) { long ns, ms; // measure in nanosecs, but wait in millisecs while ((s = status) >= 0 && (ns = deadline - System.nanoTime()) > 0L) { if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L && ! STATUS.compareAndSet(this, s, s | SIGNAL)) { synchronized (this) { if (status >= 0) wait(ms); // OK to throw InterruptedException else notifyAll(); --- 1045,1055 ---- doExec() : 0)) >= 0) { long ns, ms; // measure in nanosecs, but wait in millisecs while ((s = status) >= 0 && (ns = deadline - System.nanoTime()) > 0L) { if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L && ! (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) { synchronized (this) { if (status >= 0) wait(ms); // OK to throw InterruptedException else notifyAll();
*** 1044,1061 **** } } } } if (s >= 0) - s = status; - if ((s &= DONE_MASK) != NORMAL) { - if (s == CANCELLED) - throw new CancellationException(); - if (s != EXCEPTIONAL) throw new TimeoutException(); throw new ExecutionException(getThrowableException()); ! } return getRawResult(); } /** * Joins this task, without returning its result or throwing its --- 1057,1072 ---- } } } } if (s >= 0) throw new TimeoutException(); + else if ((s & THROWN) != 0) throw new ExecutionException(getThrowableException()); ! else if ((s & ABNORMAL) != 0) ! throw new CancellationException(); ! else return getRawResult(); } /** * Joins this task, without returning its result or throwing its
*** 1108,1118 **** * null}. However, the value returned by {@code getRawResult} is * unaffected. To clear this value, you can invoke {@code * setRawResult(null)}. */ public void reinitialize() { ! if ((status & DONE_MASK) == EXCEPTIONAL) clearExceptionalCompletion(); else status = 0; } --- 1119,1129 ---- * null}. However, the value returned by {@code getRawResult} is * unaffected. To clear this value, you can invoke {@code * setRawResult(null)}. */ public void reinitialize() { ! if ((status & THROWN) != 0) clearExceptionalCompletion(); else status = 0; }
*** 1325,1335 **** * @return the previous value of the tag * @since 1.8 */ public final short setForkJoinTaskTag(short newValue) { for (int s;;) { ! if (STATUS.compareAndSet(this, s = status, (s & ~SMASK) | (newValue & SMASK))) return (short)s; } } --- 1336,1346 ---- * @return the previous value of the tag * @since 1.8 */ public final short setForkJoinTaskTag(short newValue) { for (int s;;) { ! if (STATUS.weakCompareAndSet(this, s = status, (s & ~SMASK) | (newValue & SMASK))) return (short)s; } }
*** 1349,1359 **** */ public final boolean compareAndSetForkJoinTaskTag(short expect, short update) { for (int s;;) { if ((short)(s = status) != expect) return false; ! if (STATUS.compareAndSet(this, s, (s & ~SMASK) | (update & SMASK))) return true; } } --- 1360,1370 ---- */ public final boolean compareAndSetForkJoinTaskTag(short expect, short update) { for (int s;;) { if ((short)(s = status) != expect) return false; ! if (STATUS.weakCompareAndSet(this, s, (s & ~SMASK) | (update & SMASK))) return true; } }
< prev index next >