202 *
203 * @since 1.7
204 * @author Doug Lea
205 */
206 public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
207
208 /*
209 * See the internal documentation of class ForkJoinPool for a
210 * general implementation overview. ForkJoinTasks are mainly
211 * responsible for maintaining their "status" field amidst relays
212 * to methods in ForkJoinWorkerThread and ForkJoinPool.
213 *
214 * The methods of this class are more-or-less layered into
215 * (1) basic status maintenance
216 * (2) execution and awaiting completion
217 * (3) user-level methods that additionally report results.
218 * This is sometimes hard to see because this file orders exported
219 * methods in a way that flows well in javadocs.
220 */
221
222 /*
223 * The status field holds run control status bits packed into a
224 * single int to minimize footprint and to ensure atomicity (via
225 * CAS). Status is initially zero, and takes on nonnegative
226 * values until completed, upon which status (anded with
227 * DONE_MASK) holds value NORMAL, CANCELLED, or EXCEPTIONAL. Tasks
228 * undergoing blocking waits by other threads have the SIGNAL bit
229 * set. Completion of a stolen task with SIGNAL set awakens any
230 * waiters via notifyAll. Even though suboptimal for some
231 * purposes, we use basic builtin wait/notify to take advantage of
232 * "monitor inflation" in JVMs that we would otherwise need to
233 * emulate to avoid adding further per-task bookkeeping overhead.
234 * We want these monitors to be "fat", i.e., not use biasing or
235 * thin-lock techniques, so use some odd coding idioms that tend
236 * to avoid them, mainly by arranging that every synchronized
237 * block performs a wait, notifyAll or both.
238 *
239 * These control bits occupy only (some of) the upper half (16
240 * bits) of status field. The lower bits are used for user-defined
241 * tags.
242 */
243
244 /** The run status of this task */
245 volatile int status; // accessed directly by pool and workers
246 static final int DONE_MASK = 0xf0000000; // mask out non-completion bits
247 static final int NORMAL = 0xf0000000; // must be negative
248 static final int CANCELLED = 0xc0000000; // must be < NORMAL
249 static final int EXCEPTIONAL = 0x80000000; // must be < CANCELLED
250 static final int SIGNAL = 0x00010000; // must be >= 1 << 16
251 static final int SMASK = 0x0000ffff; // short bits for tags
252
253 /**
254 * Marks completion and wakes up threads waiting to join this
255 * task.
256 *
257 * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
258 * @return completion status on exit
259 */
260 private int setCompletion(int completion) {
261 for (int s;;) {
262 if ((s = status) < 0)
263 return s;
264 if (STATUS.compareAndSet(this, s, s | completion)) {
265 if ((s >>> 16) != 0)
266 synchronized (this) { notifyAll(); }
267 return completion;
268 }
269 }
270 }
271
272 /**
273 * Primary execution method for stolen tasks. Unless done, calls
274 * exec and records status if completed, but doesn't wait for
275 * completion otherwise.
276 *
277 * @return status on exit from this method
278 */
279 final int doExec() {
280 int s; boolean completed;
281 if ((s = status) >= 0) {
282 try {
283 completed = exec();
284 } catch (Throwable rex) {
285 return setExceptionalCompletion(rex);
286 }
287 if (completed)
288 s = setCompletion(NORMAL);
289 }
290 return s;
291 }
292
293 /**
294 * If not done, sets SIGNAL status and performs Object.wait(timeout).
295 * This task may or may not be done on exit. Ignores interrupts.
296 *
297 * @param timeout using Object.wait conventions.
298 */
299 final void internalWait(long timeout) {
300 int s;
301 if ((s = status) >= 0 && // force completer to issue notify
302 STATUS.compareAndSet(this, s, s | SIGNAL)) {
303 synchronized (this) {
304 if (status >= 0)
305 try { wait(timeout); } catch (InterruptedException ie) { }
306 else
307 notifyAll();
308 }
309 }
310 }
311
312 /**
313 * Blocks a non-worker-thread until completion.
314 * @return status upon completion
315 */
316 private int externalAwaitDone() {
317 int s = ((this instanceof CountedCompleter) ? // try helping
318 ForkJoinPool.common.externalHelpComplete(
319 (CountedCompleter<?>)this, 0) :
320 ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
321 if (s >= 0 && (s = status) >= 0) {
322 boolean interrupted = false;
323 do {
324 if (STATUS.compareAndSet(this, s, s | SIGNAL)) {
325 synchronized (this) {
326 if (status >= 0) {
327 try {
328 wait(0L);
329 } catch (InterruptedException ie) {
330 interrupted = true;
331 }
332 }
333 else
334 notifyAll();
335 }
336 }
337 } while ((s = status) >= 0);
338 if (interrupted)
339 Thread.currentThread().interrupt();
340 }
341 return s;
342 }
343
344 /**
345 * Blocks a non-worker-thread until completion or interruption.
346 */
347 private int externalInterruptibleAwaitDone() throws InterruptedException {
348 int s;
349 if (Thread.interrupted())
350 throw new InterruptedException();
351 if ((s = status) >= 0 &&
352 (s = ((this instanceof CountedCompleter) ?
353 ForkJoinPool.common.externalHelpComplete(
354 (CountedCompleter<?>)this, 0) :
355 ForkJoinPool.common.tryExternalUnpush(this) ? doExec() :
356 0)) >= 0) {
357 while ((s = status) >= 0) {
358 if (STATUS.compareAndSet(this, s, s | SIGNAL)) {
359 synchronized (this) {
360 if (status >= 0)
361 wait(0L);
362 else
363 notifyAll();
364 }
365 }
366 }
367 }
368 return s;
369 }
370
371 /**
372 * Implementation for join, get, quietlyJoin. Directly handles
373 * only cases of already-completed, external wait, and
374 * unfork+exec. Others are relayed to ForkJoinPool.awaitJoin.
375 *
376 * @return status upon completion
377 */
378 private int doJoin() {
379 int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
380 return (s = status) < 0 ? s :
381 ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
382 (w = (wt = (ForkJoinWorkerThread)t).workQueue).
383 tryUnpush(this) && (s = doExec()) < 0 ? s :
384 wt.pool.awaitJoin(w, this, 0L) :
385 externalAwaitDone();
386 }
387
388 /**
389 * Implementation for invoke, quietlyInvoke.
390 *
391 * @return status upon completion
458 if ((s = status) >= 0) {
459 int h = System.identityHashCode(this);
460 final ReentrantLock lock = exceptionTableLock;
461 lock.lock();
462 try {
463 expungeStaleExceptions();
464 ExceptionNode[] t = exceptionTable;
465 int i = h & (t.length - 1);
466 for (ExceptionNode e = t[i]; ; e = e.next) {
467 if (e == null) {
468 t[i] = new ExceptionNode(this, ex, t[i],
469 exceptionTableRefQueue);
470 break;
471 }
472 if (e.get() == this) // already present
473 break;
474 }
475 } finally {
476 lock.unlock();
477 }
478 s = setCompletion(EXCEPTIONAL);
479 }
480 return s;
481 }
482
483 /**
484 * Records exception and possibly propagates.
485 *
486 * @return status on exit
487 */
488 private int setExceptionalCompletion(Throwable ex) {
489 int s = recordExceptionalCompletion(ex);
490 if ((s & DONE_MASK) == EXCEPTIONAL)
491 internalPropagateException(ex);
492 return s;
493 }
494
495 /**
496 * Hook for exception propagation support for tasks with completers.
497 */
498 void internalPropagateException(Throwable ex) {
499 }
500
501 /**
502 * Cancels, ignoring any exceptions thrown by cancel. Used during
503 * worker and pool shutdown. Cancel is spec'ed not to throw any
504 * exceptions, but if it does anyway, we have no recourse during
505 * shutdown, so guard against this case.
506 */
507 static final void cancelIgnoringExceptions(ForkJoinTask<?> t) {
508 if (t != null && t.status >= 0) {
509 try {
510 t.cancel(false);
645 ForkJoinTask.<RuntimeException>uncheckedThrow(ex);
646 }
647
648 /**
649 * The sneaky part of sneaky throw, relying on generics
650 * limitations to evade compiler complaints about rethrowing
651 * unchecked exceptions.
652 */
653 @SuppressWarnings("unchecked") static <T extends Throwable>
654 void uncheckedThrow(Throwable t) throws T {
655 if (t != null)
656 throw (T)t; // rely on vacuous cast
657 else
658 throw new Error("Unknown Exception");
659 }
660
661 /**
662 * Throws exception, if any, associated with the given status.
663 */
664 private void reportException(int s) {
665 if (s == CANCELLED)
666 throw new CancellationException();
667 if (s == EXCEPTIONAL)
668 rethrow(getThrowableException());
669 }
670
671 // public methods
672
673 /**
674 * Arranges to asynchronously execute this task in the pool the
675 * current task is running in, if applicable, or using the {@link
676 * ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}. While
677 * it is not necessarily enforced, it is a usage error to fork a
678 * task more than once unless it has completed and been
679 * reinitialized. Subsequent modifications to the state of this
680 * task or any data it operates on are not necessarily
681 * consistently observable by any thread other than the one
682 * executing it unless preceded by a call to {@link #join} or
683 * related methods, or a call to {@link #isDone} returning {@code
684 * true}.
685 *
686 * @return {@code this}, to simplify usage
687 */
688 public final ForkJoinTask<V> fork() {
690 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
691 ((ForkJoinWorkerThread)t).workQueue.push(this);
692 else
693 ForkJoinPool.common.externalPush(this);
694 return this;
695 }
696
697 /**
698 * Returns the result of the computation when it
699 * {@linkplain #isDone is done}.
700 * This method differs from {@link #get()} in that abnormal
701 * completion results in {@code RuntimeException} or {@code Error},
702 * not {@code ExecutionException}, and that interrupts of the
703 * calling thread do <em>not</em> cause the method to abruptly
704 * return by throwing {@code InterruptedException}.
705 *
706 * @return the computed result
707 */
708 public final V join() {
709 int s;
710 if ((s = doJoin() & DONE_MASK) != NORMAL)
711 reportException(s);
712 return getRawResult();
713 }
714
715 /**
716 * Commences performing this task, awaits its completion if
717 * necessary, and returns its result, or throws an (unchecked)
718 * {@code RuntimeException} or {@code Error} if the underlying
719 * computation did so.
720 *
721 * @return the computed result
722 */
723 public final V invoke() {
724 int s;
725 if ((s = doInvoke() & DONE_MASK) != NORMAL)
726 reportException(s);
727 return getRawResult();
728 }
729
730 /**
731 * Forks the given tasks, returning when {@code isDone} holds for
732 * each task or an (unchecked) exception is encountered, in which
733 * case the exception is rethrown. If more than one task
734 * encounters an exception, then this method throws any one of
735 * these exceptions. If any task encounters an exception, the
736 * other may be cancelled. However, the execution status of
737 * individual tasks is not guaranteed upon exceptional return. The
738 * status of each task may be obtained using {@link
739 * #getException()} and related methods to check if they have been
740 * cancelled, completed normally or exceptionally, or left
741 * unprocessed.
742 *
743 * @param t1 the first task
744 * @param t2 the second task
745 * @throws NullPointerException if any task is null
746 */
747 public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
748 int s1, s2;
749 t2.fork();
750 if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL)
751 t1.reportException(s1);
752 if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL)
753 t2.reportException(s2);
754 }
755
756 /**
757 * Forks the given tasks, returning when {@code isDone} holds for
758 * each task or an (unchecked) exception is encountered, in which
759 * case the exception is rethrown. If more than one task
760 * encounters an exception, then this method throws any one of
761 * these exceptions. If any task encounters an exception, others
762 * may be cancelled. However, the execution status of individual
763 * tasks is not guaranteed upon exceptional return. The status of
764 * each task may be obtained using {@link #getException()} and
765 * related methods to check if they have been cancelled, completed
766 * normally or exceptionally, or left unprocessed.
767 *
768 * @param tasks the tasks
769 * @throws NullPointerException if any task is null
770 */
771 public static void invokeAll(ForkJoinTask<?>... tasks) {
772 Throwable ex = null;
773 int last = tasks.length - 1;
774 for (int i = last; i >= 0; --i) {
775 ForkJoinTask<?> t = tasks[i];
776 if (t == null) {
777 if (ex == null)
778 ex = new NullPointerException();
779 }
780 else if (i != 0)
781 t.fork();
782 else if (t.doInvoke() < NORMAL && ex == null)
783 ex = t.getException();
784 }
785 for (int i = 1; i <= last; ++i) {
786 ForkJoinTask<?> t = tasks[i];
787 if (t != null) {
788 if (ex != null)
789 t.cancel(false);
790 else if (t.doJoin() < NORMAL)
791 ex = t.getException();
792 }
793 }
794 if (ex != null)
795 rethrow(ex);
796 }
797
798 /**
799 * Forks all tasks in the specified collection, returning when
800 * {@code isDone} holds for each task or an (unchecked) exception
801 * is encountered, in which case the exception is rethrown. If
802 * more than one task encounters an exception, then this method
803 * throws any one of these exceptions. If any task encounters an
804 * exception, others may be cancelled. However, the execution
805 * status of individual tasks is not guaranteed upon exceptional
806 * return. The status of each task may be obtained using {@link
807 * #getException()} and related methods to check if they have been
808 * cancelled, completed normally or exceptionally, or left
809 * unprocessed.
810 *
814 * @throws NullPointerException if tasks or any element are null
815 */
816 public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks) {
817 if (!(tasks instanceof RandomAccess) || !(tasks instanceof List<?>)) {
818 invokeAll(tasks.toArray(new ForkJoinTask<?>[tasks.size()]));
819 return tasks;
820 }
821 @SuppressWarnings("unchecked")
822 List<? extends ForkJoinTask<?>> ts =
823 (List<? extends ForkJoinTask<?>>) tasks;
824 Throwable ex = null;
825 int last = ts.size() - 1;
826 for (int i = last; i >= 0; --i) {
827 ForkJoinTask<?> t = ts.get(i);
828 if (t == null) {
829 if (ex == null)
830 ex = new NullPointerException();
831 }
832 else if (i != 0)
833 t.fork();
834 else if (t.doInvoke() < NORMAL && ex == null)
835 ex = t.getException();
836 }
837 for (int i = 1; i <= last; ++i) {
838 ForkJoinTask<?> t = ts.get(i);
839 if (t != null) {
840 if (ex != null)
841 t.cancel(false);
842 else if (t.doJoin() < NORMAL)
843 ex = t.getException();
844 }
845 }
846 if (ex != null)
847 rethrow(ex);
848 return tasks;
849 }
850
851 /**
852 * Attempts to cancel execution of this task. This attempt will
853 * fail if the task has already completed or could not be
854 * cancelled for some other reason. If successful, and this task
855 * has not started when {@code cancel} is called, execution of
856 * this task is suppressed. After this method returns
857 * successfully, unless there is an intervening call to {@link
858 * #reinitialize}, subsequent calls to {@link #isCancelled},
859 * {@link #isDone}, and {@code cancel} will return {@code true}
860 * and calls to {@link #join} and related methods will result in
861 * {@code CancellationException}.
862 *
863 * <p>This method may be overridden in subclasses, but if so, must
864 * still ensure that these properties hold. In particular, the
865 * {@code cancel} method itself must not throw exceptions.
866 *
867 * <p>This method is designed to be invoked by <em>other</em>
868 * tasks. To terminate the current task, you can just return or
869 * throw an unchecked exception from its computation method, or
870 * invoke {@link #completeExceptionally(Throwable)}.
871 *
872 * @param mayInterruptIfRunning this value has no effect in the
873 * default implementation because interrupts are not used to
874 * control cancellation.
875 *
876 * @return {@code true} if this task is now cancelled
877 */
878 public boolean cancel(boolean mayInterruptIfRunning) {
879 return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED;
880 }
881
882 public final boolean isDone() {
883 return status < 0;
884 }
885
886 public final boolean isCancelled() {
887 return (status & DONE_MASK) == CANCELLED;
888 }
889
890 /**
891 * Returns {@code true} if this task threw an exception or was cancelled.
892 *
893 * @return {@code true} if this task threw an exception or was cancelled
894 */
895 public final boolean isCompletedAbnormally() {
896 return status < NORMAL;
897 }
898
899 /**
900 * Returns {@code true} if this task completed without throwing an
901 * exception and was not cancelled.
902 *
903 * @return {@code true} if this task completed without throwing an
904 * exception and was not cancelled
905 */
906 public final boolean isCompletedNormally() {
907 return (status & DONE_MASK) == NORMAL;
908 }
909
910 /**
911 * Returns the exception thrown by the base computation, or a
912 * {@code CancellationException} if cancelled, or {@code null} if
913 * none or if the method has not yet completed.
914 *
915 * @return the exception, or {@code null} if none
916 */
917 public final Throwable getException() {
918 int s = status & DONE_MASK;
919 return ((s >= NORMAL) ? null :
920 (s == CANCELLED) ? new CancellationException() :
921 getThrowableException());
922 }
923
924 /**
925 * Completes this task abnormally, and if not already aborted or
926 * cancelled, causes it to throw the given exception upon
927 * {@code join} and related operations. This method may be used
928 * to induce exceptions in asynchronous tasks, or to force
929 * completion of tasks that would not otherwise complete. Its use
930 * in other situations is discouraged. This method is
931 * overridable, but overridden versions must invoke {@code super}
932 * implementation to maintain guarantees.
933 *
934 * @param ex the exception to throw. If this exception is not a
935 * {@code RuntimeException} or {@code Error}, the actual exception
936 * thrown will be a {@code RuntimeException} with cause {@code ex}.
937 */
938 public void completeExceptionally(Throwable ex) {
939 setExceptionalCompletion((ex instanceof RuntimeException) ||
940 (ex instanceof Error) ? ex :
944 /**
945 * Completes this task, and if not already aborted or cancelled,
946 * returning the given value as the result of subsequent
947 * invocations of {@code join} and related operations. This method
948 * may be used to provide results for asynchronous tasks, or to
949 * provide alternative handling for tasks that would not otherwise
950 * complete normally. Its use in other situations is
951 * discouraged. This method is overridable, but overridden
952 * versions must invoke {@code super} implementation to maintain
953 * guarantees.
954 *
955 * @param value the result value for this task
956 */
957 public void complete(V value) {
958 try {
959 setRawResult(value);
960 } catch (Throwable rex) {
961 setExceptionalCompletion(rex);
962 return;
963 }
964 setCompletion(NORMAL);
965 }
966
967 /**
968 * Completes this task normally without setting a value. The most
969 * recent value established by {@link #setRawResult} (or {@code
970 * null} by default) will be returned as the result of subsequent
971 * invocations of {@code join} and related operations.
972 *
973 * @since 1.8
974 */
975 public final void quietlyComplete() {
976 setCompletion(NORMAL);
977 }
978
979 /**
980 * Waits if necessary for the computation to complete, and then
981 * retrieves its result.
982 *
983 * @return the computed result
984 * @throws CancellationException if the computation was cancelled
985 * @throws ExecutionException if the computation threw an
986 * exception
987 * @throws InterruptedException if the current thread is not a
988 * member of a ForkJoinPool and was interrupted while waiting
989 */
990 public final V get() throws InterruptedException, ExecutionException {
991 int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
992 doJoin() : externalInterruptibleAwaitDone();
993 if ((s &= DONE_MASK) == CANCELLED)
994 throw new CancellationException();
995 if (s == EXCEPTIONAL)
996 throw new ExecutionException(getThrowableException());
997 return getRawResult();
998 }
999
1000 /**
1001 * Waits if necessary for at most the given time for the computation
1002 * to complete, and then retrieves its result, if available.
1003 *
1004 * @param timeout the maximum time to wait
1005 * @param unit the time unit of the timeout argument
1006 * @return the computed result
1007 * @throws CancellationException if the computation was cancelled
1008 * @throws ExecutionException if the computation threw an
1009 * exception
1010 * @throws InterruptedException if the current thread is not a
1011 * member of a ForkJoinPool and was interrupted while waiting
1012 * @throws TimeoutException if the wait timed out
1013 */
1014 public final V get(long timeout, TimeUnit unit)
1015 throws InterruptedException, ExecutionException, TimeoutException {
1016 int s;
1017 long nanos = unit.toNanos(timeout);
1018 if (Thread.interrupted())
1019 throw new InterruptedException();
1020 if ((s = status) >= 0 && nanos > 0L) {
1021 long d = System.nanoTime() + nanos;
1022 long deadline = (d == 0L) ? 1L : d; // avoid 0
1023 Thread t = Thread.currentThread();
1024 if (t instanceof ForkJoinWorkerThread) {
1025 ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
1026 s = wt.pool.awaitJoin(wt.workQueue, this, deadline);
1027 }
1028 else if ((s = ((this instanceof CountedCompleter) ?
1029 ForkJoinPool.common.externalHelpComplete(
1030 (CountedCompleter<?>)this, 0) :
1031 ForkJoinPool.common.tryExternalUnpush(this) ?
1032 doExec() : 0)) >= 0) {
1033 long ns, ms; // measure in nanosecs, but wait in millisecs
1034 while ((s = status) >= 0 &&
1035 (ns = deadline - System.nanoTime()) > 0L) {
1036 if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L &&
1037 STATUS.compareAndSet(this, s, s | SIGNAL)) {
1038 synchronized (this) {
1039 if (status >= 0)
1040 wait(ms); // OK to throw InterruptedException
1041 else
1042 notifyAll();
1043 }
1044 }
1045 }
1046 }
1047 }
1048 if (s >= 0)
1049 s = status;
1050 if ((s &= DONE_MASK) != NORMAL) {
1051 if (s == CANCELLED)
1052 throw new CancellationException();
1053 if (s != EXCEPTIONAL)
1054 throw new TimeoutException();
1055 throw new ExecutionException(getThrowableException());
1056 }
1057 return getRawResult();
1058 }
1059
1060 /**
1061 * Joins this task, without returning its result or throwing its
1062 * exception. This method may be useful when processing
1063 * collections of tasks when some have been cancelled or otherwise
1064 * known to have aborted.
1065 */
1066 public final void quietlyJoin() {
1067 doJoin();
1068 }
1069
1070 /**
1071 * Commences performing this task and awaits its completion if
1072 * necessary, without returning its result or throwing its
1073 * exception.
1074 */
1075 public final void quietlyInvoke() {
1076 doInvoke();
1093 ForkJoinPool.quiesceCommonPool();
1094 }
1095
1096 /**
1097 * Resets the internal bookkeeping state of this task, allowing a
1098 * subsequent {@code fork}. This method allows repeated reuse of
1099 * this task, but only if reuse occurs when this task has either
1100 * never been forked, or has been forked, then completed and all
1101 * outstanding joins of this task have also completed. Effects
1102 * under any other usage conditions are not guaranteed.
1103 * This method may be useful when executing
1104 * pre-constructed trees of subtasks in loops.
1105 *
1106 * <p>Upon completion of this method, {@code isDone()} reports
1107 * {@code false}, and {@code getException()} reports {@code
1108 * null}. However, the value returned by {@code getRawResult} is
1109 * unaffected. To clear this value, you can invoke {@code
1110 * setRawResult(null)}.
1111 */
1112 public void reinitialize() {
1113 if ((status & DONE_MASK) == EXCEPTIONAL)
1114 clearExceptionalCompletion();
1115 else
1116 status = 0;
1117 }
1118
1119 /**
1120 * Returns the pool hosting the current thread, or {@code null}
1121 * if the current thread is executing outside of any ForkJoinPool.
1122 *
1123 * <p>This method returns {@code null} if and only if {@link
1124 * #inForkJoinPool} returns {@code false}.
1125 *
1126 * @return the pool, or {@code null} if none
1127 */
1128 public static ForkJoinPool getPool() {
1129 Thread t = Thread.currentThread();
1130 return (t instanceof ForkJoinWorkerThread) ?
1131 ((ForkJoinWorkerThread) t).pool : null;
1132 }
1133
1310
1311 /**
1312 * Returns the tag for this task.
1313 *
1314 * @return the tag for this task
1315 * @since 1.8
1316 */
1317 public final short getForkJoinTaskTag() {
1318 return (short)status;
1319 }
1320
1321 /**
1322 * Atomically sets the tag value for this task and returns the old value.
1323 *
1324 * @param newValue the new tag value
1325 * @return the previous value of the tag
1326 * @since 1.8
1327 */
1328 public final short setForkJoinTaskTag(short newValue) {
1329 for (int s;;) {
1330 if (STATUS.compareAndSet(this, s = status,
1331 (s & ~SMASK) | (newValue & SMASK)))
1332 return (short)s;
1333 }
1334 }
1335
1336 /**
1337 * Atomically conditionally sets the tag value for this task.
1338 * Among other applications, tags can be used as visit markers
1339 * in tasks operating on graphs, as in methods that check: {@code
1340 * if (task.compareAndSetForkJoinTaskTag((short)0, (short)1))}
1341 * before processing, otherwise exiting because the node has
1342 * already been visited.
1343 *
1344 * @param expect the expected tag value
1345 * @param update the new tag value
1346 * @return {@code true} if successful; i.e., the current value was
1347 * equal to {@code expect} and was changed to {@code update}.
1348 * @since 1.8
1349 */
1350 public final boolean compareAndSetForkJoinTaskTag(short expect, short update) {
1351 for (int s;;) {
1352 if ((short)(s = status) != expect)
1353 return false;
1354 if (STATUS.compareAndSet(this, s,
1355 (s & ~SMASK) | (update & SMASK)))
1356 return true;
1357 }
1358 }
1359
1360 /**
1361 * Adapter for Runnables. This implements RunnableFuture
1362 * to be compliant with AbstractExecutorService constraints
1363 * when used in ForkJoinPool.
1364 */
1365 static final class AdaptedRunnable<T> extends ForkJoinTask<T>
1366 implements RunnableFuture<T> {
1367 final Runnable runnable;
1368 T result;
1369 AdaptedRunnable(Runnable runnable, T result) {
1370 if (runnable == null) throw new NullPointerException();
1371 this.runnable = runnable;
1372 this.result = result; // OK to set this even before completion
1373 }
1374 public final T getRawResult() { return result; }
|
202 *
203 * @since 1.7
204 * @author Doug Lea
205 */
206 public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
207
208 /*
209 * See the internal documentation of class ForkJoinPool for a
210 * general implementation overview. ForkJoinTasks are mainly
211 * responsible for maintaining their "status" field amidst relays
212 * to methods in ForkJoinWorkerThread and ForkJoinPool.
213 *
214 * The methods of this class are more-or-less layered into
215 * (1) basic status maintenance
216 * (2) execution and awaiting completion
217 * (3) user-level methods that additionally report results.
218 * This is sometimes hard to see because this file orders exported
219 * methods in a way that flows well in javadocs.
220 */
221
222 /**
223 * The status field holds run control status bits packed into a
224 * single int to ensure atomicity. Status is initially zero, and
225 * takes on nonnegative values until completed, upon which it
226 * holds (sign bit) DONE, possibly with ABNORMAL (cancelled or
227 * exceptional) and THROWN (in which case an exception has been
228 * stored). Tasks with dependent blocked waiting joiners have the
229 * SIGNAL bit set. Completion of a task with SIGNAL set awakens
230 * any waiters via notifyAll. (Waiters also help signal others
231 * upon completion.)
232 *
233 * These control bits occupy only (some of) the upper half (16
234 * bits) of status field. The lower bits are used for user-defined
235 * tags.
236 */
237 volatile int status; // accessed directly by pool and workers
238
239 private static final int DONE = 1 << 31; // must be negative
240 private static final int ABNORMAL = 1 << 18; // set atomically with DONE
241 private static final int THROWN = 1 << 17; // set atomically with ABNORMAL
242 private static final int SIGNAL = 1 << 16; // true if joiner waiting
243 private static final int SMASK = 0xffff; // short bits for tags
244
245 static boolean isExceptionalStatus(int s) { // needed by subclasses
246 return (s & THROWN) != 0;
247 }
248
249 /**
250 * Sets DONE status and wakes up threads waiting to join this task.
251 *
252 * @return status on exit
253 */
254 private int setDone() {
255 int s;
256 if (((s = (int)STATUS.getAndBitwiseOr(this, DONE)) & SIGNAL) != 0)
257 synchronized (this) { notifyAll(); }
258 return s | DONE;
259 }
260
261 /**
262 * Marks cancelled or exceptional completion unless already done.
263 *
264 * @param completion must be DONE | ABNORMAL, ORed with THROWN if exceptional
265 * @return status on exit
266 */
267 private int abnormalCompletion(int completion) {
268 for (int s, ns;;) {
269 if ((s = status) < 0)
270 return s;
271 else if (STATUS.weakCompareAndSet(this, s, ns = s | completion)) {
272 if ((s & SIGNAL) != 0)
273 synchronized (this) { notifyAll(); }
274 return ns;
275 }
276 }
277 }
278
279 /**
280 * Primary execution method for stolen tasks. Unless done, calls
281 * exec and records status if completed, but doesn't wait for
282 * completion otherwise.
283 *
284 * @return status on exit from this method
285 */
286 final int doExec() {
287 int s; boolean completed;
288 if ((s = status) >= 0) {
289 try {
290 completed = exec();
291 } catch (Throwable rex) {
292 completed = false;
293 s = setExceptionalCompletion(rex);
294 }
295 if (completed)
296 s = setDone();
297 }
298 return s;
299 }
300
301 /**
302 * If not done, sets SIGNAL status and performs Object.wait(timeout).
303 * This task may or may not be done on exit. Ignores interrupts.
304 *
305 * @param timeout using Object.wait conventions.
306 */
307 final void internalWait(long timeout) {
308 if ((int)STATUS.getAndBitwiseOr(this, SIGNAL) >= 0) {
309 synchronized (this) {
310 if (status >= 0)
311 try { wait(timeout); } catch (InterruptedException ie) { }
312 else
313 notifyAll();
314 }
315 }
316 }
317
318 /**
319 * Blocks a non-worker-thread until completion.
320 * @return status upon completion
321 */
322 private int externalAwaitDone() {
323 int s = tryExternalHelp();
324 if (s >= 0 && (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) {
325 boolean interrupted = false;
326 synchronized (this) {
327 for (;;) {
328 if ((s = status) >= 0) {
329 try {
330 wait(0L);
331 } catch (InterruptedException ie) {
332 interrupted = true;
333 }
334 }
335 else {
336 notifyAll();
337 break;
338 }
339 }
340 }
341 if (interrupted)
342 Thread.currentThread().interrupt();
343 }
344 return s;
345 }
346
347 /**
348 * Blocks a non-worker-thread until completion or interruption.
349 */
350 private int externalInterruptibleAwaitDone() throws InterruptedException {
351 int s = tryExternalHelp();
352 if (s >= 0 && (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) {
353 synchronized (this) {
354 for (;;) {
355 if ((s = status) >= 0)
356 wait(0L);
357 else {
358 notifyAll();
359 break;
360 }
361 }
362 }
363 }
364 else if (Thread.interrupted())
365 throw new InterruptedException();
366 return s;
367 }
368
369 /**
370 * Tries to help with tasks allowed for external callers.
371 *
372 * @return current status
373 */
374 private int tryExternalHelp() {
375 int s;
376 return ((s = status) < 0 ? s:
377 (this instanceof CountedCompleter) ?
378 ForkJoinPool.common.externalHelpComplete(
379 (CountedCompleter<?>)this, 0) :
380 ForkJoinPool.common.tryExternalUnpush(this) ?
381 doExec() : 0);
382 }
383
384 /**
385 * Implementation for join, get, quietlyJoin. Directly handles
386 * only cases of already-completed, external wait, and
387 * unfork+exec. Others are relayed to ForkJoinPool.awaitJoin.
388 *
389 * @return status upon completion
390 */
391 private int doJoin() {
392 int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
393 return (s = status) < 0 ? s :
394 ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
395 (w = (wt = (ForkJoinWorkerThread)t).workQueue).
396 tryUnpush(this) && (s = doExec()) < 0 ? s :
397 wt.pool.awaitJoin(w, this, 0L) :
398 externalAwaitDone();
399 }
400
401 /**
402 * Implementation for invoke, quietlyInvoke.
403 *
404 * @return status upon completion
471 if ((s = status) >= 0) {
472 int h = System.identityHashCode(this);
473 final ReentrantLock lock = exceptionTableLock;
474 lock.lock();
475 try {
476 expungeStaleExceptions();
477 ExceptionNode[] t = exceptionTable;
478 int i = h & (t.length - 1);
479 for (ExceptionNode e = t[i]; ; e = e.next) {
480 if (e == null) {
481 t[i] = new ExceptionNode(this, ex, t[i],
482 exceptionTableRefQueue);
483 break;
484 }
485 if (e.get() == this) // already present
486 break;
487 }
488 } finally {
489 lock.unlock();
490 }
491 s = abnormalCompletion(DONE | ABNORMAL | THROWN);
492 }
493 return s;
494 }
495
496 /**
497 * Records exception and possibly propagates.
498 *
499 * @return status on exit
500 */
501 private int setExceptionalCompletion(Throwable ex) {
502 int s = recordExceptionalCompletion(ex);
503 if ((s & THROWN) != 0)
504 internalPropagateException(ex);
505 return s;
506 }
507
508 /**
509 * Hook for exception propagation support for tasks with completers.
510 */
511 void internalPropagateException(Throwable ex) {
512 }
513
514 /**
515 * Cancels, ignoring any exceptions thrown by cancel. Used during
516 * worker and pool shutdown. Cancel is spec'ed not to throw any
517 * exceptions, but if it does anyway, we have no recourse during
518 * shutdown, so guard against this case.
519 */
520 static final void cancelIgnoringExceptions(ForkJoinTask<?> t) {
521 if (t != null && t.status >= 0) {
522 try {
523 t.cancel(false);
658 ForkJoinTask.<RuntimeException>uncheckedThrow(ex);
659 }
660
661 /**
662 * The sneaky part of sneaky throw, relying on generics
663 * limitations to evade compiler complaints about rethrowing
664 * unchecked exceptions.
665 */
666 @SuppressWarnings("unchecked") static <T extends Throwable>
667 void uncheckedThrow(Throwable t) throws T {
668 if (t != null)
669 throw (T)t; // rely on vacuous cast
670 else
671 throw new Error("Unknown Exception");
672 }
673
674 /**
675 * Throws exception, if any, associated with the given status.
676 */
677 private void reportException(int s) {
678 rethrow((s & THROWN) != 0 ? getThrowableException() :
679 new CancellationException());
680 }
681
682 // public methods
683
684 /**
685 * Arranges to asynchronously execute this task in the pool the
686 * current task is running in, if applicable, or using the {@link
687 * ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}. While
688 * it is not necessarily enforced, it is a usage error to fork a
689 * task more than once unless it has completed and been
690 * reinitialized. Subsequent modifications to the state of this
691 * task or any data it operates on are not necessarily
692 * consistently observable by any thread other than the one
693 * executing it unless preceded by a call to {@link #join} or
694 * related methods, or a call to {@link #isDone} returning {@code
695 * true}.
696 *
697 * @return {@code this}, to simplify usage
698 */
699 public final ForkJoinTask<V> fork() {
701 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
702 ((ForkJoinWorkerThread)t).workQueue.push(this);
703 else
704 ForkJoinPool.common.externalPush(this);
705 return this;
706 }
707
708 /**
709 * Returns the result of the computation when it
710 * {@linkplain #isDone is done}.
711 * This method differs from {@link #get()} in that abnormal
712 * completion results in {@code RuntimeException} or {@code Error},
713 * not {@code ExecutionException}, and that interrupts of the
714 * calling thread do <em>not</em> cause the method to abruptly
715 * return by throwing {@code InterruptedException}.
716 *
717 * @return the computed result
718 */
719 public final V join() {
720 int s;
721 if (((s = doJoin()) & ABNORMAL) != 0)
722 reportException(s);
723 return getRawResult();
724 }
725
726 /**
727 * Commences performing this task, awaits its completion if
728 * necessary, and returns its result, or throws an (unchecked)
729 * {@code RuntimeException} or {@code Error} if the underlying
730 * computation did so.
731 *
732 * @return the computed result
733 */
734 public final V invoke() {
735 int s;
736 if (((s = doInvoke()) & ABNORMAL) != 0)
737 reportException(s);
738 return getRawResult();
739 }
740
741 /**
742 * Forks the given tasks, returning when {@code isDone} holds for
743 * each task or an (unchecked) exception is encountered, in which
744 * case the exception is rethrown. If more than one task
745 * encounters an exception, then this method throws any one of
746 * these exceptions. If any task encounters an exception, the
747 * other may be cancelled. However, the execution status of
748 * individual tasks is not guaranteed upon exceptional return. The
749 * status of each task may be obtained using {@link
750 * #getException()} and related methods to check if they have been
751 * cancelled, completed normally or exceptionally, or left
752 * unprocessed.
753 *
754 * @param t1 the first task
755 * @param t2 the second task
756 * @throws NullPointerException if any task is null
757 */
758 public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
759 int s1, s2;
760 t2.fork();
761 if (((s1 = t1.doInvoke()) & ABNORMAL) != 0)
762 t1.reportException(s1);
763 if (((s2 = t2.doJoin()) & ABNORMAL) != 0)
764 t2.reportException(s2);
765 }
766
767 /**
768 * Forks the given tasks, returning when {@code isDone} holds for
769 * each task or an (unchecked) exception is encountered, in which
770 * case the exception is rethrown. If more than one task
771 * encounters an exception, then this method throws any one of
772 * these exceptions. If any task encounters an exception, others
773 * may be cancelled. However, the execution status of individual
774 * tasks is not guaranteed upon exceptional return. The status of
775 * each task may be obtained using {@link #getException()} and
776 * related methods to check if they have been cancelled, completed
777 * normally or exceptionally, or left unprocessed.
778 *
779 * @param tasks the tasks
780 * @throws NullPointerException if any task is null
781 */
782 public static void invokeAll(ForkJoinTask<?>... tasks) {
783 Throwable ex = null;
784 int last = tasks.length - 1;
785 for (int i = last; i >= 0; --i) {
786 ForkJoinTask<?> t = tasks[i];
787 if (t == null) {
788 if (ex == null)
789 ex = new NullPointerException();
790 }
791 else if (i != 0)
792 t.fork();
793 else if ((t.doInvoke() & ABNORMAL) != 0 && ex == null)
794 ex = t.getException();
795 }
796 for (int i = 1; i <= last; ++i) {
797 ForkJoinTask<?> t = tasks[i];
798 if (t != null) {
799 if (ex != null)
800 t.cancel(false);
801 else if ((t.doJoin() & ABNORMAL) != 0)
802 ex = t.getException();
803 }
804 }
805 if (ex != null)
806 rethrow(ex);
807 }
808
809 /**
810 * Forks all tasks in the specified collection, returning when
811 * {@code isDone} holds for each task or an (unchecked) exception
812 * is encountered, in which case the exception is rethrown. If
813 * more than one task encounters an exception, then this method
814 * throws any one of these exceptions. If any task encounters an
815 * exception, others may be cancelled. However, the execution
816 * status of individual tasks is not guaranteed upon exceptional
817 * return. The status of each task may be obtained using {@link
818 * #getException()} and related methods to check if they have been
819 * cancelled, completed normally or exceptionally, or left
820 * unprocessed.
821 *
825 * @throws NullPointerException if tasks or any element are null
826 */
827 public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks) {
828 if (!(tasks instanceof RandomAccess) || !(tasks instanceof List<?>)) {
829 invokeAll(tasks.toArray(new ForkJoinTask<?>[tasks.size()]));
830 return tasks;
831 }
832 @SuppressWarnings("unchecked")
833 List<? extends ForkJoinTask<?>> ts =
834 (List<? extends ForkJoinTask<?>>) tasks;
835 Throwable ex = null;
836 int last = ts.size() - 1;
837 for (int i = last; i >= 0; --i) {
838 ForkJoinTask<?> t = ts.get(i);
839 if (t == null) {
840 if (ex == null)
841 ex = new NullPointerException();
842 }
843 else if (i != 0)
844 t.fork();
845 else if ((t.doInvoke() & ABNORMAL) != 0 && ex == null)
846 ex = t.getException();
847 }
848 for (int i = 1; i <= last; ++i) {
849 ForkJoinTask<?> t = ts.get(i);
850 if (t != null) {
851 if (ex != null)
852 t.cancel(false);
853 else if ((t.doJoin() & ABNORMAL) != 0)
854 ex = t.getException();
855 }
856 }
857 if (ex != null)
858 rethrow(ex);
859 return tasks;
860 }
861
862 /**
863 * Attempts to cancel execution of this task. This attempt will
864 * fail if the task has already completed or could not be
865 * cancelled for some other reason. If successful, and this task
866 * has not started when {@code cancel} is called, execution of
867 * this task is suppressed. After this method returns
868 * successfully, unless there is an intervening call to {@link
869 * #reinitialize}, subsequent calls to {@link #isCancelled},
870 * {@link #isDone}, and {@code cancel} will return {@code true}
871 * and calls to {@link #join} and related methods will result in
872 * {@code CancellationException}.
873 *
874 * <p>This method may be overridden in subclasses, but if so, must
875 * still ensure that these properties hold. In particular, the
876 * {@code cancel} method itself must not throw exceptions.
877 *
878 * <p>This method is designed to be invoked by <em>other</em>
879 * tasks. To terminate the current task, you can just return or
880 * throw an unchecked exception from its computation method, or
881 * invoke {@link #completeExceptionally(Throwable)}.
882 *
883 * @param mayInterruptIfRunning this value has no effect in the
884 * default implementation because interrupts are not used to
885 * control cancellation.
886 *
887 * @return {@code true} if this task is now cancelled
888 */
889 public boolean cancel(boolean mayInterruptIfRunning) {
890 int s = abnormalCompletion(DONE | ABNORMAL);
891 return (s & (ABNORMAL | THROWN)) == ABNORMAL;
892 }
893
894 public final boolean isDone() {
895 return status < 0;
896 }
897
898 public final boolean isCancelled() {
899 return (status & (ABNORMAL | THROWN)) == ABNORMAL;
900 }
901
902 /**
903 * Returns {@code true} if this task threw an exception or was cancelled.
904 *
905 * @return {@code true} if this task threw an exception or was cancelled
906 */
907 public final boolean isCompletedAbnormally() {
908 return (status & ABNORMAL) != 0;
909 }
910
911 /**
912 * Returns {@code true} if this task completed without throwing an
913 * exception and was not cancelled.
914 *
915 * @return {@code true} if this task completed without throwing an
916 * exception and was not cancelled
917 */
918 public final boolean isCompletedNormally() {
919 return (status & (DONE | ABNORMAL)) == DONE;
920 }
921
922 /**
923 * Returns the exception thrown by the base computation, or a
924 * {@code CancellationException} if cancelled, or {@code null} if
925 * none or if the method has not yet completed.
926 *
927 * @return the exception, or {@code null} if none
928 */
929 public final Throwable getException() {
930 int s = status;
931 return ((s & ABNORMAL) == 0 ? null :
932 (s & THROWN) == 0 ? new CancellationException() :
933 getThrowableException());
934 }
935
936 /**
937 * Completes this task abnormally, and if not already aborted or
938 * cancelled, causes it to throw the given exception upon
939 * {@code join} and related operations. This method may be used
940 * to induce exceptions in asynchronous tasks, or to force
941 * completion of tasks that would not otherwise complete. Its use
942 * in other situations is discouraged. This method is
943 * overridable, but overridden versions must invoke {@code super}
944 * implementation to maintain guarantees.
945 *
946 * @param ex the exception to throw. If this exception is not a
947 * {@code RuntimeException} or {@code Error}, the actual exception
948 * thrown will be a {@code RuntimeException} with cause {@code ex}.
949 */
950 public void completeExceptionally(Throwable ex) {
951 setExceptionalCompletion((ex instanceof RuntimeException) ||
952 (ex instanceof Error) ? ex :
956 /**
957 * Completes this task, and if not already aborted or cancelled,
958 * returning the given value as the result of subsequent
959 * invocations of {@code join} and related operations. This method
960 * may be used to provide results for asynchronous tasks, or to
961 * provide alternative handling for tasks that would not otherwise
962 * complete normally. Its use in other situations is
963 * discouraged. This method is overridable, but overridden
964 * versions must invoke {@code super} implementation to maintain
965 * guarantees.
966 *
967 * @param value the result value for this task
968 */
969 public void complete(V value) {
970 try {
971 setRawResult(value);
972 } catch (Throwable rex) {
973 setExceptionalCompletion(rex);
974 return;
975 }
976 setDone();
977 }
978
979 /**
980 * Completes this task normally without setting a value. The most
981 * recent value established by {@link #setRawResult} (or {@code
982 * null} by default) will be returned as the result of subsequent
983 * invocations of {@code join} and related operations.
984 *
985 * @since 1.8
986 */
987 public final void quietlyComplete() {
988 setDone();
989 }
990
991 /**
992 * Waits if necessary for the computation to complete, and then
993 * retrieves its result.
994 *
995 * @return the computed result
996 * @throws CancellationException if the computation was cancelled
997 * @throws ExecutionException if the computation threw an
998 * exception
999 * @throws InterruptedException if the current thread is not a
1000 * member of a ForkJoinPool and was interrupted while waiting
1001 */
1002 public final V get() throws InterruptedException, ExecutionException {
1003 int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
1004 doJoin() : externalInterruptibleAwaitDone();
1005 if ((s & THROWN) != 0)
1006 throw new ExecutionException(getThrowableException());
1007 else if ((s & ABNORMAL) != 0)
1008 throw new CancellationException();
1009 else
1010 return getRawResult();
1011 }
1012
1013 /**
1014 * Waits if necessary for at most the given time for the computation
1015 * to complete, and then retrieves its result, if available.
1016 *
1017 * @param timeout the maximum time to wait
1018 * @param unit the time unit of the timeout argument
1019 * @return the computed result
1020 * @throws CancellationException if the computation was cancelled
1021 * @throws ExecutionException if the computation threw an
1022 * exception
1023 * @throws InterruptedException if the current thread is not a
1024 * member of a ForkJoinPool and was interrupted while waiting
1025 * @throws TimeoutException if the wait timed out
1026 */
1027 public final V get(long timeout, TimeUnit unit)
1028 throws InterruptedException, ExecutionException, TimeoutException {
1029 int s;
1030 long nanos = unit.toNanos(timeout);
1031 if (Thread.interrupted())
1032 throw new InterruptedException();
1033 if ((s = status) >= 0 && nanos > 0L) {
1034 long d = System.nanoTime() + nanos;
1035 long deadline = (d == 0L) ? 1L : d; // avoid 0
1036 Thread t = Thread.currentThread();
1037 if (t instanceof ForkJoinWorkerThread) {
1038 ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
1039 s = wt.pool.awaitJoin(wt.workQueue, this, deadline);
1040 }
1041 else if ((s = ((this instanceof CountedCompleter) ?
1042 ForkJoinPool.common.externalHelpComplete(
1043 (CountedCompleter<?>)this, 0) :
1044 ForkJoinPool.common.tryExternalUnpush(this) ?
1045 doExec() : 0)) >= 0) {
1046 long ns, ms; // measure in nanosecs, but wait in millisecs
1047 while ((s = status) >= 0 &&
1048 (ns = deadline - System.nanoTime()) > 0L) {
1049 if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L &&
1050 (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) {
1051 synchronized (this) {
1052 if (status >= 0)
1053 wait(ms); // OK to throw InterruptedException
1054 else
1055 notifyAll();
1056 }
1057 }
1058 }
1059 }
1060 }
1061 if (s >= 0)
1062 throw new TimeoutException();
1063 else if ((s & THROWN) != 0)
1064 throw new ExecutionException(getThrowableException());
1065 else if ((s & ABNORMAL) != 0)
1066 throw new CancellationException();
1067 else
1068 return getRawResult();
1069 }
1070
1071 /**
1072 * Joins this task, without returning its result or throwing its
1073 * exception. This method may be useful when processing
1074 * collections of tasks when some have been cancelled or otherwise
1075 * known to have aborted.
1076 */
1077 public final void quietlyJoin() {
1078 doJoin();
1079 }
1080
1081 /**
1082 * Commences performing this task and awaits its completion if
1083 * necessary, without returning its result or throwing its
1084 * exception.
1085 */
1086 public final void quietlyInvoke() {
1087 doInvoke();
1104 ForkJoinPool.quiesceCommonPool();
1105 }
1106
1107 /**
1108 * Resets the internal bookkeeping state of this task, allowing a
1109 * subsequent {@code fork}. This method allows repeated reuse of
1110 * this task, but only if reuse occurs when this task has either
1111 * never been forked, or has been forked, then completed and all
1112 * outstanding joins of this task have also completed. Effects
1113 * under any other usage conditions are not guaranteed.
1114 * This method may be useful when executing
1115 * pre-constructed trees of subtasks in loops.
1116 *
1117 * <p>Upon completion of this method, {@code isDone()} reports
1118 * {@code false}, and {@code getException()} reports {@code
1119 * null}. However, the value returned by {@code getRawResult} is
1120 * unaffected. To clear this value, you can invoke {@code
1121 * setRawResult(null)}.
1122 */
1123 public void reinitialize() {
1124 if ((status & THROWN) != 0)
1125 clearExceptionalCompletion();
1126 else
1127 status = 0;
1128 }
1129
1130 /**
1131 * Returns the pool hosting the current thread, or {@code null}
1132 * if the current thread is executing outside of any ForkJoinPool.
1133 *
1134 * <p>This method returns {@code null} if and only if {@link
1135 * #inForkJoinPool} returns {@code false}.
1136 *
1137 * @return the pool, or {@code null} if none
1138 */
1139 public static ForkJoinPool getPool() {
1140 Thread t = Thread.currentThread();
1141 return (t instanceof ForkJoinWorkerThread) ?
1142 ((ForkJoinWorkerThread) t).pool : null;
1143 }
1144
1321
1322 /**
1323 * Returns the tag for this task.
1324 *
1325 * @return the tag for this task
1326 * @since 1.8
1327 */
1328 public final short getForkJoinTaskTag() {
1329 return (short)status;
1330 }
1331
1332 /**
1333 * Atomically sets the tag value for this task and returns the old value.
1334 *
1335 * @param newValue the new tag value
1336 * @return the previous value of the tag
1337 * @since 1.8
1338 */
1339 public final short setForkJoinTaskTag(short newValue) {
1340 for (int s;;) {
1341 if (STATUS.weakCompareAndSet(this, s = status,
1342 (s & ~SMASK) | (newValue & SMASK)))
1343 return (short)s;
1344 }
1345 }
1346
1347 /**
1348 * Atomically conditionally sets the tag value for this task.
1349 * Among other applications, tags can be used as visit markers
1350 * in tasks operating on graphs, as in methods that check: {@code
1351 * if (task.compareAndSetForkJoinTaskTag((short)0, (short)1))}
1352 * before processing, otherwise exiting because the node has
1353 * already been visited.
1354 *
1355 * @param expect the expected tag value
1356 * @param update the new tag value
1357 * @return {@code true} if successful; i.e., the current value was
1358 * equal to {@code expect} and was changed to {@code update}.
1359 * @since 1.8
1360 */
1361 public final boolean compareAndSetForkJoinTaskTag(short expect, short update) {
1362 for (int s;;) {
1363 if ((short)(s = status) != expect)
1364 return false;
1365 if (STATUS.weakCompareAndSet(this, s,
1366 (s & ~SMASK) | (update & SMASK)))
1367 return true;
1368 }
1369 }
1370
1371 /**
1372 * Adapter for Runnables. This implements RunnableFuture
1373 * to be compliant with AbstractExecutorService constraints
1374 * when used in ForkJoinPool.
1375 */
1376 static final class AdaptedRunnable<T> extends ForkJoinTask<T>
1377 implements RunnableFuture<T> {
1378 final Runnable runnable;
1379 T result;
1380 AdaptedRunnable(Runnable runnable, T result) {
1381 if (runnable == null) throw new NullPointerException();
1382 this.runnable = runnable;
1383 this.result = result; // OK to set this even before completion
1384 }
1385 public final T getRawResult() { return result; }
|