< prev index next >
src/java.base/share/classes/java/util/concurrent/CountedCompleter.java
Print this page
8246585: ForkJoin updates
Reviewed-by: martin
*** 355,365 ****
* actions inside {@code onCompletion} occur (once) upon completion of
* a task and its subtasks. No additional synchronization is required
* within this method to ensure thread safety of accesses to fields of
* this task or other completed tasks.
*
! * <p><b>Completion Traversals</b>. If using {@code onCompletion} to
* process completions is inapplicable or inconvenient, you can use
* methods {@link #firstComplete} and {@link #nextComplete} to create
* custom traversals. For example, to define a MapReducer that only
* splits out right-hand tasks in the form of the third ForEach
* example, the completions must cooperatively reduce along
--- 355,365 ----
* actions inside {@code onCompletion} occur (once) upon completion of
* a task and its subtasks. No additional synchronization is required
* within this method to ensure thread safety of accesses to fields of
* this task or other completed tasks.
*
! * <p><b>Completion Traversals.</b> If using {@code onCompletion} to
* process completions is inapplicable or inconvenient, you can use
* methods {@link #firstComplete} and {@link #nextComplete} to create
* custom traversals. For example, to define a MapReducer that only
* splits out right-hand tasks in the form of the third ForEach
* example, the completions must cooperatively reduce along
*** 551,570 ****
*/
public final boolean compareAndSetPendingCount(int expected, int count) {
return PENDING.compareAndSet(this, expected, count);
}
/**
* If the pending count is nonzero, (atomically) decrements it.
*
* @return the initial (undecremented) pending count holding on entry
* to this method
*/
public final int decrementPendingCountUnlessZero() {
int c;
do {} while ((c = pending) != 0 &&
! !PENDING.weakCompareAndSet(this, c, c - 1));
return c;
}
/**
* Returns the root of the current computation; i.e., this
--- 551,575 ----
*/
public final boolean compareAndSetPendingCount(int expected, int count) {
return PENDING.compareAndSet(this, expected, count);
}
+ // internal-only weak version
+ final boolean weakCompareAndSetPendingCount(int expected, int count) {
+ return PENDING.weakCompareAndSet(this, expected, count);
+ }
+
/**
* If the pending count is nonzero, (atomically) decrements it.
*
* @return the initial (undecremented) pending count holding on entry
* to this method
*/
public final int decrementPendingCountUnlessZero() {
int c;
do {} while ((c = pending) != 0 &&
! !weakCompareAndSetPendingCount(c, c - 1));
return c;
}
/**
* Returns the root of the current computation; i.e., this
*** 593,603 ****
if ((a = (s = a).completer) == null) {
s.quietlyComplete();
return;
}
}
! else if (PENDING.weakCompareAndSet(a, c, c - 1))
return;
}
}
/**
--- 598,608 ----
if ((a = (s = a).completer) == null) {
s.quietlyComplete();
return;
}
}
! else if (a.weakCompareAndSetPendingCount(c, c - 1))
return;
}
}
/**
*** 616,626 ****
if ((a = (s = a).completer) == null) {
s.quietlyComplete();
return;
}
}
! else if (PENDING.weakCompareAndSet(a, c, c - 1))
return;
}
}
/**
--- 621,631 ----
if ((a = (s = a).completer) == null) {
s.quietlyComplete();
return;
}
}
! else if (a.weakCompareAndSetPendingCount(c, c - 1))
return;
}
}
/**
*** 661,671 ****
*/
public final CountedCompleter<?> firstComplete() {
for (int c;;) {
if ((c = pending) == 0)
return this;
! else if (PENDING.weakCompareAndSet(this, c, c - 1))
return null;
}
}
/**
--- 666,676 ----
*/
public final CountedCompleter<?> firstComplete() {
for (int c;;) {
if ((c = pending) == 0)
return this;
! else if (weakCompareAndSetPendingCount(c, c - 1))
return null;
}
}
/**
*** 716,749 ****
* @param maxTasks the maximum number of tasks to process. If
* less than or equal to zero, then no tasks are
* processed.
*/
public final void helpComplete(int maxTasks) {
! Thread t; ForkJoinWorkerThread wt;
! if (maxTasks > 0 && status >= 0) {
! if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
! (wt = (ForkJoinWorkerThread)t).pool.
! helpComplete(wt.workQueue, this, maxTasks);
else
! ForkJoinPool.common.externalHelpComplete(this, maxTasks);
! }
}
/**
* Supports ForkJoinTask exception propagation.
*/
! void internalPropagateException(Throwable ex) {
! CountedCompleter<?> a = this, s = a;
! while (a.onExceptionalCompletion(ex, s) &&
! (a = (s = a).completer) != null && a.status >= 0 &&
! isExceptionalStatus(a.recordExceptionalCompletion(ex)))
! ;
}
/**
* Implements execution conventions for CountedCompleters.
*/
protected final boolean exec() {
compute();
return false;
}
--- 721,757 ----
* @param maxTasks the maximum number of tasks to process. If
* less than or equal to zero, then no tasks are
* processed.
*/
public final void helpComplete(int maxTasks) {
! ForkJoinPool.WorkQueue q; Thread t; boolean owned;
! if (owned = (t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
! q = ((ForkJoinWorkerThread)t).workQueue;
else
! q = ForkJoinPool.commonQueue();
! if (q != null && maxTasks > 0)
! q.helpComplete(this, owned, maxTasks);
}
+ // ForkJoinTask overrides
+
/**
* Supports ForkJoinTask exception propagation.
*/
! @Override
! final int trySetException(Throwable ex) {
! CountedCompleter<?> a = this, p = a;
! do {} while (isExceptionalStatus(a.trySetThrown(ex)) &&
! a.onExceptionalCompletion(ex, p) &&
! (a = (p = a).completer) != null && a.status >= 0);
! return status;
}
/**
* Implements execution conventions for CountedCompleters.
*/
+ @Override
protected final boolean exec() {
compute();
return false;
}
*** 754,772 ****
--- 762,782 ----
* always to return a field or function of a field that
* holds the result upon completion.
*
* @return the result of the computation
*/
+ @Override
public T getRawResult() { return null; }
/**
* A method that result-bearing CountedCompleters may optionally
* use to help maintain result data. By default, does nothing.
* Overrides are not recommended. However, if this method is
* overridden to update existing objects or fields, then it must
* in general be defined to be thread-safe.
*/
+ @Override
protected void setRawResult(T t) { }
// VarHandle mechanics
private static final VarHandle PENDING;
static {
< prev index next >