src/share/classes/java/util/concurrent/ForkJoinTask.java

Print this page

        

@@ -40,10 +40,20 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.RandomAccess;
 import java.util.Map;
 import java.util.WeakHashMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /**
  * Abstract base class for tasks that run within a {@link ForkJoinPool}.
  * A {@code ForkJoinTask} is a thread-like entity that is much
  * lighter weight than a normal thread.  Huge numbers of tasks and

@@ -127,10 +137,20 @@
  * ForkJoinTasks (as may be determined using method {@link
  * #inForkJoinPool}).  Attempts to invoke them in other contexts
  * result in exceptions or errors, possibly including
  * {@code ClassCastException}.
  *
+ * <p>Method {@link #join} and its variants are appropriate for use
+ * only when completion dependencies are acyclic; that is, the
+ * parallel computation can be described as a directed acyclic graph
+ * (DAG). Otherwise, executions may encounter a form of deadlock as
+ * tasks cyclically wait for each other.  However, this framework
+ * supports other methods and techniques (for example the use of
+ * {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that
+ * may be of use in constructing custom subclasses for problems that
+ * are not statically structured as DAGs.
+ *
  * <p>Most base support methods are {@code final}, to prevent
  * overriding of implementations that are intrinsically tied to the
  * underlying lightweight task scheduling framework.  Developers
  * creating new basic styles of fork/join processing should minimally
  * implement {@code protected} methods {@link #exec}, {@link

@@ -141,13 +161,14 @@
  *
  * <p>ForkJoinTasks should perform relatively small amounts of
  * computation. Large tasks should be split into smaller subtasks,
  * usually via recursive decomposition. As a very rough rule of thumb,
  * a task should perform more than 100 and less than 10000 basic
- * computational steps. If tasks are too big, then parallelism cannot
- * improve throughput. If too small, then memory and internal task
- * maintenance overhead may overwhelm processing.
+ * computational steps, and should avoid indefinite looping. If tasks
+ * are too big, then parallelism cannot improve throughput. If too
+ * small, then memory and internal task maintenance overhead may
+ * overwhelm processing.
  *
  * <p>This class provides {@code adapt} methods for {@link Runnable}
  * and {@link Callable}, that may be of use when mixing execution of
  * {@code ForkJoinTasks} with other kinds of tasks. When all tasks are
  * of this form, consider using a pool constructed in <em>asyncMode</em>.

@@ -240,73 +261,91 @@
         exceptionMap.put(this, rex);
         setCompletion(EXCEPTIONAL);
     }
 
     /**
-     * Blocks a worker thread until completion. Called only by
-     * pool. Currently unused -- pool-based waits use timeout
-     * version below.
-     */
-    final void internalAwaitDone() {
-        int s;         // the odd construction reduces lock bias effects
-        while ((s = status) >= 0) {
-            try {
-                synchronized (this) {
-                    if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
-                        wait();
-                }
-            } catch (InterruptedException ie) {
-                cancelIfTerminating();
-            }
-        }
-    }
-
-    /**
      * Blocks a worker thread until completed or timed out.  Called
      * only by pool.
-     *
-     * @return status on exit
      */
