< prev index next >

src/java.base/share/classes/java/util/concurrent/CountedCompleter.java

Print this page
8246585: ForkJoin updates
Reviewed-by: martin

*** 355,365 **** * actions inside {@code onCompletion} occur (once) upon completion of * a task and its subtasks. No additional synchronization is required * within this method to ensure thread safety of accesses to fields of * this task or other completed tasks. * ! * <p><b>Completion Traversals</b>. If using {@code onCompletion} to * process completions is inapplicable or inconvenient, you can use * methods {@link #firstComplete} and {@link #nextComplete} to create * custom traversals. For example, to define a MapReducer that only * splits out right-hand tasks in the form of the third ForEach * example, the completions must cooperatively reduce along --- 355,365 ---- * actions inside {@code onCompletion} occur (once) upon completion of * a task and its subtasks. No additional synchronization is required * within this method to ensure thread safety of accesses to fields of * this task or other completed tasks. * ! * <p><b>Completion Traversals.</b> If using {@code onCompletion} to * process completions is inapplicable or inconvenient, you can use * methods {@link #firstComplete} and {@link #nextComplete} to create * custom traversals. For example, to define a MapReducer that only * splits out right-hand tasks in the form of the third ForEach * example, the completions must cooperatively reduce along
*** 551,570 **** */ public final boolean compareAndSetPendingCount(int expected, int count) { return PENDING.compareAndSet(this, expected, count); } /** * If the pending count is nonzero, (atomically) decrements it. * * @return the initial (undecremented) pending count holding on entry * to this method */ public final int decrementPendingCountUnlessZero() { int c; do {} while ((c = pending) != 0 && ! !PENDING.weakCompareAndSet(this, c, c - 1)); return c; } /** * Returns the root of the current computation; i.e., this --- 551,575 ---- */ public final boolean compareAndSetPendingCount(int expected, int count) { return PENDING.compareAndSet(this, expected, count); } + // internal-only weak version + final boolean weakCompareAndSetPendingCount(int expected, int count) { + return PENDING.weakCompareAndSet(this, expected, count); + } + /** * If the pending count is nonzero, (atomically) decrements it. * * @return the initial (undecremented) pending count holding on entry * to this method */ public final int decrementPendingCountUnlessZero() { int c; do {} while ((c = pending) != 0 && ! !weakCompareAndSetPendingCount(c, c - 1)); return c; } /** * Returns the root of the current computation; i.e., this
*** 593,603 **** if ((a = (s = a).completer) == null) { s.quietlyComplete(); return; } } ! else if (PENDING.weakCompareAndSet(a, c, c - 1)) return; } } /** --- 598,608 ---- if ((a = (s = a).completer) == null) { s.quietlyComplete(); return; } } ! else if (a.weakCompareAndSetPendingCount(c, c - 1)) return; } } /**
*** 616,626 **** if ((a = (s = a).completer) == null) { s.quietlyComplete(); return; } } ! else if (PENDING.weakCompareAndSet(a, c, c - 1)) return; } } /** --- 621,631 ---- if ((a = (s = a).completer) == null) { s.quietlyComplete(); return; } } ! else if (a.weakCompareAndSetPendingCount(c, c - 1)) return; } } /**
*** 661,671 **** */ public final CountedCompleter<?> firstComplete() { for (int c;;) { if ((c = pending) == 0) return this; ! else if (PENDING.weakCompareAndSet(this, c, c - 1)) return null; } } /** --- 666,676 ---- */ public final CountedCompleter<?> firstComplete() { for (int c;;) { if ((c = pending) == 0) return this; ! else if (weakCompareAndSetPendingCount(c, c - 1)) return null; } } /**
*** 716,749 **** * @param maxTasks the maximum number of tasks to process. If * less than or equal to zero, then no tasks are * processed. */ public final void helpComplete(int maxTasks) { ! Thread t; ForkJoinWorkerThread wt; ! if (maxTasks > 0 && status >= 0) { ! if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ! (wt = (ForkJoinWorkerThread)t).pool. ! helpComplete(wt.workQueue, this, maxTasks); else ! ForkJoinPool.common.externalHelpComplete(this, maxTasks); ! } } /** * Supports ForkJoinTask exception propagation. */ ! void internalPropagateException(Throwable ex) { ! CountedCompleter<?> a = this, s = a; ! while (a.onExceptionalCompletion(ex, s) && ! (a = (s = a).completer) != null && a.status >= 0 && ! isExceptionalStatus(a.recordExceptionalCompletion(ex))) ! ; } /** * Implements execution conventions for CountedCompleters. */ protected final boolean exec() { compute(); return false; } --- 721,757 ---- * @param maxTasks the maximum number of tasks to process. If * less than or equal to zero, then no tasks are * processed. */ public final void helpComplete(int maxTasks) { ! ForkJoinPool.WorkQueue q; Thread t; boolean owned; ! if (owned = (t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ! q = ((ForkJoinWorkerThread)t).workQueue; else ! q = ForkJoinPool.commonQueue(); ! if (q != null && maxTasks > 0) ! q.helpComplete(this, owned, maxTasks); } + // ForkJoinTask overrides + /** * Supports ForkJoinTask exception propagation. */ ! @Override ! final int trySetException(Throwable ex) { ! CountedCompleter<?> a = this, p = a; ! do {} while (isExceptionalStatus(a.trySetThrown(ex)) && ! a.onExceptionalCompletion(ex, p) && ! (a = (p = a).completer) != null && a.status >= 0); ! return status; } /** * Implements execution conventions for CountedCompleters. */ + @Override protected final boolean exec() { compute(); return false; }
*** 754,772 **** --- 762,782 ---- * always to return a field or function of a field that * holds the result upon completion. * * @return the result of the computation */ + @Override public T getRawResult() { return null; } /** * A method that result-bearing CountedCompleters may optionally * use to help maintain result data. By default, does nothing. * Overrides are not recommended. However, if this method is * overridden to update existing objects or fields, then it must * in general be defined to be thread-safe. */ + @Override protected void setRawResult(T t) { } // VarHandle mechanics private static final VarHandle PENDING; static {
< prev index next >