src/share/classes/java/util/concurrent/ForkJoinWorkerThread.java

Print this page

        

@@ -36,20 +36,22 @@
 package java.util.concurrent;
 
 import java.util.Random;
 import java.util.Collection;
 import java.util.concurrent.locks.LockSupport;
+import java.util.concurrent.RejectedExecutionException;
 
 /**
- * A thread managed by a {@link ForkJoinPool}.  This class is
- * subclassable solely for the sake of adding functionality -- there
- * are no overridable methods dealing with scheduling or execution.
- * However, you can override initialization and termination methods
- * surrounding the main task processing loop.  If you do create such a
- * subclass, you will also need to supply a custom {@link
- * ForkJoinPool.ForkJoinWorkerThreadFactory} to use it in a {@code
- * ForkJoinPool}.
+ * A thread managed by a {@link ForkJoinPool}, which executes
+ * {@link ForkJoinTask}s.
+ * This class is subclassable solely for the sake of adding
+ * functionality -- there are no overridable methods dealing with
+ * scheduling or execution.  However, you can override initialization
+ * and termination methods surrounding the main task processing loop.
+ * If you do create such a subclass, you will also need to supply a
+ * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to use it
+ * in a {@code ForkJoinPool}.
  *
  * @since 1.7
  * @author Doug Lea
  */
 public class ForkJoinWorkerThread extends Thread {

@@ -374,19 +376,19 @@
     }
 
     /**
      * Initializes internal state after construction but before
      * processing any tasks. If you override this method, you must
-     * invoke @code{super.onStart()} at the beginning of the method.
+     * invoke {@code super.onStart()} at the beginning of the method.
      * Initialization requires care: Most fields must have legal
      * default values, to ensure that attempted accesses from other
      * threads work correctly even before this thread starts
      * processing tasks.
      */
     protected void onStart() {
         int rs = seedGenerator.nextInt();
-        seed = rs == 0? 1 : rs; // seed must be nonzero
+        seed = (rs == 0) ? 1 : rs; // seed must be nonzero
 
         // Allocate name string and arrays in this thread
         String pid = Integer.toString(pool.getPoolNumber());
         String wid = Integer.toString(poolIndex);
         setName("ForkJoinPool-" + pid + "-worker-" + wid);

@@ -424,11 +426,11 @@
     }
 
     /**
      * This method is required to be public, but should never be
      * called explicitly. It performs the main run loop to execute
-     * ForkJoinTasks.
+     * {@link ForkJoinTask}s.
      */
     public void run() {
         Throwable exception = null;
         try {
             onStart();

@@ -626,10 +628,23 @@
                 long u = (i << qShift) + qBase; // raw offset
                 ForkJoinTask<?> t = q[i];
                 if (t == null)   // lost to stealer
                     break;
                 if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
+                    /*
+                     * Note: here and in related methods, as a
+                     * performance (not correctness) issue, we'd like
+                     * to encourage the compiler not to arbitrarily
+                     * postpone setting sp after a successful CAS.
+                     * Currently there is no intrinsic for arranging
+                     * this, but using Unsafe putOrderedInt may be a
+                     * preferable strategy on some compilers even
+                     * though its main effect is a pre-, not post-
+                     * fence. To simplify possible changes, the option
+                     * is left in comments next to the associated
+                     * assignments.
+                     */
                     sp = s; // putOrderedInt may encourage more timely write
                     // UNSAFE.putOrderedInt(this, spOffset, s);
                     return t;
                 }
             }

