25 /*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/licenses/publicdomain
34 */
35
36 package java.util.concurrent;
37
38 import java.io.Serializable;
39 import java.util.Collection;
40 import java.util.Collections;
41 import java.util.List;
42 import java.util.RandomAccess;
43 import java.util.Map;
44 import java.util.WeakHashMap;
45
46 /**
47 * Abstract base class for tasks that run within a {@link ForkJoinPool}.
48 * A {@code ForkJoinTask} is a thread-like entity that is much
49 * lighter weight than a normal thread. Huge numbers of tasks and
50 * subtasks may be hosted by a small number of actual threads in a
51 * ForkJoinPool, at the price of some usage limitations.
52 *
53 * <p>A "main" {@code ForkJoinTask} begins execution when submitted
54 * to a {@link ForkJoinPool}. Once started, it will usually in turn
55 * start other subtasks. As indicated by the name of this class,
56 * many programs using {@code ForkJoinTask} employ only methods
57 * {@link #fork} and {@link #join}, or derivatives such as {@link
58 * #invokeAll(ForkJoinTask...) invokeAll}. However, this class also
59 * provides a number of other methods that can come into play in
60 * advanced usages, as well as extension mechanics that allow
61 * support of new forms of fork/join processing.
62 *
63 * <p>A {@code ForkJoinTask} is a lightweight form of {@link Future}.
64 * The efficiency of {@code ForkJoinTask}s stems from a set of
112 * cancelled or encountered an exception, in which case {@link
113 * #getException} will return either the encountered exception or
114 * {@link java.util.concurrent.CancellationException}.
115 *
116 * <p>The ForkJoinTask class is not usually directly subclassed.
117 * Instead, you subclass one of the abstract classes that support a
118 * particular style of fork/join processing, typically {@link
119 * RecursiveAction} for computations that do not return results, or
120 * {@link RecursiveTask} for those that do. Normally, a concrete
121 * ForkJoinTask subclass declares fields comprising its parameters,
122 * established in a constructor, and then defines a {@code compute}
123 * method that somehow uses the control methods supplied by this base
124 * class. While these methods have {@code public} access (to allow
125 * instances of different task subclasses to call each other's
126 * methods), some of them may only be called from within other
127 * ForkJoinTasks (as may be determined using method {@link
128 * #inForkJoinPool}). Attempts to invoke them in other contexts
129 * result in exceptions or errors, possibly including
130 * {@code ClassCastException}.
131 *
132 * <p>Most base support methods are {@code final}, to prevent
133 * overriding of implementations that are intrinsically tied to the
134 * underlying lightweight task scheduling framework. Developers
135 * creating new basic styles of fork/join processing should minimally
136 * implement {@code protected} methods {@link #exec}, {@link
137 * #setRawResult}, and {@link #getRawResult}, while also introducing
138 * an abstract computational method that can be implemented in its
139 * subclasses, possibly relying on other {@code protected} methods
140 * provided by this class.
141 *
142 * <p>ForkJoinTasks should perform relatively small amounts of
143 * computation. Large tasks should be split into smaller subtasks,
144 * usually via recursive decomposition. As a very rough rule of thumb,
145 * a task should perform more than 100 and less than 10000 basic
146 * computational steps. If tasks are too big, then parallelism cannot
147 * improve throughput. If too small, then memory and internal task
148 * maintenance overhead may overwhelm processing.
149 *
150 * <p>This class provides {@code adapt} methods for {@link Runnable}
151 * and {@link Callable}, which may be of use when mixing execution of
152 * {@code ForkJoinTask}s with other kinds of tasks. When all tasks are
153 * of this form, consider using a pool constructed in <em>asyncMode</em>.
154 *
155 * <p>ForkJoinTasks are {@code Serializable}, which enables them to be
156 * used in extensions such as remote execution frameworks. It is
157 * sensible to serialize tasks only before or after, but not during,
158 * execution. Serialization is not relied on during execution itself.
159 *
160 * @since 1.7
161 * @author Doug Lea
162 */
163 public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
164
165 /*
166 * See the internal documentation of class ForkJoinPool for a
167 * general implementation overview. ForkJoinTasks are mainly
168 * responsible for maintaining their "status" field amidst relays
225 while ((s = status) >= 0) {
226 if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) {
227 if (s != 0)
228 synchronized (this) { notifyAll(); }
229 break;
230 }
231 }
232 }
233
234 /**
235 * Records exception and sets exceptional completion.
236 *
237 * @param rex the exception to record for this task
238 */
239 private void setExceptionalCompletion(Throwable rex) {
240 exceptionMap.put(this, rex);
241 setCompletion(EXCEPTIONAL);
242 }
243
244 /**
245 * Blocks a worker thread until completion. Called only by
246 * pool. Currently unused -- pool-based waits use timeout
247 * version below.
248 */
249 final void internalAwaitDone() {
250 int s; // the odd construction reduces lock bias effects
251 while ((s = status) >= 0) {
252 try {
253 synchronized (this) {
254 if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
255 wait();
256 }
257 } catch (InterruptedException ie) {
258 cancelIfTerminating();
259 }
260 }
261 }
262
263 /**
264 * Blocks a worker thread until completed or timed out. Called
265 * only by pool.
266 *
267 * @return status on exit
268 */
269 final int internalAwaitDone(long millis) {
270 int s;
271 if ((s = status) >= 0) {
272 try {
273 synchronized (this) {
274 if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
275 wait(millis, 0);
276 }
277 } catch (InterruptedException ie) {
278 cancelIfTerminating();
279 }
280 s = status;
281 }
282 return s;
283 }
284
285 /**
286 * Blocks a non-worker-thread until completion.
287 */
288 private void externalAwaitDone() {
289 int s;
290 while ((s = status) >= 0) {
291 synchronized (this) {
292 if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)){
293 boolean interrupted = false;
294 while (status >= 0) {
295 try {
296 wait();
297 } catch (InterruptedException ie) {
298 interrupted = true;
299 }
300 }
301 if (interrupted)
302 Thread.currentThread().interrupt();
303 break;
304 }
305 }
306 }
307 }
308
309 /**
310 * Unless done, calls exec and records status if completed, but
311 * doesn't wait for completion otherwise. Primary execution method
312 * for ForkJoinWorkerThread.
313 */
314 final void quietlyExec() {
315 try {
316 if (status < 0 || !exec())
317 return;
318 } catch (Throwable rex) {
319 setExceptionalCompletion(rex);
320 return;
321 }
322 setCompletion(NORMAL); // must be outside try block
323 }
324
325 // public methods
326
327 /**
328 * Arranges to asynchronously execute this task. While it is not
329 * necessarily enforced, it is a usage error to fork a task more
330 * than once unless it has completed and been reinitialized.
331 * Subsequent modifications to the state of this task or any data
332 * it operates on are not necessarily consistently observable by
333 * any thread other than the one executing it unless preceded by a
334 * call to {@link #join} or related methods, or a call to {@link
335 * #isDone} returning {@code true}.
336 *
337 * <p>This method may be invoked only from within {@code
338 * ForkJoinTask} computations (as may be determined using method
339 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
340 * result in exceptions or errors, possibly including {@code
341 * ClassCastException}.
342 *
343 * @return {@code this}, to simplify usage
344 */
345 public final ForkJoinTask<V> fork() {
346 ((ForkJoinWorkerThread) Thread.currentThread())
347 .pushTask(this);
348 return this;
349 }
350
351 /**
352 * Returns the result of the computation when it {@link #isDone is done}.
353 * This method differs from {@link #get()} in that
354 * abnormal completion results in {@code RuntimeException} or
355 * {@code Error}, not {@code ExecutionException}.
356 *
357 * @return the computed result
358 */
359 public final V join() {
360 quietlyJoin();
361 Throwable ex;
362 if (status < NORMAL && (ex = getException()) != null)
363 UNSAFE.throwException(ex);
364 return getRawResult();
365 }
366
367 /**
368 * Commences performing this task, awaits its completion if
369 * necessary, and returns its result, or throws an (unchecked)
370 * {@code RuntimeException} or {@code Error} if the underlying
371 * computation did so.
372 *
373 * @return the computed result
374 */
375 public final V invoke() {
377 Throwable ex;
378 if (status < NORMAL && (ex = getException()) != null)
379 UNSAFE.throwException(ex);
380 return getRawResult();
381 }
382
383 /**
384 * Forks the given tasks, returning when {@code isDone} holds for
385 * each task or an (unchecked) exception is encountered, in which
386 * case the exception is rethrown. If more than one task
387 * encounters an exception, then this method throws any one of
388 * these exceptions. If any task encounters an exception, the
389 * other may be cancelled. However, the execution status of
390 * individual tasks is not guaranteed upon exceptional return. The
391 * status of each task may be obtained using {@link
392 * #getException()} and related methods to check if they have been
393 * cancelled, completed normally or exceptionally, or left
394 * unprocessed.
395 *
396 * <p>This method may be invoked only from within {@code
397 * ForkJoinTask} computations (as may be determined using method
398 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
399 * result in exceptions or errors, possibly including {@code
400 * ClassCastException}.
401 *
402 * @param t1 the first task
403 * @param t2 the second task
404 * @throws NullPointerException if any task is null
405 */
406 public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
407 t2.fork();
408 t1.invoke();
409 t2.join();
410 }
411
412 /**
413 * Forks the given tasks, returning when {@code isDone} holds for
414 * each task or an (unchecked) exception is encountered, in which
415 * case the exception is rethrown. If more than one task
416 * encounters an exception, then this method throws any one of
417 * these exceptions. If any task encounters an exception, others
418 * may be cancelled. However, the execution status of individual
419 * tasks is not guaranteed upon exceptional return. The status of
420 * each task may be obtained using {@link #getException()} and
421 * related methods to check if they have been cancelled, completed
422 * normally or exceptionally, or left unprocessed.
423 *
424 * <p>This method may be invoked only from within {@code
425 * ForkJoinTask} computations (as may be determined using method
426 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
427 * result in exceptions or errors, possibly including {@code
428 * ClassCastException}.
429 *
430 * @param tasks the tasks
431 * @throws NullPointerException if any task is null
432 */
433 public static void invokeAll(ForkJoinTask<?>... tasks) {
434 Throwable ex = null;
435 int last = tasks.length - 1;
436 for (int i = last; i >= 0; --i) {
437 ForkJoinTask<?> t = tasks[i];
438 if (t == null) {
439 if (ex == null)
440 ex = new NullPointerException();
441 }
442 else if (i != 0)
443 t.fork();
444 else {
445 t.quietlyInvoke();
460 }
461 }
462 if (ex != null)
463 UNSAFE.throwException(ex);
464 }
465
466 /**
467 * Forks all tasks in the specified collection, returning when
468 * {@code isDone} holds for each task or an (unchecked) exception
469 * is encountered, in which case the exception is rethrown. If
470 * more than one task encounters an exception, then this method
471 * throws any one of these exceptions. If any task encounters an
472 * exception, others may be cancelled. However, the execution
473 * status of individual tasks is not guaranteed upon exceptional
474 * return. The status of each task may be obtained using {@link
475 * #getException()} and related methods to check if they have been
476 * cancelled, completed normally or exceptionally, or left
477 * unprocessed.
478 *
479 * <p>This method may be invoked only from within {@code
480 * ForkJoinTask} computations (as may be determined using method
481 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
482 * result in exceptions or errors, possibly including {@code
483 * ClassCastException}.
484 *
485 * @param tasks the collection of tasks
486 * @return the tasks argument, to simplify usage
487 * @throws NullPointerException if tasks or any element are null
488 */
489 public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks) {
490 if (!(tasks instanceof RandomAccess) || !(tasks instanceof List<?>)) {
491 invokeAll(tasks.toArray(new ForkJoinTask<?>[tasks.size()]));
492 return tasks;
493 }
494 @SuppressWarnings("unchecked")
495 List<? extends ForkJoinTask<?>> ts =
496 (List<? extends ForkJoinTask<?>>) tasks;
497 Throwable ex = null;
498 int last = ts.size() - 1;
499 for (int i = last; i >= 0; --i) {
500 ForkJoinTask<?> t = ts.get(i);
512 }
513 for (int i = 1; i <= last; ++i) {
514 ForkJoinTask<?> t = ts.get(i);
515 if (t != null) {
516 if (ex != null)
517 t.cancel(false);
518 else {
519 t.quietlyJoin();
520 if (ex == null && t.status < NORMAL)
521 ex = t.getException();
522 }
523 }
524 }
525 if (ex != null)
526 UNSAFE.throwException(ex);
527 return tasks;
528 }
529
530 /**
531 * Attempts to cancel execution of this task. This attempt will
532 * fail if the task has already completed, has already been
533 * cancelled, or could not be cancelled for some other reason. If
534 * successful, and this task has not started when cancel is
535 * called, execution of this task is suppressed, {@link
536 * #isCancelled} will report true, and {@link #join} will result
537 * in a {@code CancellationException} being thrown.
538 *
539 * <p>This method may be overridden in subclasses, but if so, must
540 * still ensure that these minimal properties hold. In particular,
541 * the {@code cancel} method itself must not throw exceptions.
542 *
543 * <p>This method is designed to be invoked by <em>other</em>
544 * tasks. To terminate the current task, you can just return or
545 * throw an unchecked exception from its computation method, or
546 * invoke {@link #completeExceptionally}.
547 *
548 * @param mayInterruptIfRunning this value is ignored in the
549 * default implementation because tasks are not
550 * cancelled via interruption
551 *
552 * @return {@code true} if this task is now cancelled
553 */
554 public boolean cancel(boolean mayInterruptIfRunning) {
555 setCompletion(CANCELLED);
556 return status == CANCELLED;
557 }
558
559 /**
560 * Cancels, ignoring any exceptions thrown by cancel. Used during
561 * worker and pool shutdown. Cancel is spec'ed not to throw any
562 * exceptions, but if it does anyway, we have no recourse during
563 * shutdown, so guard against this case.
564 */
565 final void cancelIgnoringExceptions() {
566 try {
567 cancel(false);
568 } catch (Throwable ignore) {
569 }
570 }
664 setRawResult(value);
665 } catch (Throwable rex) {
666 setExceptionalCompletion(rex);
667 return;
668 }
669 setCompletion(NORMAL);
670 }
671
672 /**
673 * Waits if necessary for the computation to complete, and then
674 * retrieves its result.
675 *
676 * @return the computed result
677 * @throws CancellationException if the computation was cancelled
678 * @throws ExecutionException if the computation threw an
679 * exception
680 * @throws InterruptedException if the current thread is not a
681 * member of a ForkJoinPool and was interrupted while waiting
682 */
683 public final V get() throws InterruptedException, ExecutionException {
684 int s;
685 if (Thread.currentThread() instanceof ForkJoinWorkerThread) {
686 quietlyJoin();
687 s = status;
688 }
689 else {
690 while ((s = status) >= 0) {
691 synchronized (this) { // interruptible form of awaitDone
692 if (UNSAFE.compareAndSwapInt(this, statusOffset,
693 s, SIGNAL)) {
694 while (status >= 0)
695 wait();
696 }
697 }
698 }
699 }
700 if (s < NORMAL) {
701 Throwable ex;
702 if (s == CANCELLED)
703 throw new CancellationException();
704 if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
705 throw new ExecutionException(ex);
706 }
707 return getRawResult();
708 }
709
710 /**
711 * Waits if necessary for at most the given time for the computation
712 * to complete, and then retrieves its result, if available.
713 *
714 * @param timeout the maximum time to wait
715 * @param unit the time unit of the timeout argument
716 * @return the computed result
717 * @throws CancellationException if the computation was cancelled
718 * @throws ExecutionException if the computation threw an
719 * exception
720 * @throws InterruptedException if the current thread is not a
721 * member of a ForkJoinPool and was interrupted while waiting
722 * @throws TimeoutException if the wait timed out
723 */
724 public final V get(long timeout, TimeUnit unit)
725 throws InterruptedException, ExecutionException, TimeoutException {
726 Thread t = Thread.currentThread();
727 ForkJoinPool pool;
728 if (t instanceof ForkJoinWorkerThread) {
729 ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
730 if (status >= 0 && w.unpushTask(this))
731 quietlyExec();
732 pool = w.pool;
733 }
734 else
735 pool = null;
736 /*
737 * Timed wait loop intermixes cases for FJ (pool != null) and
738 * non FJ threads. For FJ, decrement pool count but don't try
739 * for replacement; increment count on completion. For non-FJ,
740 * deal with interrupts. This is messy, but a little less so
741 * than is splitting the FJ and nonFJ cases.
742 */
743 boolean interrupted = false;
744 boolean dec = false; // true if pool count decremented
745 long nanos = unit.toNanos(timeout);
746 for (;;) {
747 if (pool == null && Thread.interrupted()) {
748 interrupted = true;
749 break;
750 }
751 int s = status;
752 if (s < 0)
753 break;
754 if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)) {
755 long startTime = System.nanoTime();
756 long nt; // wait time
757 while (status >= 0 &&
758 (nt = nanos - (System.nanoTime() - startTime)) > 0) {
759 if (pool != null && !dec)
760 dec = pool.tryDecrementRunningCount();
761 else {
762 long ms = nt / 1000000;
763 int ns = (int) (nt % 1000000);
764 try {
765 synchronized (this) {
766 if (status >= 0)
767 wait(ms, ns);
768 }
769 } catch (InterruptedException ie) {
770 if (pool != null)
771 cancelIfTerminating();
772 else {
773 interrupted = true;
774 break;
775 }
776 }
777 }
778 }
779 break;
780 }
781 }
782 if (pool != null && dec)
783 pool.incrementRunningCount();
784 if (interrupted)
785 throw new InterruptedException();
786 int es = status;
787 if (es != NORMAL) {
788 Throwable ex;
789 if (es == CANCELLED)
790 throw new CancellationException();
791 if (es == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
792 throw new ExecutionException(ex);
793 throw new TimeoutException();
794 }
795 return getRawResult();
796 }
797
798 /**
799 * Joins this task, without returning its result or throwing its
800 * exception. This method may be useful when processing
801 * collections of tasks when some have been cancelled or otherwise
802 * known to have aborted.
803 */
804 public final void quietlyJoin() {
805 Thread t;
806 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
807 ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
808 if (status >= 0) {
809 if (w.unpushTask(this)) {
810 boolean completed;
811 try {
812 completed = exec();
813 } catch (Throwable rex) {
814 setExceptionalCompletion(rex);
815 return;
816 }
817 if (completed) {
818 setCompletion(NORMAL);
819 return;
820 }
821 }
822 w.joinTask(this);
823 }
824 }
825 else
826 externalAwaitDone();
827 }
828
829 /**
830 * Commences performing this task and awaits its completion if
831 * necessary, without returning its result or throwing its
832 * exception.
833 */
834 public final void quietlyInvoke() {
835 if (status >= 0) {
836 boolean completed;
837 try {
838 completed = exec();
839 } catch (Throwable rex) {
840 setExceptionalCompletion(rex);
841 return;
842 }
843 if (completed)
844 setCompletion(NORMAL);
845 else
846 quietlyJoin();
847 }
848 }
849
850 /**
851 * Possibly executes tasks until the pool hosting the current task
852 * {@link ForkJoinPool#isQuiescent is quiescent}. This method may
853 * be of use in designs in which many tasks are forked, but none
854 * are explicitly joined, instead executing them until all are
855 * processed.
856 *
857 * <p>This method may be invoked only from within {@code
858 * ForkJoinTask} computations (as may be determined using method
859 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
860 * result in exceptions or errors, possibly including {@code
861 * ClassCastException}.
862 */
863 public static void helpQuiesce() {
864 ((ForkJoinWorkerThread) Thread.currentThread())
865 .helpQuiescePool();
866 }
867
868 /**
869 * Resets the internal bookkeeping state of this task, allowing a
870 * subsequent {@code fork}. This method allows repeated reuse of
871 * this task, but only if reuse occurs when this task has either
872 * never been forked, or has been forked, then completed and all
873 * outstanding joins of this task have also completed. Effects
874 * under any other usage conditions are not guaranteed.
875 * This method may be useful when executing
876 * pre-constructed trees of subtasks in loops.
877 */
878 public void reinitialize() {
879 if (status == EXCEPTIONAL)
880 exceptionMap.remove(this);
881 status = 0;
882 }
883
884 /**
885 * Returns the pool hosting the current task execution, or null
886 * if this task is executing outside of any ForkJoinPool.
887 *
888 * @see #inForkJoinPool
889 * @return the pool, or {@code null} if none
890 */
891 public static ForkJoinPool getPool() {
892 Thread t = Thread.currentThread();
893 return (t instanceof ForkJoinWorkerThread) ?
894 ((ForkJoinWorkerThread) t).pool : null;
895 }
896
897 /**
898 * Returns {@code true} if the current thread is executing as a
899 * ForkJoinPool computation.
900 *
901 * @return {@code true} if the current thread is executing as a
902 * ForkJoinPool computation, or false otherwise
903 */
904 public static boolean inForkJoinPool() {
905 return Thread.currentThread() instanceof ForkJoinWorkerThread;
906 }
907
908 /**
909 * Tries to unschedule this task for execution. This method will
910 * typically succeed if this task is the most recently forked task
911 * by the current thread, and has not commenced executing in
912 * another thread. This method may be useful when arranging
913 * alternative local processing of tasks that could have been, but
914 * were not, stolen.
915 *
916 * <p>This method may be invoked only from within {@code
917 * ForkJoinTask} computations (as may be determined using method
918 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
919 * result in exceptions or errors, possibly including {@code
920 * ClassCastException}.
921 *
922 * @return {@code true} if unforked
923 */
924 public boolean tryUnfork() {
925 return ((ForkJoinWorkerThread) Thread.currentThread())
926 .unpushTask(this);
927 }
928
929 /**
930 * Returns an estimate of the number of tasks that have been
931 * forked by the current worker thread but not yet executed. This
932 * value may be useful for heuristic decisions about whether to
933 * fork other tasks.
934 *
935 * <p>This method may be invoked only from within {@code
936 * ForkJoinTask} computations (as may be determined using method
937 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
938 * result in exceptions or errors, possibly including {@code
939 * ClassCastException}.
940 *
941 * @return the number of tasks
942 */
943 public static int getQueuedTaskCount() {
944 return ((ForkJoinWorkerThread) Thread.currentThread())
945 .getQueueSize();
946 }
947
948 /**
949 * Returns an estimate of how many more locally queued tasks are
950 * held by the current worker thread than there are other worker
951 * threads that might steal them. This value may be useful for
952 * heuristic decisions about whether to fork other tasks. In many
953 * usages of ForkJoinTasks, at steady state, each worker should
954 * aim to maintain a small constant surplus (for example, 3) of
955 * tasks, and to process computations locally if this threshold is
956 * exceeded.
957 *
958 * <p>This method may be invoked only from within {@code
959 * ForkJoinTask} computations (as may be determined using method
960 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
961 * result in exceptions or errors, possibly including {@code
962 * ClassCastException}.
963 *
964 * @return the surplus number of tasks, which may be negative
965 */
966 public static int getSurplusQueuedTaskCount() {
967 return ((ForkJoinWorkerThread) Thread.currentThread())
968 .getEstimatedSurplusTaskCount();
969 }
970
971 // Extension methods
972
973 /**
974 * Returns the result that would be returned by {@link #join}, even
975 * if this task completed abnormally, or {@code null} if this task
976 * is not known to have been completed. This method is designed
977 * to aid debugging, as well as to support extensions. Its use in
978 * any other context is discouraged.
979 *
997 * is considered to be done normally. It may return false in
998 * asynchronous actions that require explicit invocations of
999 * {@link #complete} to become joinable. It may also throw an
1000 * (unchecked) exception to indicate abnormal exit.
1001 *
1002 * @return {@code true} if completed normally
1003 */
1004 protected abstract boolean exec();
1005
1006 /**
1007 * Returns, but does not unschedule or execute, a task queued by
1008 * the current thread but not yet executed, if one is immediately
1009 * available. There is no guarantee that this task will actually
1010 * be polled or executed next. Conversely, this method may return
1011 * null even if a task exists but cannot be accessed without
1012 * contention with other threads. This method is designed
1013 * primarily to support extensions, and is unlikely to be useful
1014 * otherwise.
1015 *
1016 * <p>This method may be invoked only from within {@code
1017 * ForkJoinTask} computations (as may be determined using method
1018 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
1019 * result in exceptions or errors, possibly including {@code
1020 * ClassCastException}.
1021 *
1022 * @return the next task, or {@code null} if none are available
1023 */
1024 protected static ForkJoinTask<?> peekNextLocalTask() {
1025 return ((ForkJoinWorkerThread) Thread.currentThread())
1026 .peekTask();
1027 }
1028
1029 /**
1030 * Unschedules and returns, without executing, the next task
1031 * queued by the current thread but not yet executed. This method
1032 * is designed primarily to support extensions, and is unlikely to
1033 * be useful otherwise.
1034 *
1035 * <p>This method may be invoked only from within {@code
1036 * ForkJoinTask} computations (as may be determined using method
1037 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
1038 * result in exceptions or errors, possibly including {@code
1039 * ClassCastException}.
1040 *
1041 * @return the next task, or {@code null} if none are available
1042 */
1043 protected static ForkJoinTask<?> pollNextLocalTask() {
1044 return ((ForkJoinWorkerThread) Thread.currentThread())
1045 .pollLocalTask();
1046 }
1047
1048 /**
1049 * Unschedules and returns, without executing, the next task
1050 * queued by the current thread but not yet executed, if one is
1051 * available, or if not available, a task that was forked by some
1052 * other thread, if available. Availability may be transient, so a
1053 * {@code null} result does not necessarily imply quiescence
1054 * of the pool this task is operating in. This method is designed
1055 * primarily to support extensions, and is unlikely to be useful
1056 * otherwise.
1057 *
1058 * <p>This method may be invoked only from within {@code
1059 * ForkJoinTask} computations (as may be determined using method
1060 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
1061 * result in exceptions or errors, possibly including {@code
1062 * ClassCastException}.
1063 *
1064 * @return a task, or {@code null} if none are available
1065 */
1066 protected static ForkJoinTask<?> pollTask() {
1067 return ((ForkJoinWorkerThread) Thread.currentThread())
1068 .pollTask();
1069 }
1070
1071 /**
1072 * Adaptor for Runnables. This implements RunnableFuture
1073 * to be compliant with AbstractExecutorService constraints
1074 * when used in ForkJoinPool.
1075 */
1076 static final class AdaptedRunnable<T> extends ForkJoinTask<T>
1077 implements RunnableFuture<T> {
1078 final Runnable runnable;
1079 final T resultOnCompletion;
|
25 /*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/licenses/publicdomain
34 */
35
36 package java.util.concurrent;
37
38 import java.io.Serializable;
39 import java.util.Collection;
40 import java.util.Collections;
41 import java.util.List;
42 import java.util.RandomAccess;
43 import java.util.Map;
44 import java.util.WeakHashMap;
45 import java.util.concurrent.Callable;
46 import java.util.concurrent.CancellationException;
47 import java.util.concurrent.ExecutionException;
48 import java.util.concurrent.Executor;
49 import java.util.concurrent.ExecutorService;
50 import java.util.concurrent.Future;
51 import java.util.concurrent.RejectedExecutionException;
52 import java.util.concurrent.RunnableFuture;
53 import java.util.concurrent.TimeUnit;
54 import java.util.concurrent.TimeoutException;
55
56 /**
57 * Abstract base class for tasks that run within a {@link ForkJoinPool}.
58 * A {@code ForkJoinTask} is a thread-like entity that is much
59 * lighter weight than a normal thread. Huge numbers of tasks and
60 * subtasks may be hosted by a small number of actual threads in a
61 * ForkJoinPool, at the price of some usage limitations.
62 *
63 * <p>A "main" {@code ForkJoinTask} begins execution when submitted
64 * to a {@link ForkJoinPool}. Once started, it will usually in turn
65 * start other subtasks. As indicated by the name of this class,
66 * many programs using {@code ForkJoinTask} employ only methods
67 * {@link #fork} and {@link #join}, or derivatives such as {@link
68 * #invokeAll(ForkJoinTask...) invokeAll}. However, this class also
69 * provides a number of other methods that can come into play in
70 * advanced usages, as well as extension mechanics that allow
71 * support of new forms of fork/join processing.
72 *
73 * <p>A {@code ForkJoinTask} is a lightweight form of {@link Future}.
74 * The efficiency of {@code ForkJoinTask}s stems from a set of
122 * cancelled or encountered an exception, in which case {@link
123 * #getException} will return either the encountered exception or
124 * {@link java.util.concurrent.CancellationException}.
125 *
126 * <p>The ForkJoinTask class is not usually directly subclassed.
127 * Instead, you subclass one of the abstract classes that support a
128 * particular style of fork/join processing, typically {@link
129 * RecursiveAction} for computations that do not return results, or
130 * {@link RecursiveTask} for those that do. Normally, a concrete
131 * ForkJoinTask subclass declares fields comprising its parameters,
132 * established in a constructor, and then defines a {@code compute}
133 * method that somehow uses the control methods supplied by this base
134 * class. While these methods have {@code public} access (to allow
135 * instances of different task subclasses to call each other's
136 * methods), some of them may only be called from within other
137 * ForkJoinTasks (as may be determined using method {@link
138 * #inForkJoinPool}). Attempts to invoke them in other contexts
139 * result in exceptions or errors, possibly including
140 * {@code ClassCastException}.
141 *
142 * <p>Method {@link #join} and its variants are appropriate for use
143 * only when completion dependencies are acyclic; that is, the
144 * parallel computation can be described as a directed acyclic graph
145 * (DAG). Otherwise, executions may encounter a form of deadlock as
146 * tasks cyclically wait for each other. However, this framework
147 * supports other methods and techniques (for example the use of
148 * {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that
149 * may be of use in constructing custom subclasses for problems that
150 * are not statically structured as DAGs.
151 *
152 * <p>Most base support methods are {@code final}, to prevent
153 * overriding of implementations that are intrinsically tied to the
154 * underlying lightweight task scheduling framework. Developers
155 * creating new basic styles of fork/join processing should minimally
156 * implement {@code protected} methods {@link #exec}, {@link
157 * #setRawResult}, and {@link #getRawResult}, while also introducing
158 * an abstract computational method that can be implemented in its
159 * subclasses, possibly relying on other {@code protected} methods
160 * provided by this class.
161 *
162 * <p>ForkJoinTasks should perform relatively small amounts of
163 * computation. Large tasks should be split into smaller subtasks,
164 * usually via recursive decomposition. As a very rough rule of thumb,
165 * a task should perform more than 100 and less than 10000 basic
166 * computational steps, and should avoid indefinite looping. If tasks
167 * are too big, then parallelism cannot improve throughput. If too
168 * small, then memory and internal task maintenance overhead may
169 * overwhelm processing.
170 *
171 * <p>This class provides {@code adapt} methods for {@link Runnable}
172 * and {@link Callable}, that may be of use when mixing execution of
173 * {@code ForkJoinTasks} with other kinds of tasks. When all tasks are
174 * of this form, consider using a pool constructed in <em>asyncMode</em>.
175 *
176 * <p>ForkJoinTasks are {@code Serializable}, which enables them to be
177 * used in extensions such as remote execution frameworks. It is
178 * sensible to serialize tasks only before or after, but not during,
179 * execution. Serialization is not relied on during execution itself.
180 *
181 * @since 1.7
182 * @author Doug Lea
183 */
184 public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
185
186 /*
187 * See the internal documentation of class ForkJoinPool for a
188 * general implementation overview. ForkJoinTasks are mainly
189 * responsible for maintaining their "status" field amidst relays
246 while ((s = status) >= 0) {
247 if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) {
248 if (s != 0)
249 synchronized (this) { notifyAll(); }
250 break;
251 }
252 }
253 }
254
255 /**
256 * Records exception and sets exceptional completion.
257 *
258 * @return status on exit
259 */
260 private void setExceptionalCompletion(Throwable rex) {
261 exceptionMap.put(this, rex);
262 setCompletion(EXCEPTIONAL);
263 }
264
265 /**
266 * Blocks a worker thread until completed or timed out. Called
267 * only by pool.
268 */
269 final void internalAwaitDone(long millis, int nanos) {
270 int s = status;
271 if ((s == 0 &&
272 UNSAFE.compareAndSwapInt(this, statusOffset, 0, SIGNAL)) ||
273 s > 0) {
274 try { // the odd construction reduces lock bias effects
275 synchronized (this) {
276 if (status > 0)
277 wait(millis, nanos);
278 else
279 notifyAll();
280 }
281 } catch (InterruptedException ie) {
282 cancelIfTerminating();
283 }
284 }
285 }
286
287 /**
288 * Blocks a non-worker-thread until completion.
289 */
290 private void externalAwaitDone() {
291 if (status >= 0) {
292 boolean interrupted = false;
293 synchronized (this) {
294 for (;;) {
295 int s = status;
296 if (s == 0)
297 UNSAFE.compareAndSwapInt(this, statusOffset,
298 0, SIGNAL);
299 else if (s < 0) {
300 notifyAll();
301 break;
302 }
303 else {
304 try {
305 wait();
306 } catch (InterruptedException ie) {
307 interrupted = true;
308 }
309 }
310 }
311 }
312 if (interrupted)
313 Thread.currentThread().interrupt();
314 }
315 }
316
317 /**
318 * Blocks a non-worker-thread until completion or interruption or timeout.
319 */
320 private void externalInterruptibleAwaitDone(boolean timed, long nanos)
321 throws InterruptedException {
322 if (Thread.interrupted())
323 throw new InterruptedException();
324 if (status >= 0) {
325 long startTime = timed ? System.nanoTime() : 0L;
326 synchronized (this) {
327 for (;;) {
328 long nt;
329 int s = status;
330 if (s == 0)
331 UNSAFE.compareAndSwapInt(this, statusOffset,
332 0, SIGNAL);
333 else if (s < 0) {
334 notifyAll();
335 break;
336 }
337 else if (!timed)
338 wait();
339 else if ((nt = nanos - (System.nanoTime()-startTime)) > 0L)
340 wait(nt / 1000000, (int)(nt % 1000000));
341 else
342 break;
343 }
344 }
345 }
346 }
347
348 /**
349 * Unless done, calls exec and records status if completed, but
350 * doesn't wait for completion otherwise. Primary execution method
351 * for ForkJoinWorkerThread.
352 */
353 final void quietlyExec() {
354 try {
355 if (status < 0 || !exec())
356 return;
357 } catch (Throwable rex) {
358 setExceptionalCompletion(rex);
359 return;
360 }
361 setCompletion(NORMAL); // must be outside try block
362 }
363
364 // public methods
365
366 /**
367 * Arranges to asynchronously execute this task. While it is not
368 * necessarily enforced, it is a usage error to fork a task more
369 * than once unless it has completed and been reinitialized.
370 * Subsequent modifications to the state of this task or any data
371 * it operates on are not necessarily consistently observable by
372 * any thread other than the one executing it unless preceded by a
373 * call to {@link #join} or related methods, or a call to {@link
374 * #isDone} returning {@code true}.
375 *
376 * <p>This method may be invoked only from within {@code
377 * ForkJoinPool} computations (as may be determined using method
378 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
379 * result in exceptions or errors, possibly including {@code
380 * ClassCastException}.
381 *
382 * @return {@code this}, to simplify usage
383 */
384 public final ForkJoinTask<V> fork() {
385 ((ForkJoinWorkerThread) Thread.currentThread())
386 .pushTask(this);
387 return this;
388 }
389
390 /**
391 * Returns the result of the computation when it {@link #isDone is
392 * done}. This method differs from {@link #get()} in that
393 * abnormal completion results in {@code RuntimeException} or
394 * {@code Error}, not {@code ExecutionException}, and that
395 * interrupts of the calling thread do <em>not</em> cause the
396 * method to abruptly return by throwing {@code
397 * InterruptedException}.
398 *
399 * @return the computed result
400 */
401 public final V join() {
402 quietlyJoin();
403 Throwable ex;
404 if (status < NORMAL && (ex = getException()) != null)
405 UNSAFE.throwException(ex);
406 return getRawResult();
407 }
408
409 /**
410 * Commences performing this task, awaits its completion if
411 * necessary, and returns its result, or throws an (unchecked)
412 * {@code RuntimeException} or {@code Error} if the underlying
413 * computation did so.
414 *
415 * @return the computed result
416 */
417 public final V invoke() {
419 Throwable ex;
420 if (status < NORMAL && (ex = getException()) != null)
421 UNSAFE.throwException(ex);
422 return getRawResult();
423 }
424
425 /**
426 * Forks the given tasks, returning when {@code isDone} holds for
427 * each task or an (unchecked) exception is encountered, in which
428 * case the exception is rethrown. If more than one task
429 * encounters an exception, then this method throws any one of
430 * these exceptions. If any task encounters an exception, the
431 * other may be cancelled. However, the execution status of
432 * individual tasks is not guaranteed upon exceptional return. The
433 * status of each task may be obtained using {@link
434 * #getException()} and related methods to check if they have been
435 * cancelled, completed normally or exceptionally, or left
436 * unprocessed.
437 *
438 * <p>This method may be invoked only from within {@code
439 * ForkJoinPool} computations (as may be determined using method
440 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
441 * result in exceptions or errors, possibly including {@code
442 * ClassCastException}.
443 *
444 * @param t1 the first task
445 * @param t2 the second task
446 * @throws NullPointerException if any task is null
447 */
448 public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
449 t2.fork();
450 t1.invoke();
451 t2.join();
452 }
453
454 /**
455 * Forks the given tasks, returning when {@code isDone} holds for
456 * each task or an (unchecked) exception is encountered, in which
457 * case the exception is rethrown. If more than one task
458 * encounters an exception, then this method throws any one of
459 * these exceptions. If any task encounters an exception, others
460 * may be cancelled. However, the execution status of individual
461 * tasks is not guaranteed upon exceptional return. The status of
462 * each task may be obtained using {@link #getException()} and
463 * related methods to check if they have been cancelled, completed
464 * normally or exceptionally, or left unprocessed.
465 *
466 * <p>This method may be invoked only from within {@code
467 * ForkJoinPool} computations (as may be determined using method
468 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
469 * result in exceptions or errors, possibly including {@code
470 * ClassCastException}.
471 *
472 * @param tasks the tasks
473 * @throws NullPointerException if any task is null
474 */
475 public static void invokeAll(ForkJoinTask<?>... tasks) {
476 Throwable ex = null;
477 int last = tasks.length - 1;
478 for (int i = last; i >= 0; --i) {
479 ForkJoinTask<?> t = tasks[i];
480 if (t == null) {
481 if (ex == null)
482 ex = new NullPointerException();
483 }
484 else if (i != 0)
485 t.fork();
486 else {
487 t.quietlyInvoke();
502 }
503 }
504 if (ex != null)
505 UNSAFE.throwException(ex);
506 }
507
508 /**
509 * Forks all tasks in the specified collection, returning when
510 * {@code isDone} holds for each task or an (unchecked) exception
511 * is encountered, in which case the exception is rethrown. If
512 * more than one task encounters an exception, then this method
513 * throws any one of these exceptions. If any task encounters an
514 * exception, others may be cancelled. However, the execution
515 * status of individual tasks is not guaranteed upon exceptional
516 * return. The status of each task may be obtained using {@link
517 * #getException()} and related methods to check if they have been
518 * cancelled, completed normally or exceptionally, or left
519 * unprocessed.
520 *
521 * <p>This method may be invoked only from within {@code
522 * ForkJoinPool} computations (as may be determined using method
523 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
524 * result in exceptions or errors, possibly including {@code
525 * ClassCastException}.
526 *
527 * @param tasks the collection of tasks
528 * @return the tasks argument, to simplify usage
529 * @throws NullPointerException if tasks or any element are null
530 */
531 public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks) {
532 if (!(tasks instanceof RandomAccess) || !(tasks instanceof List<?>)) {
533 invokeAll(tasks.toArray(new ForkJoinTask<?>[tasks.size()]));
534 return tasks;
535 }
536 @SuppressWarnings("unchecked")
537 List<? extends ForkJoinTask<?>> ts =
538 (List<? extends ForkJoinTask<?>>) tasks;
539 Throwable ex = null;
540 int last = ts.size() - 1;
541 for (int i = last; i >= 0; --i) {
542 ForkJoinTask<?> t = ts.get(i);
554 }
555 for (int i = 1; i <= last; ++i) {
556 ForkJoinTask<?> t = ts.get(i);
557 if (t != null) {
558 if (ex != null)
559 t.cancel(false);
560 else {
561 t.quietlyJoin();
562 if (ex == null && t.status < NORMAL)
563 ex = t.getException();
564 }
565 }
566 }
567 if (ex != null)
568 UNSAFE.throwException(ex);
569 return tasks;
570 }
571
572 /**
573 * Attempts to cancel execution of this task. This attempt will
574 * fail if the task has already completed or could not be
575 * cancelled for some other reason. If successful, and this task
576 * has not started when {@code cancel} is called, execution of
577 * this task is suppressed. After this method returns
578 * successfully, unless there is an intervening call to {@link
579 * #reinitialize}, subsequent calls to {@link #isCancelled},
580 * {@link #isDone}, and {@code cancel} will return {@code true}
581 * and calls to {@link #join} and related methods will result in
582 * {@code CancellationException}.
583 *
584 * <p>This method may be overridden in subclasses, but if so, must
585 * still ensure that these properties hold. In particular, the
586 * {@code cancel} method itself must not throw exceptions.
587 *
588 * <p>This method is designed to be invoked by <em>other</em>
589 * tasks. To terminate the current task, you can just return or
590 * throw an unchecked exception from its computation method, or
591 * invoke {@link #completeExceptionally}.
592 *
593 * @param mayInterruptIfRunning this value has no effect in the
594 * default implementation because interrupts are not used to
595 * control cancellation.
596 *
597 * @return {@code true} if this task is now cancelled
598 */
599 public boolean cancel(boolean mayInterruptIfRunning) {
600 setCompletion(CANCELLED);
601 return status == CANCELLED;
602 }
603
604 /**
605 * Cancels, ignoring any exceptions thrown by cancel. Used during
606 * worker and pool shutdown. Cancel is spec'ed not to throw any
607 * exceptions, but if it does anyway, we have no recourse during
608 * shutdown, so guard against this case.
609 */
610 final void cancelIgnoringExceptions() {
611 try {
612 cancel(false);
613 } catch (Throwable ignore) {
614 }
615 }
709 setRawResult(value);
710 } catch (Throwable rex) {
711 setExceptionalCompletion(rex);
712 return;
713 }
714 setCompletion(NORMAL);
715 }
716
717 /**
718 * Waits if necessary for the computation to complete, and then
719 * retrieves its result.
720 *
721 * @return the computed result
722 * @throws CancellationException if the computation was cancelled
723 * @throws ExecutionException if the computation threw an
724 * exception
725 * @throws InterruptedException if the current thread is not a
726 * member of a ForkJoinPool and was interrupted while waiting
727 */
728 public final V get() throws InterruptedException, ExecutionException {
729 Thread t = Thread.currentThread();
730 if (t instanceof ForkJoinWorkerThread)
731 quietlyJoin();
732 else
733 externalInterruptibleAwaitDone(false, 0L);
734 int s = status;
735 if (s != NORMAL) {
736 Throwable ex;
737 if (s == CANCELLED)
738 throw new CancellationException();
739 if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
740 throw new ExecutionException(ex);
741 }
742 return getRawResult();
743 }
744
745 /**
746 * Waits if necessary for at most the given time for the computation
747 * to complete, and then retrieves its result, if available.
748 *
749 * @param timeout the maximum time to wait
750 * @param unit the time unit of the timeout argument
751 * @return the computed result
752 * @throws CancellationException if the computation was cancelled
753 * @throws ExecutionException if the computation threw an
754 * exception
755 * @throws InterruptedException if the current thread is not a
756 * member of a ForkJoinPool and was interrupted while waiting
757 * @throws TimeoutException if the wait timed out
758 */
759 public final V get(long timeout, TimeUnit unit)
760 throws InterruptedException, ExecutionException, TimeoutException {
761 long nanos = unit.toNanos(timeout);
762 Thread t = Thread.currentThread();
763 if (t instanceof ForkJoinWorkerThread)
764 ((ForkJoinWorkerThread)t).joinTask(this, true, nanos);
765 else
766 externalInterruptibleAwaitDone(true, nanos);
767 int s = status;
768 if (s != NORMAL) {
769 Throwable ex;
770 if (s == CANCELLED)
771 throw new CancellationException();
772 if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
773 throw new ExecutionException(ex);
774 throw new TimeoutException();
775 }
776 return getRawResult();
777 }
778
779 /**
780 * Joins this task, without returning its result or throwing its
781 * exception. This method may be useful when processing
782 * collections of tasks when some have been cancelled or otherwise
783 * known to have aborted.
784 */
785 public final void quietlyJoin() {
786 Thread t;
787 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
788 ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
789 if (status >= 0) {
790 if (w.unpushTask(this)) {
791 boolean completed;
792 try {
793 completed = exec();
794 } catch (Throwable rex) {
795 setExceptionalCompletion(rex);
796 return;
797 }
798 if (completed) {
799 setCompletion(NORMAL);
800 return;
801 }
802 }
803 w.joinTask(this, false, 0L);
804 }
805 }
806 else
807 externalAwaitDone();
808 }
809
810 /**
811 * Commences performing this task and awaits its completion if
812 * necessary, without returning its result or throwing its
813 * exception.
814 */
815 public final void quietlyInvoke() {
816 if (status >= 0) {
817 boolean completed;
818 try {
819 completed = exec();
820 } catch (Throwable rex) {
821 setExceptionalCompletion(rex);
822 return;
823 }
824 if (completed)
825 setCompletion(NORMAL);
826 else
827 quietlyJoin();
828 }
829 }
830
831 /**
832 * Possibly executes tasks until the pool hosting the current task
833 * {@link ForkJoinPool#isQuiescent is quiescent}. This method may
834 * be of use in designs in which many tasks are forked, but none
835 * are explicitly joined, instead executing them until all are
836 * processed.
837 *
838 * <p>This method may be invoked only from within {@code
839 * ForkJoinPool} computations (as may be determined using method
840 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
841 * result in exceptions or errors, possibly including {@code
842 * ClassCastException}.
843 */
844 public static void helpQuiesce() {
845 ((ForkJoinWorkerThread) Thread.currentThread())
846 .helpQuiescePool();
847 }
848
849 /**
850 * Resets the internal bookkeeping state of this task, allowing a
851 * subsequent {@code fork}. This method allows repeated reuse of
852 * this task, but only if reuse occurs when this task has either
853 * never been forked, or has been forked, then completed and all
854 * outstanding joins of this task have also completed. Effects
855 * under any other usage conditions are not guaranteed.
856 * This method may be useful when executing
857 * pre-constructed trees of subtasks in loops.
858 *
859 * <p>Upon completion of this method, {@code isDone()} reports
860 * {@code false}, and {@code getException()} reports {@code
861 * null}. However, the value returned by {@code getRawResult} is
862 * unaffected. To clear this value, you can invoke {@code
863 * setRawResult(null)}.
864 */
865 public void reinitialize() {
866 if (status == EXCEPTIONAL)
867 exceptionMap.remove(this);
868 status = 0;
869 }
870
871 /**
872 * Returns the pool hosting the current task execution, or null
873 * if this task is executing outside of any ForkJoinPool.
874 *
875 * @see #inForkJoinPool
876 * @return the pool, or {@code null} if none
877 */
878 public static ForkJoinPool getPool() {
879 Thread t = Thread.currentThread();
880 return (t instanceof ForkJoinWorkerThread) ?
881 ((ForkJoinWorkerThread) t).pool : null;
882 }
883
884 /**
885 * Returns {@code true} if the current thread is a {@link
886 * ForkJoinWorkerThread} executing as a ForkJoinPool computation.
887 *
888 * @return {@code true} if the current thread is a {@link
889 * ForkJoinWorkerThread} executing as a ForkJoinPool computation,
890 * or {@code false} otherwise
891 */
892 public static boolean inForkJoinPool() {
893 return Thread.currentThread() instanceof ForkJoinWorkerThread;
894 }
895
896 /**
897 * Tries to unschedule this task for execution. This method will
898 * typically succeed if this task is the most recently forked task
899 * by the current thread, and has not commenced executing in
900 * another thread. This method may be useful when arranging
901 * alternative local processing of tasks that could have been, but
902 * were not, stolen.
903 *
904 * <p>This method may be invoked only from within {@code
905 * ForkJoinPool} computations (as may be determined using method
906 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
907 * result in exceptions or errors, possibly including {@code
908 * ClassCastException}.
909 *
910 * @return {@code true} if unforked
911 */
912 public boolean tryUnfork() {
913 return ((ForkJoinWorkerThread) Thread.currentThread())
914 .unpushTask(this);
915 }
916
917 /**
918 * Returns an estimate of the number of tasks that have been
919 * forked by the current worker thread but not yet executed. This
920 * value may be useful for heuristic decisions about whether to
921 * fork other tasks.
922 *
923 * <p>This method may be invoked only from within {@code
924 * ForkJoinPool} computations (as may be determined using method
925 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
926 * result in exceptions or errors, possibly including {@code
927 * ClassCastException}.
928 *
929 * @return the number of tasks
930 */
931 public static int getQueuedTaskCount() {
932 return ((ForkJoinWorkerThread) Thread.currentThread())
933 .getQueueSize();
934 }
935
936 /**
937 * Returns an estimate of how many more locally queued tasks are
938 * held by the current worker thread than there are other worker
939 * threads that might steal them. This value may be useful for
940 * heuristic decisions about whether to fork other tasks. In many
941 * usages of ForkJoinTasks, at steady state, each worker should
942 * aim to maintain a small constant surplus (for example, 3) of
943 * tasks, and to process computations locally if this threshold is
944 * exceeded.
945 *
946 * <p>This method may be invoked only from within {@code
947 * ForkJoinPool} computations (as may be determined using method
948 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
949 * result in exceptions or errors, possibly including {@code
950 * ClassCastException}.
951 *
952 * @return the surplus number of tasks, which may be negative
953 */
954 public static int getSurplusQueuedTaskCount() {
955 return ((ForkJoinWorkerThread) Thread.currentThread())
956 .getEstimatedSurplusTaskCount();
957 }
958
959 // Extension methods
960
961 /**
962 * Returns the result that would be returned by {@link #join}, even
963 * if this task completed abnormally, or {@code null} if this task
964 * is not known to have been completed. This method is designed
965 * to aid debugging, as well as to support extensions. Its use in
966 * any other context is discouraged.
967 *
985 * is considered to be done normally. It may return false in
986 * asynchronous actions that require explicit invocations of
987 * {@link #complete} to become joinable. It may also throw an
988 * (unchecked) exception to indicate abnormal exit.
989 *
990 * @return {@code true} if completed normally
991 */
992 protected abstract boolean exec();
993
994 /**
995 * Returns, but does not unschedule or execute, a task queued by
996 * the current thread but not yet executed, if one is immediately
997 * available. There is no guarantee that this task will actually
998 * be polled or executed next. Conversely, this method may return
999 * null even if a task exists but cannot be accessed without
1000 * contention with other threads. This method is designed
1001 * primarily to support extensions, and is unlikely to be useful
1002 * otherwise.
1003 *
1004 * <p>This method may be invoked only from within {@code
1005 * ForkJoinPool} computations (as may be determined using method
1006 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
1007 * result in exceptions or errors, possibly including {@code
1008 * ClassCastException}.
1009 *
1010 * @return the next task, or {@code null} if none are available
1011 */
1012 protected static ForkJoinTask<?> peekNextLocalTask() {
1013 return ((ForkJoinWorkerThread) Thread.currentThread())
1014 .peekTask();
1015 }
1016
1017 /**
1018 * Unschedules and returns, without executing, the next task
1019 * queued by the current thread but not yet executed. This method
1020 * is designed primarily to support extensions, and is unlikely to
1021 * be useful otherwise.
1022 *
1023 * <p>This method may be invoked only from within {@code
1024 * ForkJoinPool} computations (as may be determined using method
1025 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
1026 * result in exceptions or errors, possibly including {@code
1027 * ClassCastException}.
1028 *
1029 * @return the next task, or {@code null} if none are available
1030 */
1031 protected static ForkJoinTask<?> pollNextLocalTask() {
1032 return ((ForkJoinWorkerThread) Thread.currentThread())
1033 .pollLocalTask();
1034 }
1035
1036 /**
1037 * Unschedules and returns, without executing, the next task
1038 * queued by the current thread but not yet executed, if one is
1039 * available, or if not available, a task that was forked by some
1040 * other thread, if available. Availability may be transient, so a
1041 * {@code null} result does not necessarily imply quiescence
1042 * of the pool this task is operating in. This method is designed
1043 * primarily to support extensions, and is unlikely to be useful
1044 * otherwise.
1045 *
1046 * <p>This method may be invoked only from within {@code
1047 * ForkJoinPool} computations (as may be determined using method
1048 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
1049 * result in exceptions or errors, possibly including {@code
1050 * ClassCastException}.
1051 *
1052 * @return a task, or {@code null} if none are available
1053 */
1054 protected static ForkJoinTask<?> pollTask() {
1055 return ((ForkJoinWorkerThread) Thread.currentThread())
1056 .pollTask();
1057 }
1058
1059 /**
1060 * Adaptor for Runnables. This implements RunnableFuture
1061 * to be compliant with AbstractExecutorService constraints
1062 * when used in ForkJoinPool.
1063 */
1064 static final class AdaptedRunnable<T> extends ForkJoinTask<T>
1065 implements RunnableFuture<T> {
1066 final Runnable runnable;
1067 final T resultOnCompletion;
|