< prev index next >
src/java.base/share/classes/java/util/concurrent/CountedCompleter.java
Print this page
8246585: ForkJoin updates
Reviewed-by: martin
@@ -355,11 +355,11 @@
* actions inside {@code onCompletion} occur (once) upon completion of
* a task and its subtasks. No additional synchronization is required
* within this method to ensure thread safety of accesses to fields of
* this task or other completed tasks.
*
- * <p><b>Completion Traversals</b>. If using {@code onCompletion} to
+ * <p><b>Completion Traversals.</b> If using {@code onCompletion} to
* process completions is inapplicable or inconvenient, you can use
* methods {@link #firstComplete} and {@link #nextComplete} to create
* custom traversals. For example, to define a MapReducer that only
* splits out right-hand tasks in the form of the third ForEach
* example, the completions must cooperatively reduce along
@@ -551,20 +551,25 @@
*/
public final boolean compareAndSetPendingCount(int expected, int count) {
return PENDING.compareAndSet(this, expected, count);
}
+ // internal-only weak version
+ final boolean weakCompareAndSetPendingCount(int expected, int count) {
+ return PENDING.weakCompareAndSet(this, expected, count);
+ }
+
/**
* If the pending count is nonzero, (atomically) decrements it.
*
* @return the initial (undecremented) pending count holding on entry
* to this method
*/
public final int decrementPendingCountUnlessZero() {
int c;
do {} while ((c = pending) != 0 &&
- !PENDING.weakCompareAndSet(this, c, c - 1));
+ !weakCompareAndSetPendingCount(c, c - 1));
return c;
}
/**
* Returns the root of the current computation; i.e., this
@@ -593,11 +598,11 @@
if ((a = (s = a).completer) == null) {
s.quietlyComplete();
return;
}
}
- else if (PENDING.weakCompareAndSet(a, c, c - 1))
+ else if (a.weakCompareAndSetPendingCount(c, c - 1))
return;
}
}
/**
@@ -616,11 +621,11 @@
if ((a = (s = a).completer) == null) {
s.quietlyComplete();
return;
}
}
- else if (PENDING.weakCompareAndSet(a, c, c - 1))
+ else if (a.weakCompareAndSetPendingCount(c, c - 1))
return;
}
}
/**
@@ -661,11 +666,11 @@
*/
public final CountedCompleter<?> firstComplete() {
for (int c;;) {
if ((c = pending) == 0)
return this;
- else if (PENDING.weakCompareAndSet(this, c, c - 1))
+ else if (weakCompareAndSetPendingCount(c, c - 1))
return null;
}
}
/**
@@ -716,34 +721,37 @@
* @param maxTasks the maximum number of tasks to process. If
* less than or equal to zero, then no tasks are
* processed.
*/
public final void helpComplete(int maxTasks) {
- Thread t; ForkJoinWorkerThread wt;
- if (maxTasks > 0 && status >= 0) {
- if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
- (wt = (ForkJoinWorkerThread)t).pool.
- helpComplete(wt.workQueue, this, maxTasks);
+ ForkJoinPool.WorkQueue q; Thread t; boolean owned;
+ if (owned = (t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
+ q = ((ForkJoinWorkerThread)t).workQueue;
else
- ForkJoinPool.common.externalHelpComplete(this, maxTasks);
- }
+ q = ForkJoinPool.commonQueue();
+ if (q != null && maxTasks > 0)
+ q.helpComplete(this, owned, maxTasks);
}
+ // ForkJoinTask overrides
+
/**
* Supports ForkJoinTask exception propagation.
*/
- void internalPropagateException(Throwable ex) {
- CountedCompleter<?> a = this, s = a;
- while (a.onExceptionalCompletion(ex, s) &&
- (a = (s = a).completer) != null && a.status >= 0 &&
- isExceptionalStatus(a.recordExceptionalCompletion(ex)))
- ;
+ @Override
+ final int trySetException(Throwable ex) {
+ CountedCompleter<?> a = this, p = a;
+ do {} while (isExceptionalStatus(a.trySetThrown(ex)) &&
+ a.onExceptionalCompletion(ex, p) &&
+ (a = (p = a).completer) != null && a.status >= 0);
+ return status;
}
/**
* Implements execution conventions for CountedCompleters.
*/
+ @Override
protected final boolean exec() {
compute();
return false;
}
@@ -754,19 +762,21 @@
* always to return a field or function of a field that
* holds the result upon completion.
*
* @return the result of the computation
*/
+ @Override
public T getRawResult() { return null; }
/**
* A method that result-bearing CountedCompleters may optionally
* use to help maintain result data. By default, does nothing.
* Overrides are not recommended. However, if this method is
* overridden to update existing objects or fields, then it must
* in general be defined to be thread-safe.
*/
+ @Override
protected void setRawResult(T t) { }
// VarHandle mechanics
private static final VarHandle PENDING;
static {
< prev index next >