-    final int internalAwaitDone(long millis) {
-        int s;
-        if ((s = status) >= 0) {
-            try {
+    final void internalAwaitDone(long millis, int nanos) {
+        int s = status;
+        if ((s == 0 &&
+             UNSAFE.compareAndSwapInt(this, statusOffset, 0, SIGNAL)) ||
+            s > 0)  {
+            try {     // the odd construction reduces lock bias effects
                 synchronized (this) {
-                    if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
-                        wait(millis, 0);
+                    if (status > 0)
+                        wait(millis, nanos);
+                    else
+                        notifyAll();
                 }
             } catch (InterruptedException ie) {
                 cancelIfTerminating();
             }
-            s = status;
         }
-        return s;
     }
 
     /**
      * Blocks a non-worker-thread until completion.
      */
     private void externalAwaitDone() {
-        int s;
-        while ((s = status) >= 0) {
-            synchronized (this) {
-                if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)){
+        if (status >= 0) {
                     boolean interrupted = false;
-                    while (status >= 0) {
+            synchronized (this) {
+                for (;;) {
+                    int s = status;
+                    if (s == 0)
+                        UNSAFE.compareAndSwapInt(this, statusOffset,
+                                                 0, SIGNAL);
+                    else if (s < 0) {
+                        notifyAll();
+                        break;
+                    }
+                    else {
                         try {
                             wait();
                         } catch (InterruptedException ie) {
                             interrupted = true;
                         }
                     }
+                }
+            }
                     if (interrupted)
                         Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Blocks a non-worker-thread until completion or interruption or timeout.
+     */
+    private void externalInterruptibleAwaitDone(boolean timed, long nanos)
+        throws InterruptedException {
+        if (Thread.interrupted())
+            throw new InterruptedException();
+        if (status >= 0) {
+            long startTime = timed ? System.nanoTime() : 0L;
+            synchronized (this) {
+                for (;;) {
+                    long nt;
+                    int s = status;
+                    if (s == 0)
+                        UNSAFE.compareAndSwapInt(this, statusOffset,
+                                                 0, SIGNAL);
+                    else if (s < 0) {
+                        notifyAll();
                     break;
                 }
+                    else if (!timed)
+                        wait();
+                    else if ((nt = nanos - (System.nanoTime()-startTime)) > 0L)
+                        wait(nt / 1000000, (int)(nt % 1000000));
+                    else
+                        break;
             }
         }
     }
+    }
 
     /**
      * Unless done, calls exec and records status if completed, but
      * doesn't wait for completion otherwise. Primary execution method
      * for ForkJoinWorkerThread.

@@ -333,11 +372,11 @@
      * any thread other than the one executing it unless preceded by a
      * call to {@link #join} or related methods, or a call to {@link
      * #isDone} returning {@code true}.
      *
      * <p>This method may be invoked only from within {@code
-     * ForkJoinTask} computations (as may be determined using method
+     * ForkJoinPool} computations (as may be determined using method
      * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
      * result in exceptions or errors, possibly including {@code
      * ClassCastException}.
      *
      * @return {@code this}, to simplify usage

@@ -347,14 +386,17 @@
             .pushTask(this);
         return this;
     }
 
     /**
-     * Returns the result of the computation when it {@link #isDone is done}.
-     * This method differs from {@link #get()} in that
+     * Returns the result of the computation when it {@link #isDone is
+     * done}.  This method differs from {@link #get()} in that
      * abnormal completion results in {@code RuntimeException} or
-     * {@code Error}, not {@code ExecutionException}.
+     * {@code Error}, not {@code ExecutionException}, and that
+     * interrupts of the calling thread do <em>not</em> cause the
+     * method to abruptly return by throwing {@code
+     * InterruptedException}.
      *
      * @return the computed result
      */
     public final V join() {
         quietlyJoin();

@@ -392,11 +434,11 @@
      * #getException()} and related methods to check if they have been
      * cancelled, completed normally or exceptionally, or left
      * unprocessed.
      *
      * <p>This method may be invoked only from within {@code
-     * ForkJoinTask} computations (as may be determined using method
+     * ForkJoinPool} computations (as may be determined using method
      * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
      * result in exceptions or errors, possibly including {@code
      * ClassCastException}.
      *
      * @param t1 the first task

@@ -420,11 +462,11 @@
      * each task may be obtained using {@link #getException()} and
      * related methods to check if they have been cancelled, completed
      * normally or exceptionally, or left unprocessed.
      *
      * <p>This method may be invoked only from within {@code
-     * ForkJoinTask} computations (as may be determined using method
+     * ForkJoinPool} computations (as may be determined using method
      * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
      * result in exceptions or errors, possibly including {@code
      * ClassCastException}.
      *
      * @param tasks the tasks

@@ -475,11 +517,11 @@
      * #getException()} and related methods to check if they have been
      * cancelled, completed normally or exceptionally, or left
      * unprocessed.
      *
      * <p>This method may be invoked only from within {@code
-     * ForkJoinTask} computations (as may be determined using method
+     * ForkJoinPool} computations (as may be determined using method
      * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
      * result in exceptions or errors, possibly including {@code
      * ClassCastException}.
      *
      * @param tasks the collection of tasks

@@ -527,29 +569,32 @@
         return tasks;
     }
 
     /**
      * Attempts to cancel execution of this task. This attempt will
-     * fail if the task has already completed, has already been
-     * cancelled, or could not be cancelled for some other reason. If
-     * successful, and this task has not started when cancel is
-     * called, execution of this task is suppressed, {@link
-     * #isCancelled} will report true, and {@link #join} will result
-     * in a {@code CancellationException} being thrown.
+     * fail if the task has already completed or could not be
+     * cancelled for some other reason. If successful, and this task
+     * has not started when {@code cancel} is called, execution of
+     * this task is suppressed. After this method returns
+     * successfully, unless there is an intervening call to {@link
+     * #reinitialize}, subsequent calls to {@link #isCancelled},
+     * {@link #isDone}, and {@code cancel} will return {@code true}
+     * and calls to {@link #join} and related methods will result in
+     * {@code CancellationException}.
      *
      * <p>This method may be overridden in subclasses, but if so, must
-     * still ensure that these minimal properties hold. In particular,
-     * the {@code cancel} method itself must not throw exceptions.
+     * still ensure that these properties hold. In particular, the
+     * {@code cancel} method itself must not throw exceptions.
      *
      * <p>This method is designed to be invoked by <em>other</em>
      * tasks. To terminate the current task, you can just return or
      * throw an unchecked exception from its computation method, or
      * invoke {@link #completeExceptionally}.
      *
-     * @param mayInterruptIfRunning this value is ignored in the
-     * default implementation because tasks are not
-     * cancelled via interruption
+     * @param mayInterruptIfRunning this value has no effect in the
+     * default implementation because interrupts are not used to
+     * control cancellation.
      *
      * @return {@code true} if this task is now cancelled
      */
     public boolean cancel(boolean mayInterruptIfRunning) {
         setCompletion(CANCELLED);

@@ -679,27 +724,17 @@
      * exception
      * @throws InterruptedException if the current thread is not a
      * member of a ForkJoinPool and was interrupted while waiting
      */
     public final V get() throws InterruptedException, ExecutionException {
-        int s;
-        if (Thread.currentThread() instanceof ForkJoinWorkerThread) {
+        Thread t = Thread.currentThread();
+        if (t instanceof ForkJoinWorkerThread)
             quietlyJoin();
-            s = status;
-        }
-        else {
-            while ((s = status) >= 0) {
-                synchronized (this) { // interruptible form of awaitDone
-                    if (UNSAFE.compareAndSwapInt(this, statusOffset,
-                                                 s, SIGNAL)) {
-                        while (status >= 0)
-                            wait();
-                    }
-                }
-            }
-        }
-        if (s < NORMAL) {
+        else
+            externalInterruptibleAwaitDone(false, 0L);
+        int s = status;
+        if (s != NORMAL) {
             Throwable ex;
             if (s == CANCELLED)
                 throw new CancellationException();
             if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
                 throw new ExecutionException(ex);

@@ -721,76 +756,22 @@
      * member of a ForkJoinPool and was interrupted while waiting
      * @throws TimeoutException if the wait timed out
      */
     public final V get(long timeout, TimeUnit unit)
         throws InterruptedException, ExecutionException, TimeoutException {
+        long nanos = unit.toNanos(timeout);
         Thread t = Thread.currentThread();
-        ForkJoinPool pool;
-        if (t instanceof ForkJoinWorkerThread) {
-            ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
-            if (status >= 0 && w.unpushTask(this))
-                quietlyExec();
-            pool = w.pool;
-        }
+        if (t instanceof ForkJoinWorkerThread)
+            ((ForkJoinWorkerThread)t).joinTask(this, true, nanos);
         else
-            pool = null;
-        /*
-         * Timed wait loop intermixes cases for FJ (pool != null) and
-         * non FJ threads. For FJ, decrement pool count but don't try
-         * for replacement; increment count on completion. For non-FJ,
-         * deal with interrupts. This is messy, but a little less so
-         * than is splitting the FJ and nonFJ cases.
-         */
-        boolean interrupted = false;
-        boolean dec = false; // true if pool count decremented
-        long nanos = unit.toNanos(timeout);
-        for (;;) {
-            if (pool == null && Thread.interrupted()) {
-                interrupted = true;
-                break;
-            }
+            externalInterruptibleAwaitDone(true, nanos);
             int s = status;
-            if (s < 0)
-                break;
-            if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)) {
-                long startTime = System.nanoTime();
-                long nt; // wait time
-                while (status >= 0 &&
-                       (nt = nanos - (System.nanoTime() - startTime)) > 0) {
-                    if (pool != null && !dec)
-                        dec = pool.tryDecrementRunningCount();
-                    else {
-                        long ms = nt / 1000000;
-                        int ns = (int) (nt % 1000000);
-                        try {
-                            synchronized (this) {
-                                if (status >= 0)
-                                    wait(ms, ns);
-                            }
-                        } catch (InterruptedException ie) {
-                            if (pool != null)
-                                cancelIfTerminating();
-                            else {
-                                interrupted = true;
-                                break;
-                            }
-                        }
-                    }
-                }
-                break;
-            }
-        }
-        if (pool != null && dec)
-            pool.incrementRunningCount();
-        if (interrupted)
-            throw new InterruptedException();
-        int es = status;
-        if (es != NORMAL) {
+        if (s != NORMAL) {
             Throwable ex;
-            if (es == CANCELLED)
+            if (s == CANCELLED)
                 throw new CancellationException();
-            if (es == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
+            if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
                 throw new ExecutionException(ex);
             throw new TimeoutException();
         }
         return getRawResult();
     }

@@ -817,11 +798,11 @@
                     if (completed) {
                         setCompletion(NORMAL);
                         return;
                     }
                 }
-                w.joinTask(this);
+                w.joinTask(this, false, 0L);
             }
         }
         else
             externalAwaitDone();
     }

@@ -853,11 +834,11 @@
      * be of use in designs in which many tasks are forked, but none
      * are explicitly joined, instead executing them until all are
      * processed.
      *
      * <p>This method may be invoked only from within {@code
-     * ForkJoinTask} computations (as may be determined using method
+     * ForkJoinPool} computations (as may be determined using method
      * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
      * result in exceptions or errors, possibly including {@code
      * ClassCastException}.
      */
     public static void helpQuiesce() {

@@ -872,10 +853,16 @@
      * never been forked, or has been forked, then completed and all
      * outstanding joins of this task have also completed. Effects
      * under any other usage conditions are not guaranteed.
      * This method may be useful when executing
      * pre-constructed trees of subtasks in loops.
+     *
+     * <p>Upon completion of this method, {@code isDone()} reports
+     * {@code false}, and {@code getException()} reports {@code
+     * null}. However, the value returned by {@code getRawResult} is
+     * unaffected. To clear this value, you can invoke {@code
+     * setRawResult(null)}.
      */
     public void reinitialize() {
         if (status == EXCEPTIONAL)
             exceptionMap.remove(this);
         status = 0;

@@ -893,15 +880,16 @@
         return (t instanceof ForkJoinWorkerThread) ?
             ((ForkJoinWorkerThread) t).pool : null;
     }
 
     /**
-     * Returns {@code true} if the current thread is executing as a
-     * ForkJoinPool computation.
+     * Returns {@code true} if the current thread is a {@link
+     * ForkJoinWorkerThread} executing as a ForkJoinPool computation.
      *
-     * @return {@code true} if the current thread is executing as a
-     * ForkJoinPool computation, or false otherwise
+     * @return {@code true} if the current thread is a {@link
+     * ForkJoinWorkerThread} executing as a ForkJoinPool computation,
+     * or {@code false} otherwise
      */
     public static boolean inForkJoinPool() {
         return Thread.currentThread() instanceof ForkJoinWorkerThread;
     }
 

@@ -912,11 +900,11 @@
      * another thread.  This method may be useful when arranging
      * alternative local processing of tasks that could have been, but
      * were not, stolen.
      *
      * <p>This method may be invoked only from within {@code
-     * ForkJoinTask} computations (as may be determined using method
+     * ForkJoinPool} computations (as may be determined using method
      * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
      * result in exceptions or errors, possibly including {@code
      * ClassCastException}.
      *
      * @return {@code true} if unforked

@@ -931,11 +919,11 @@
      * forked by the current worker thread but not yet executed. This
      * value may be useful for heuristic decisions about whether to
      * fork other tasks.
      *
      * <p>This method may be invoked only from within {@code
-     * ForkJoinTask} computations (as may be determined using method
+     * ForkJoinPool} computations (as may be determined using method
      * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
      * result in exceptions or errors, possibly including {@code
      * ClassCastException}.
      *
      * @return the number of tasks

@@ -954,11 +942,11 @@
      * aim to maintain a small constant surplus (for example, 3) of
      * tasks, and to process computations locally if this threshold is
      * exceeded.
      *
      * <p>This method may be invoked only from within {@code
-     * ForkJoinTask} computations (as may be determined using method
+     * ForkJoinPool} computations (as may be determined using method
      * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
      * result in exceptions or errors, possibly including {@code
      * ClassCastException}.
      *
      * @return the surplus number of tasks, which may be negative

@@ -1012,11 +1000,11 @@
      * contention with other threads.  This method is designed
      * primarily to support extensions, and is unlikely to be useful
      * otherwise.
      *
      * <p>This method may be invoked only from within {@code
-     * ForkJoinTask} computations (as may be determined using method
+     * ForkJoinPool} computations (as may be determined using method
      * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
      * result in exceptions or errors, possibly including {@code
      * ClassCastException}.
      *
      * @return the next task, or {@code null} if none are available

@@ -1031,11 +1019,11 @@
      * queued by the current thread but not yet executed.  This method
      * is designed primarily to support extensions, and is unlikely to
      * be useful otherwise.
      *
      * <p>This method may be invoked only from within {@code
-     * ForkJoinTask} computations (as may be determined using method
+     * ForkJoinPool} computations (as may be determined using method
      * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
      * result in exceptions or errors, possibly including {@code
      * ClassCastException}.
      *
      * @return the next task, or {@code null} if none are available

@@ -1054,11 +1042,11 @@
      * of the pool this task is operating in.  This method is designed
      * primarily to support extensions, and is unlikely to be useful
      * otherwise.
      *
      * <p>This method may be invoked only from within {@code
-     * ForkJoinTask} computations (as may be determined using method
+     * ForkJoinPool} computations (as may be determined using method
      * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
      * result in exceptions or errors, possibly including {@code
      * ClassCastException}.
      *
      * @return a task, or {@code null} if none are available