src/share/classes/java/util/concurrent/ForkJoinTask.java

Print this page

        

*** 40,49 **** --- 40,59 ---- import java.util.Collections; import java.util.List; import java.util.RandomAccess; import java.util.Map; import java.util.WeakHashMap; + import java.util.concurrent.Callable; + import java.util.concurrent.CancellationException; + import java.util.concurrent.ExecutionException; + import java.util.concurrent.Executor; + import java.util.concurrent.ExecutorService; + import java.util.concurrent.Future; + import java.util.concurrent.RejectedExecutionException; + import java.util.concurrent.RunnableFuture; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.TimeoutException; /** * Abstract base class for tasks that run within a {@link ForkJoinPool}. * A {@code ForkJoinTask} is a thread-like entity that is much * lighter weight than a normal thread. Huge numbers of tasks and
*** 127,136 **** --- 137,156 ---- * ForkJoinTasks (as may be determined using method {@link * #inForkJoinPool}). Attempts to invoke them in other contexts * result in exceptions or errors, possibly including * {@code ClassCastException}. * + * <p>Method {@link #join} and its variants are appropriate for use + * only when completion dependencies are acyclic; that is, the + * parallel computation can be described as a directed acyclic graph + * (DAG). Otherwise, executions may encounter a form of deadlock as + * tasks cyclically wait for each other. However, this framework + * supports other methods and techniques (for example the use of + * {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that + * may be of use in constructing custom subclasses for problems that + * are not statically structured as DAGs. + * * <p>Most base support methods are {@code final}, to prevent * overriding of implementations that are intrinsically tied to the * underlying lightweight task scheduling framework. Developers * creating new basic styles of fork/join processing should minimally * implement {@code protected} methods {@link #exec}, {@link
*** 141,153 **** * * <p>ForkJoinTasks should perform relatively small amounts of * computation. Large tasks should be split into smaller subtasks, * usually via recursive decomposition. As a very rough rule of thumb, * a task should perform more than 100 and less than 10000 basic ! * computational steps. If tasks are too big, then parallelism cannot ! * improve throughput. If too small, then memory and internal task ! * maintenance overhead may overwhelm processing. * * <p>This class provides {@code adapt} methods for {@link Runnable} * and {@link Callable}, that may be of use when mixing execution of * {@code ForkJoinTasks} with other kinds of tasks. When all tasks are * of this form, consider using a pool constructed in <em>asyncMode</em>. --- 161,174 ---- * * <p>ForkJoinTasks should perform relatively small amounts of * computation. Large tasks should be split into smaller subtasks, * usually via recursive decomposition. As a very rough rule of thumb, * a task should perform more than 100 and less than 10000 basic ! * computational steps, and should avoid indefinite looping. If tasks ! * are too big, then parallelism cannot improve throughput. If too ! * small, then memory and internal task maintenance overhead may ! * overwhelm processing. * * <p>This class provides {@code adapt} methods for {@link Runnable} * and {@link Callable}, that may be of use when mixing execution of * {@code ForkJoinTasks} with other kinds of tasks. When all tasks are * of this form, consider using a pool constructed in <em>asyncMode</em>.
*** 240,312 **** exceptionMap.put(this, rex); setCompletion(EXCEPTIONAL); } /** - * Blocks a worker thread until completion. Called only by - * pool. Currently unused -- pool-based waits use timeout - * version below. - */ - final void internalAwaitDone() { - int s; // the odd construction reduces lock bias effects - while ((s = status) >= 0) { - try { - synchronized (this) { - if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL)) - wait(); - } - } catch (InterruptedException ie) { - cancelIfTerminating(); - } - } - } - - /** * Blocks a worker thread until completed or timed out. Called * only by pool. - * - * @return status on exit */ ! final int internalAwaitDone(long millis) { ! int s; ! if ((s = status) >= 0) { ! try { synchronized (this) { ! if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL)) ! wait(millis, 0); } } catch (InterruptedException ie) { cancelIfTerminating(); } - s = status; } - return s; } /** * Blocks a non-worker-thread until completion. */ private void externalAwaitDone() { ! int s; ! while ((s = status) >= 0) { ! synchronized (this) { ! if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)){ boolean interrupted = false; ! while (status >= 0) { try { wait(); } catch (InterruptedException ie) { interrupted = true; } } if (interrupted) Thread.currentThread().interrupt(); break; } } } } /** * Unless done, calls exec and records status if completed, but * doesn't wait for completion otherwise. Primary execution method * for ForkJoinWorkerThread. --- 261,351 ---- exceptionMap.put(this, rex); setCompletion(EXCEPTIONAL); } /** * Blocks a worker thread until completed or timed out. Called * only by pool. */ ! final void internalAwaitDone(long millis, int nanos) { ! int s = status; ! if ((s == 0 && ! UNSAFE.compareAndSwapInt(this, statusOffset, 0, SIGNAL)) || ! s > 0) { ! try { // the odd construction reduces lock bias effects synchronized (this) { ! if (status > 0) ! wait(millis, nanos); ! else ! notifyAll(); } } catch (InterruptedException ie) { cancelIfTerminating(); } } } /** * Blocks a non-worker-thread until completion. */ private void externalAwaitDone() { ! if (status >= 0) { boolean interrupted = false; ! synchronized (this) { ! for (;;) { ! int s = status; ! if (s == 0) ! UNSAFE.compareAndSwapInt(this, statusOffset, ! 0, SIGNAL); ! else if (s < 0) { ! notifyAll(); ! break; ! } ! else { try { wait(); } catch (InterruptedException ie) { interrupted = true; } } + } + } if (interrupted) Thread.currentThread().interrupt(); + } + } + + /** + * Blocks a non-worker-thread until completion or interruption or timeout. + */ + private void externalInterruptibleAwaitDone(boolean timed, long nanos) + throws InterruptedException { + if (Thread.interrupted()) + throw new InterruptedException(); + if (status >= 0) { + long startTime = timed ? System.nanoTime() : 0L; + synchronized (this) { + for (;;) { + long nt; + int s = status; + if (s == 0) + UNSAFE.compareAndSwapInt(this, statusOffset, + 0, SIGNAL); + else if (s < 0) { + notifyAll(); break; } + else if (!timed) + wait(); + else if ((nt = nanos - (System.nanoTime()-startTime)) > 0L) + wait(nt / 1000000, (int)(nt % 1000000)); + else + break; } } } + } /** * Unless done, calls exec and records status if completed, but * doesn't wait for completion otherwise. Primary execution method * for ForkJoinWorkerThread.
*** 333,343 **** * any thread other than the one executing it unless preceded by a * call to {@link #join} or related methods, or a call to {@link * #isDone} returning {@code true}. * * <p>This method may be invoked only from within {@code ! * ForkJoinTask} computations (as may be determined using method * {@link #inForkJoinPool}). Attempts to invoke in other contexts * result in exceptions or errors, possibly including {@code * ClassCastException}. * * @return {@code this}, to simplify usage --- 372,382 ---- * any thread other than the one executing it unless preceded by a * call to {@link #join} or related methods, or a call to {@link * #isDone} returning {@code true}. * * <p>This method may be invoked only from within {@code ! * ForkJoinPool} computations (as may be determined using method * {@link #inForkJoinPool}). Attempts to invoke in other contexts * result in exceptions or errors, possibly including {@code * ClassCastException}. * * @return {@code this}, to simplify usage
*** 347,360 **** .pushTask(this); return this; } /** ! * Returns the result of the computation when it {@link #isDone is done}. ! * This method differs from {@link #get()} in that * abnormal completion results in {@code RuntimeException} or ! * {@code Error}, not {@code ExecutionException}. * * @return the computed result */ public final V join() { quietlyJoin(); --- 386,402 ---- .pushTask(this); return this; } /** ! * Returns the result of the computation when it {@link #isDone is ! * done}. This method differs from {@link #get()} in that * abnormal completion results in {@code RuntimeException} or ! * {@code Error}, not {@code ExecutionException}, and that ! * interrupts of the calling thread do <em>not</em> cause the ! * method to abruptly return by throwing {@code ! * InterruptedException}. * * @return the computed result */ public final V join() { quietlyJoin();
*** 392,402 **** * #getException()} and related methods to check if they have been * cancelled, completed normally or exceptionally, or left * unprocessed. * * <p>This method may be invoked only from within {@code ! * ForkJoinTask} computations (as may be determined using method * {@link #inForkJoinPool}). Attempts to invoke in other contexts * result in exceptions or errors, possibly including {@code * ClassCastException}. * * @param t1 the first task --- 434,444 ---- * #getException()} and related methods to check if they have been * cancelled, completed normally or exceptionally, or left * unprocessed. * * <p>This method may be invoked only from within {@code ! * ForkJoinPool} computations (as may be determined using method * {@link #inForkJoinPool}). Attempts to invoke in other contexts * result in exceptions or errors, possibly including {@code * ClassCastException}. * * @param t1 the first task
*** 420,430 **** * each task may be obtained using {@link #getException()} and * related methods to check if they have been cancelled, completed * normally or exceptionally, or left unprocessed. * * <p>This method may be invoked only from within {@code ! * ForkJoinTask} computations (as may be determined using method * {@link #inForkJoinPool}). Attempts to invoke in other contexts * result in exceptions or errors, possibly including {@code * ClassCastException}. * * @param tasks the tasks --- 462,472 ---- * each task may be obtained using {@link #getException()} and * related methods to check if they have been cancelled, completed * normally or exceptionally, or left unprocessed. * * <p>This method may be invoked only from within {@code ! * ForkJoinPool} computations (as may be determined using method * {@link #inForkJoinPool}). Attempts to invoke in other contexts * result in exceptions or errors, possibly including {@code * ClassCastException}. * * @param tasks the tasks
*** 475,485 **** * #getException()} and related methods to check if they have been * cancelled, completed normally or exceptionally, or left * unprocessed. * * <p>This method may be invoked only from within {@code ! * ForkJoinTask} computations (as may be determined using method * {@link #inForkJoinPool}). Attempts to invoke in other contexts * result in exceptions or errors, possibly including {@code * ClassCastException}. * * @param tasks the collection of tasks --- 517,527 ---- * #getException()} and related methods to check if they have been * cancelled, completed normally or exceptionally, or left * unprocessed. * * <p>This method may be invoked only from within {@code ! * ForkJoinPool} computations (as may be determined using method * {@link #inForkJoinPool}). Attempts to invoke in other contexts * result in exceptions or errors, possibly including {@code * ClassCastException}. * * @param tasks the collection of tasks
*** 527,555 **** return tasks; } /** * Attempts to cancel execution of this task. This attempt will ! * fail if the task has already completed, has already been ! * cancelled, or could not be cancelled for some other reason. If ! * successful, and this task has not started when cancel is ! * called, execution of this task is suppressed, {@link ! * #isCancelled} will report true, and {@link #join} will result ! * in a {@code CancellationException} being thrown. * * <p>This method may be overridden in subclasses, but if so, must ! * still ensure that these minimal properties hold. In particular, ! * the {@code cancel} method itself must not throw exceptions. * * <p>This method is designed to be invoked by <em>other</em> * tasks. To terminate the current task, you can just return or * throw an unchecked exception from its computation method, or * invoke {@link #completeExceptionally}. * ! * @param mayInterruptIfRunning this value is ignored in the ! * default implementation because tasks are not ! * cancelled via interruption * * @return {@code true} if this task is now cancelled */ public boolean cancel(boolean mayInterruptIfRunning) { setCompletion(CANCELLED); --- 569,600 ---- return tasks; } /** * Attempts to cancel execution of this task. This attempt will ! * fail if the task has already completed or could not be ! * cancelled for some other reason. If successful, and this task ! * has not started when {@code cancel} is called, execution of ! * this task is suppressed. After this method returns ! * successfully, unless there is an intervening call to {@link ! * #reinitialize}, subsequent calls to {@link #isCancelled}, ! * {@link #isDone}, and {@code cancel} will return {@code true} ! * and calls to {@link #join} and related methods will result in ! * {@code CancellationException}. * * <p>This method may be overridden in subclasses, but if so, must ! * still ensure that these properties hold. In particular, the ! * {@code cancel} method itself must not throw exceptions. * * <p>This method is designed to be invoked by <em>other</em> * tasks. To terminate the current task, you can just return or * throw an unchecked exception from its computation method, or * invoke {@link #completeExceptionally}. * ! * @param mayInterruptIfRunning this value has no effect in the ! * default implementation because interrupts are not used to ! * control cancellation. * * @return {@code true} if this task is now cancelled */ public boolean cancel(boolean mayInterruptIfRunning) { setCompletion(CANCELLED);
*** 679,705 **** * exception * @throws InterruptedException if the current thread is not a * member of a ForkJoinPool and was interrupted while waiting */ public final V get() throws InterruptedException, ExecutionException { ! int s; ! if (Thread.currentThread() instanceof ForkJoinWorkerThread) { quietlyJoin(); ! s = status; ! } ! else { ! while ((s = status) >= 0) { ! synchronized (this) { // interruptible form of awaitDone ! if (UNSAFE.compareAndSwapInt(this, statusOffset, ! s, SIGNAL)) { ! while (status >= 0) ! wait(); ! } ! } ! } ! } ! if (s < NORMAL) { Throwable ex; if (s == CANCELLED) throw new CancellationException(); if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null) throw new ExecutionException(ex); --- 724,740 ---- * exception * @throws InterruptedException if the current thread is not a * member of a ForkJoinPool and was interrupted while waiting */ public final V get() throws InterruptedException, ExecutionException { ! Thread t = Thread.currentThread(); ! if (t instanceof ForkJoinWorkerThread) quietlyJoin(); ! else ! externalInterruptibleAwaitDone(false, 0L); ! int s = status; ! if (s != NORMAL) { Throwable ex; if (s == CANCELLED) throw new CancellationException(); if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null) throw new ExecutionException(ex);
*** 721,796 **** * member of a ForkJoinPool and was interrupted while waiting * @throws TimeoutException if the wait timed out */ public final V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { Thread t = Thread.currentThread(); ! ForkJoinPool pool; ! if (t instanceof ForkJoinWorkerThread) { ! ForkJoinWorkerThread w = (ForkJoinWorkerThread) t; ! if (status >= 0 && w.unpushTask(this)) ! quietlyExec(); ! pool = w.pool; ! } else ! pool = null; ! /* ! * Timed wait loop intermixes cases for FJ (pool != null) and ! * non FJ threads. For FJ, decrement pool count but don't try ! * for replacement; increment count on completion. For non-FJ, ! * deal with interrupts. This is messy, but a little less so ! * than is splitting the FJ and nonFJ cases. ! */ ! boolean interrupted = false; ! boolean dec = false; // true if pool count decremented ! long nanos = unit.toNanos(timeout); ! for (;;) { ! if (pool == null && Thread.interrupted()) { ! interrupted = true; ! break; ! } int s = status; ! if (s < 0) ! break; ! if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)) { ! long startTime = System.nanoTime(); ! long nt; // wait time ! while (status >= 0 && ! (nt = nanos - (System.nanoTime() - startTime)) > 0) { ! if (pool != null && !dec) ! dec = pool.tryDecrementRunningCount(); ! else { ! long ms = nt / 1000000; ! int ns = (int) (nt % 1000000); ! try { ! synchronized (this) { ! if (status >= 0) ! wait(ms, ns); ! } ! } catch (InterruptedException ie) { ! if (pool != null) ! cancelIfTerminating(); ! else { ! interrupted = true; ! break; ! } ! } ! } ! } ! break; ! } ! } ! if (pool != null && dec) ! pool.incrementRunningCount(); ! if (interrupted) ! throw new InterruptedException(); ! int es = status; ! if (es != NORMAL) { Throwable ex; ! if (es == CANCELLED) throw new CancellationException(); ! if (es == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null) throw new ExecutionException(ex); throw new TimeoutException(); } return getRawResult(); } --- 756,777 ---- * member of a ForkJoinPool and was interrupted while waiting * @throws TimeoutException if the wait timed out */ public final V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + long nanos = unit.toNanos(timeout); Thread t = Thread.currentThread(); ! if (t instanceof ForkJoinWorkerThread) ! ((ForkJoinWorkerThread)t).joinTask(this, true, nanos); else ! externalInterruptibleAwaitDone(true, nanos); int s = status; ! if (s != NORMAL) { Throwable ex; ! if (s == CANCELLED) throw new CancellationException(); ! if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null) throw new ExecutionException(ex); throw new TimeoutException(); } return getRawResult(); }
*** 817,827 **** if (completed) { setCompletion(NORMAL); return; } } ! w.joinTask(this); } } else externalAwaitDone(); } --- 798,808 ---- if (completed) { setCompletion(NORMAL); return; } } ! w.joinTask(this, false, 0L); } } else externalAwaitDone(); }
*** 853,863 **** * be of use in designs in which many tasks are forked, but none * are explicitly joined, instead executing them until all are * processed. * * <p>This method may be invoked only from within {@code ! * ForkJoinTask} computations (as may be determined using method * {@link #inForkJoinPool}). Attempts to invoke in other contexts * result in exceptions or errors, possibly including {@code * ClassCastException}. */ public static void helpQuiesce() { --- 834,844 ---- * be of use in designs in which many tasks are forked, but none * are explicitly joined, instead executing them until all are * processed. * * <p>This method may be invoked only from within {@code ! * ForkJoinPool} computations (as may be determined using method * {@link #inForkJoinPool}). Attempts to invoke in other contexts * result in exceptions or errors, possibly including {@code * ClassCastException}. */ public static void helpQuiesce() {
*** 872,881 **** --- 853,868 ---- * never been forked, or has been forked, then completed and all * outstanding joins of this task have also completed. Effects * under any other usage conditions are not guaranteed. * This method may be useful when executing * pre-constructed trees of subtasks in loops. + * + * <p>Upon completion of this method, {@code isDone()} reports + * {@code false}, and {@code getException()} reports {@code + * null}. However, the value returned by {@code getRawResult} is + * unaffected. To clear this value, you can invoke {@code + * setRawResult(null)}. */ public void reinitialize() { if (status == EXCEPTIONAL) exceptionMap.remove(this); status = 0;
*** 893,907 **** return (t instanceof ForkJoinWorkerThread) ? ((ForkJoinWorkerThread) t).pool : null; } /** ! * Returns {@code true} if the current thread is executing as a ! * ForkJoinPool computation. * ! * @return {@code true} if the current thread is executing as a ! * ForkJoinPool computation, or false otherwise */ public static boolean inForkJoinPool() { return Thread.currentThread() instanceof ForkJoinWorkerThread; } --- 880,895 ---- return (t instanceof ForkJoinWorkerThread) ? ((ForkJoinWorkerThread) t).pool : null; } /** ! * Returns {@code true} if the current thread is a {@link ! * ForkJoinWorkerThread} executing as a ForkJoinPool computation. * ! * @return {@code true} if the current thread is a {@link ! * ForkJoinWorkerThread} executing as a ForkJoinPool computation, ! * or {@code false} otherwise */ public static boolean inForkJoinPool() { return Thread.currentThread() instanceof ForkJoinWorkerThread; }
*** 912,922 **** * another thread. This method may be useful when arranging * alternative local processing of tasks that could have been, but * were not, stolen. * * <p>This method may be invoked only from within {@code ! * ForkJoinTask} computations (as may be determined using method * {@link #inForkJoinPool}). Attempts to invoke in other contexts * result in exceptions or errors, possibly including {@code * ClassCastException}. * * @return {@code true} if unforked --- 900,910 ---- * another thread. This method may be useful when arranging * alternative local processing of tasks that could have been, but * were not, stolen. * * <p>This method may be invoked only from within {@code ! * ForkJoinPool} computations (as may be determined using method * {@link #inForkJoinPool}). Attempts to invoke in other contexts * result in exceptions or errors, possibly including {@code * ClassCastException}. * * @return {@code true} if unforked
*** 931,941 **** * forked by the current worker thread but not yet executed. This * value may be useful for heuristic decisions about whether to * fork other tasks. * * <p>This method may be invoked only from within {@code ! * ForkJoinTask} computations (as may be determined using method * {@link #inForkJoinPool}). Attempts to invoke in other contexts * result in exceptions or errors, possibly including {@code * ClassCastException}. * * @return the number of tasks --- 919,929 ---- * forked by the current worker thread but not yet executed. This * value may be useful for heuristic decisions about whether to * fork other tasks. * * <p>This method may be invoked only from within {@code ! * ForkJoinPool} computations (as may be determined using method * {@link #inForkJoinPool}). Attempts to invoke in other contexts * result in exceptions or errors, possibly including {@code * ClassCastException}. * * @return the number of tasks
*** 954,964 **** * aim to maintain a small constant surplus (for example, 3) of * tasks, and to process computations locally if this threshold is * exceeded. * * <p>This method may be invoked only from within {@code ! * ForkJoinTask} computations (as may be determined using method * {@link #inForkJoinPool}). Attempts to invoke in other contexts * result in exceptions or errors, possibly including {@code * ClassCastException}. * * @return the surplus number of tasks, which may be negative --- 942,952 ---- * aim to maintain a small constant surplus (for example, 3) of * tasks, and to process computations locally if this threshold is * exceeded. * * <p>This method may be invoked only from within {@code ! * ForkJoinPool} computations (as may be determined using method * {@link #inForkJoinPool}). Attempts to invoke in other contexts * result in exceptions or errors, possibly including {@code * ClassCastException}. * * @return the surplus number of tasks, which may be negative
*** 1012,1022 **** * contention with other threads. This method is designed * primarily to support extensions, and is unlikely to be useful * otherwise. * * <p>This method may be invoked only from within {@code ! * ForkJoinTask} computations (as may be determined using method * {@link #inForkJoinPool}). Attempts to invoke in other contexts * result in exceptions or errors, possibly including {@code * ClassCastException}. * * @return the next task, or {@code null} if none are available --- 1000,1010 ---- * contention with other threads. This method is designed * primarily to support extensions, and is unlikely to be useful * otherwise. * * <p>This method may be invoked only from within {@code ! * ForkJoinPool} computations (as may be determined using method * {@link #inForkJoinPool}). Attempts to invoke in other contexts * result in exceptions or errors, possibly including {@code * ClassCastException}. * * @return the next task, or {@code null} if none are available
*** 1031,1041 **** * queued by the current thread but not yet executed. This method * is designed primarily to support extensions, and is unlikely to * be useful otherwise. * * <p>This method may be invoked only from within {@code ! * ForkJoinTask} computations (as may be determined using method * {@link #inForkJoinPool}). Attempts to invoke in other contexts * result in exceptions or errors, possibly including {@code * ClassCastException}. * * @return the next task, or {@code null} if none are available --- 1019,1029 ---- * queued by the current thread but not yet executed. This method * is designed primarily to support extensions, and is unlikely to * be useful otherwise. * * <p>This method may be invoked only from within {@code ! * ForkJoinPool} computations (as may be determined using method * {@link #inForkJoinPool}). Attempts to invoke in other contexts * result in exceptions or errors, possibly including {@code * ClassCastException}. * * @return the next task, or {@code null} if none are available
*** 1054,1064 **** * of the pool this task is operating in. This method is designed * primarily to support extensions, and is unlikely to be useful * otherwise. * * <p>This method may be invoked only from within {@code ! * ForkJoinTask} computations (as may be determined using method * {@link #inForkJoinPool}). Attempts to invoke in other contexts * result in exceptions or errors, possibly including {@code * ClassCastException}. * * @return a task, or {@code null} if none are available --- 1042,1052 ---- * of the pool this task is operating in. This method is designed * primarily to support extensions, and is unlikely to be useful * otherwise. * * <p>This method may be invoked only from within {@code ! * ForkJoinPool} computations (as may be determined using method * {@link #inForkJoinPool}). Attempts to invoke in other contexts * result in exceptions or errors, possibly including {@code * ClassCastException}. * * @return a task, or {@code null} if none are available