< prev index next >

src/java.base/share/classes/java/util/concurrent/CountedCompleter.java

Print this page
8246585: ForkJoin updates
Reviewed-by: martin

@@ -355,11 +355,11 @@
  * actions inside {@code onCompletion} occur (once) upon completion of
  * a task and its subtasks. No additional synchronization is required
  * within this method to ensure thread safety of accesses to fields of
  * this task or other completed tasks.
  *
- * <p><b>Completion Traversals</b>. If using {@code onCompletion} to
+ * <p><b>Completion Traversals.</b> If using {@code onCompletion} to
  * process completions is inapplicable or inconvenient, you can use
  * methods {@link #firstComplete} and {@link #nextComplete} to create
  * custom traversals.  For example, to define a MapReducer that only
  * splits out right-hand tasks in the form of the third ForEach
  * example, the completions must cooperatively reduce along

@@ -551,20 +551,25 @@
      */
     public final boolean compareAndSetPendingCount(int expected, int count) {
         return PENDING.compareAndSet(this, expected, count);
     }
 
+    // internal-only weak version; may fail spuriously, so callers must retry
+    final boolean weakCompareAndSetPendingCount(int expected, int count) {
+        return PENDING.weakCompareAndSet(this, expected, count);
+    }
+
     /**
      * If the pending count is nonzero, (atomically) decrements it.
      *
      * @return the initial (undecremented) pending count holding on entry
      * to this method
      */
     public final int decrementPendingCountUnlessZero() {
         int c;
         do {} while ((c = pending) != 0 &&
-                     !PENDING.weakCompareAndSet(this, c, c - 1));
+                     !weakCompareAndSetPendingCount(c, c - 1));
         return c;
     }
 
     /**
      * Returns the root of the current computation; i.e., this

@@ -593,11 +598,11 @@
                 if ((a = (s = a).completer) == null) {
                     s.quietlyComplete();
                     return;
                 }
             }
-            else if (PENDING.weakCompareAndSet(a, c, c - 1))
+            else if (a.weakCompareAndSetPendingCount(c, c - 1))
                 return;
         }
     }
 
     /**

@@ -616,11 +621,11 @@
                 if ((a = (s = a).completer) == null) {
                     s.quietlyComplete();
                     return;
                 }
             }
-            else if (PENDING.weakCompareAndSet(a, c, c - 1))
+            else if (a.weakCompareAndSetPendingCount(c, c - 1))
                 return;
         }
     }
 
     /**

@@ -661,11 +666,11 @@
      */
     public final CountedCompleter<?> firstComplete() {
         for (int c;;) {
             if ((c = pending) == 0)
                 return this;
-            else if (PENDING.weakCompareAndSet(this, c, c - 1))
+            else if (weakCompareAndSetPendingCount(c, c - 1))
                 return null;
         }
     }
 
     /**

@@ -716,34 +721,37 @@
      * @param maxTasks the maximum number of tasks to process.  If
      *                 less than or equal to zero, then no tasks are
      *                 processed.
      */
     public final void helpComplete(int maxTasks) {
-        Thread t; ForkJoinWorkerThread wt;
-        if (maxTasks > 0 && status >= 0) {
-            if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
-                (wt = (ForkJoinWorkerThread)t).pool.
-                    helpComplete(wt.workQueue, this, maxTasks);
+        ForkJoinPool.WorkQueue q; Thread t; boolean owned;
+        if (owned = (t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
+            q = ((ForkJoinWorkerThread)t).workQueue;
             else
-                ForkJoinPool.common.externalHelpComplete(this, maxTasks);
-        }
+            q = ForkJoinPool.commonQueue();
+        if (q != null && maxTasks > 0)
+            q.helpComplete(this, owned, maxTasks);
     }
 
+    // ForkJoinTask overrides
+
     /**
      * Supports ForkJoinTask exception propagation.
      */
-    void internalPropagateException(Throwable ex) {
-        CountedCompleter<?> a = this, s = a;
-        while (a.onExceptionalCompletion(ex, s) &&
-               (a = (s = a).completer) != null && a.status >= 0 &&
-               isExceptionalStatus(a.recordExceptionalCompletion(ex)))
-            ;
+    @Override
+    final int trySetException(Throwable ex) {
+        CountedCompleter<?> a = this, p = a;
+        do {} while (isExceptionalStatus(a.trySetThrown(ex)) &&
+                     a.onExceptionalCompletion(ex, p) &&
+                     (a = (p = a).completer) != null && a.status >= 0);
+        return status;
     }
 
     /**
      * Implements execution conventions for CountedCompleters.
      */
+    @Override
     protected final boolean exec() {
         compute();
         return false;
     }
 

@@ -754,19 +762,21 @@
      * always to return a field or function of a field that
      * holds the result upon completion.
      *
      * @return the result of the computation
      */
+    @Override
     public T getRawResult() { return null; }
 
     /**
      * A method that result-bearing CountedCompleters may optionally
      * use to help maintain result data.  By default, does nothing.
      * Overrides are not recommended. However, if this method is
      * overridden to update existing objects or fields, then it must
      * in general be defined to be thread-safe.
      */
+    @Override
     protected void setRawResult(T t) { }
 
     // VarHandle mechanics
     private static final VarHandle PENDING;
     static {
< prev index next >