@@ -882,23 +897,20 @@
      * Removes and cancels all tasks in queue.  Can be called from any
      * thread.
      */
     final void cancelTasks() {
         ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks
-        if (cj != null) {
-            currentJoin = null;
+        if (cj != null && cj.status >= 0) {
             cj.cancelIgnoringExceptions();
             try {
                 this.interrupt(); // awaken wait
             } catch (SecurityException ignore) {
             }
         }
         ForkJoinTask<?> cs = currentSteal;
-        if (cs != null) {
-            currentSteal = null;
+        if (cs != null && cs.status >= 0)
             cs.cancelIgnoringExceptions();
-        }
         while (base != sp) {
             ForkJoinTask<?> t = deqTask();
             if (t != null)
                 t.cancelIgnoringExceptions();
         }

@@ -957,142 +969,158 @@
 
     /**
      * Possibly runs some tasks and/or blocks, until task is done.
      *
      * @param joinMe the task to join
+     * @param timed true if a timed wait should be used
+     * @param nanos the wait time in nanoseconds, if timed
      */
-    final void joinTask(ForkJoinTask<?> joinMe) {
+    final void joinTask(ForkJoinTask<?> joinMe, boolean timed, long nanos) {
         // currentJoin only written by this thread; only need ordered store
         ForkJoinTask<?> prevJoin = currentJoin;
         UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe);
-        if (sp != base)
-            localHelpJoinTask(joinMe);
-        if (joinMe.status >= 0)
-            pool.awaitJoin(joinMe, this);
+        pool.awaitJoin(joinMe, this, timed, nanos);
         UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin);
     }
 
     /**
-     * Run tasks in local queue until given task is done.
+     * Tries to locate and help perform tasks for a stealer of the
+     * given task, or in turn one of its stealers.  Traces
+     * currentSteal->currentJoin links looking for a thread working on
+     * a descendant of the given task and with a non-empty queue to
+     * steal back and execute tasks from.
      *
+     * The implementation is very branchy to cope with potential
+     * inconsistencies or loops encountering chains that are stale,
+     * unknown, or of length greater than MAX_HELP_DEPTH links.  All
+     * of these cases are dealt with by just returning back to the
+     * caller, who is expected to retry if other join mechanisms also
+     * don't work out.
+     *
      * @param joinMe the task to join
+     * @param running if false, then must update pool count upon
+     *  running a task
+     * @return value of running on exit
      */
-    private void localHelpJoinTask(ForkJoinTask<?> joinMe) {
-        int s;
+    final boolean helpJoinTask(ForkJoinTask<?> joinMe, boolean running) {
+        /*
+         * Initial checks to (1) abort if terminating; (2) clean out
+         * old cancelled tasks from local queue; (3) if joinMe is next
+         * task, run it; (4) omit scan if local queue nonempty (since
+         * it may contain non-descendants of joinMe).
+         */
+        ForkJoinPool p = pool;
+        for (;;) {
         ForkJoinTask<?>[] q;
-        while (joinMe.status >= 0 && (s = sp) != base && (q = queue) != null) {
+            int s;
+            if (joinMe.status < 0)
+                return running;
+            else if ((runState & TERMINATING) != 0) {
+                joinMe.cancelIgnoringExceptions();
+                return running;
+            }
+            else if ((s = sp) == base || (q = queue) == null)
+                break;                            // queue empty
+            else {
             int i = (q.length - 1) & --s;
             long u = (i << qShift) + qBase; // raw offset
             ForkJoinTask<?> t = q[i];
-            if (t == null)  // lost to a stealer
-                break;
-            if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
-                /*
-                 * This recheck (and similarly in helpJoinTask)
-                 * handles cases where joinMe is independently
-                 * cancelled or forced even though there is other work
-                 * available. Back out of the pop by putting t back
-                 * into slot before we commit by writing sp.
-                 */
-                if (joinMe.status < 0) {
-                    UNSAFE.putObjectVolatile(q, u, t);
-                    break;
-                }
-                sp = s;
+                if (t == null)
+                    break;                        // lost to a stealer
+                else if (t != joinMe && t.status >= 0)
+                    return running;               // cannot safely help
+                else if ((running ||
+                          (running = p.tryIncrementRunningCount())) &&
+                         UNSAFE.compareAndSwapObject(q, u, t, null)) {
+                    sp = s; // putOrderedInt may encourage more timely write
                 // UNSAFE.putOrderedInt(this, spOffset, s);
                 t.quietlyExec();
             }
         }
     }
 
-    /**
-     * Unless terminating, tries to locate and help perform tasks for
-     * a stealer of the given task, or in turn one of its stealers.
-     * Traces currentSteal->currentJoin links looking for a thread
-     * working on a descendant of the given task and with a non-empty
-     * queue to steal back and execute tasks from.
-     *
-     * The implementation is very branchy to cope with potential
-     * inconsistencies or loops encountering chains that are stale,
-     * unknown, or of length greater than MAX_HELP_DEPTH links.  All
-     * of these cases are dealt with by just returning back to the
-     * caller, who is expected to retry if other join mechanisms also
-     * don't work out.
-     *
-     * @param joinMe the task to join
-     */
-    final void helpJoinTask(ForkJoinTask<?> joinMe) {
-        ForkJoinWorkerThread[] ws;
-        int n;
-        if (joinMe.status < 0)                // already done
-            return;
-        if ((runState & TERMINATING) != 0) {  // cancel if shutting down
-            joinMe.cancelIgnoringExceptions();
-            return;
-        }
-        if ((ws = pool.workers) == null || (n = ws.length) <= 1)
-            return;                           // need at least 2 workers
-
+        int n;                                    // worker array size
+        ForkJoinWorkerThread[] ws = p.workers;
+        if (ws != null && (n = ws.length) > 1) {  // need at least 2 workers
         ForkJoinTask<?> task = joinMe;        // base of chain
         ForkJoinWorkerThread thread = this;   // thread with stolen task
-        for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
+
+            outer:for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
             // Try to find v, the stealer of task, by first using hint
             ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
             if (v == null || v.currentSteal != task) {
                 for (int j = 0; ; ++j) {      // search array
                     if (j < n) {
                         ForkJoinTask<?> vs;
                         if ((v = ws[j]) != null &&
                             (vs = v.currentSteal) != null) {
-                            if (joinMe.status < 0 || task.status < 0)
-                                return;       // stale or done
+                                if (joinMe.status < 0)
+                                    break outer;
                             if (vs == task) {
+                                    if (task.status < 0)
+                                        break outer; // stale
                                 thread.stealHint = j;
                                 break;        // save hint for next time
                             }
                         }
                     }
                     else
-                        return;               // no stealer
+                            break outer;          // no stealer
                 }
             }
-            for (;;) { // Try to help v, using specialized form of deqTask
+
+                // Try to help v, using specialized form of deqTask
+                for (;;) {
                 if (joinMe.status < 0)
-                    return;
+                        break outer;
                 int b = v.base;
                 ForkJoinTask<?>[] q = v.queue;
                 if (b == v.sp || q == null)
-                    break;
+                        break;                    // empty
                 int i = (q.length - 1) & b;
                 long u = (i << qShift) + qBase;
                 ForkJoinTask<?> t = q[i];
-                int pid = poolIndex;
-                ForkJoinTask<?> ps = currentSteal;
                 if (task.status < 0)
-                    return;                   // stale or done
-                if (t != null && v.base == b++ &&
+                        break outer;              // stale
+                    if (t != null &&
+                        (running ||
+                         (running = p.tryIncrementRunningCount())) &&
+                        v.base == b++ &&
                     UNSAFE.compareAndSwapObject(q, u, t, null)) {
-                    if (joinMe.status < 0) {
+                        if (t != joinMe && joinMe.status < 0) {
                         UNSAFE.putObjectVolatile(q, u, t);
-                        return;               // back out on cancel
+                            break outer;          // joinMe cancelled; back out
                     }
                     v.base = b;
+                        if (t.status >= 0) {
+                            ForkJoinTask<?> ps = currentSteal;
+                            int pid = poolIndex;
                     v.stealHint = pid;
-                    UNSAFE.putOrderedObject(this, currentStealOffset, t);
+                            UNSAFE.putOrderedObject(this,
+                                                    currentStealOffset, t);
                     t.quietlyExec();
-                    UNSAFE.putOrderedObject(this, currentStealOffset, ps);
+                            UNSAFE.putOrderedObject(this,
+                                                    currentStealOffset, ps);
                 }
             }
+                    else if ((runState & TERMINATING) != 0) {
+                        joinMe.cancelIgnoringExceptions();
+                        break outer;
+                    }
+                }
+
             // Try to descend to find v's stealer
             ForkJoinTask<?> next = v.currentJoin;
             if (task.status < 0 || next == null || next == task ||
                 joinMe.status < 0)
-                return;
+                    break;                 // done, stale, dead-end, or cyclic
             task = next;
             thread = v;
         }
     }
+        return running;
+    }
 
     /**
      * Implements ForkJoinTask.getSurplusQueuedTaskCount().
      * Returns an estimate of the number of tasks, offset by a
      * function of number of idle workers.