447 expungeStaleExceptions();
448 ExceptionNode[] t = exceptionTable;
449 int i = h & (t.length - 1);
450 for (ExceptionNode e = t[i]; ; e = e.next) {
451 if (e == null) {
452 t[i] = new ExceptionNode(this, ex, t[i]);
453 break;
454 }
455 if (e.get() == this) // already present
456 break;
457 }
458 } finally {
459 lock.unlock();
460 }
461 s = setCompletion(EXCEPTIONAL);
462 }
463 return s;
464 }
465
466 /**
467 * Records exception and possibly propagates
468 *
469 * @return status on exit
470 */
471 private int setExceptionalCompletion(Throwable ex) {
472 int s = recordExceptionalCompletion(ex);
473 if ((s & DONE_MASK) == EXCEPTIONAL)
474 internalPropagateException(ex);
475 return s;
476 }
477
478 /**
479 * Hook for exception propagation support for tasks with completers.
480 */
481 void internalPropagateException(Throwable ex) {
482 }
483
484 /**
485 * Cancels, ignoring any exceptions thrown by cancel. Used during
486 * worker and pool shutdown. Cancel is spec'ed not to throw any
487 * exceptions, but if it does anyway, we have no recourse during
488 * shutdown, so guard against this case.
489 */
490 static final void cancelIgnoringExceptions(ForkJoinTask<?> t) {
491 if (t != null && t.status >= 0) {
492 try {
493 t.cancel(false);
494 } catch (Throwable ignore) {
495 }
496 }
497 }
498
499 /**
500 * Removes exception node and clears status
501 */
502 private void clearExceptionalCompletion() {
503 int h = System.identityHashCode(this);
504 final ReentrantLock lock = exceptionTableLock;
505 lock.lock();
506 try {
507 ExceptionNode[] t = exceptionTable;
508 int i = h & (t.length - 1);
509 ExceptionNode e = t[i];
510 ExceptionNode pred = null;
511 while (e != null) {
512 ExceptionNode next = e.next;
513 if (e.get() == this) {
514 if (pred == null)
515 t[i] = next;
516 else
517 pred.next = next;
518 break;
519 }
520 pred = e;
618 static final void helpExpungeStaleExceptions() {
619 final ReentrantLock lock = exceptionTableLock;
620 if (lock.tryLock()) {
621 try {
622 expungeStaleExceptions();
623 } finally {
624 lock.unlock();
625 }
626 }
627 }
628
629 /**
630 * A version of "sneaky throw" to relay exceptions
631 */
632 static void rethrow(final Throwable ex) {
633 if (ex != null) {
634 if (ex instanceof Error)
635 throw (Error)ex;
636 if (ex instanceof RuntimeException)
637 throw (RuntimeException)ex;
638 throw uncheckedThrowable(ex, RuntimeException.class);
639 }
640 }
641
642 /**
643 * The sneaky part of sneaky throw, relying on generics
644 * limitations to evade compiler complaints about rethrowing
645 * unchecked exceptions
646 */
647 @SuppressWarnings("unchecked") static <T extends Throwable>
648 T uncheckedThrowable(final Throwable t, final Class<T> c) {
649 return (T)t; // rely on vacuous cast
650 }
651
652 /**
653 * Throws exception, if any, associated with the given status.
654 */
655 private void reportException(int s) {
656 if (s == CANCELLED)
657 throw new CancellationException();
658 if (s == EXCEPTIONAL)
659 rethrow(getThrowableException());
660 }
661
662 // public methods
663
664 /**
665 * Arranges to asynchronously execute this task in the pool the
666 * current task is running in, if applicable, or using the {@link
667 * ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}. While
668 * it is not necessarily enforced, it is a usage error to fork a
669 * task more than once unless it has completed and been
670 * reinitialized. Subsequent modifications to the state of this
671 * task or any data it operates on are not necessarily
672 * consistently observable by any thread other than the one
673 * executing it unless preceded by a call to {@link #join} or
674 * related methods, or a call to {@link #isDone} returning {@code
675 * true}.
676 *
677 * @return {@code this}, to simplify usage
678 */
679 public final ForkJoinTask<V> fork() {
680 Thread t;
681 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
682 ((ForkJoinWorkerThread)t).workQueue.push(this);
683 else
684 ForkJoinPool.commonPool.externalPush(this);
685 return this;
686 }
687
688 /**
689 * Returns the result of the computation when it {@link #isDone is
690 * done}. This method differs from {@link #get()} in that
691 * abnormal completion results in {@code RuntimeException} or
692 * {@code Error}, not {@code ExecutionException}, and that
693 * interrupts of the calling thread do <em>not</em> cause the
694 * method to abruptly return by throwing {@code
695 * InterruptedException}.
696 *
697 * @return the computed result
698 */
699 public final V join() {
700 int s;
701 if ((s = doJoin() & DONE_MASK) != NORMAL)
702 reportException(s);
703 return getRawResult();
704 }
840
841 /**
842 * Attempts to cancel execution of this task. This attempt will
843 * fail if the task has already completed or could not be
844 * cancelled for some other reason. If successful, and this task
845 * has not started when {@code cancel} is called, execution of
846 * this task is suppressed. After this method returns
847 * successfully, unless there is an intervening call to {@link
848 * #reinitialize}, subsequent calls to {@link #isCancelled},
849 * {@link #isDone}, and {@code cancel} will return {@code true}
850 * and calls to {@link #join} and related methods will result in
851 * {@code CancellationException}.
852 *
853 * <p>This method may be overridden in subclasses, but if so, must
854 * still ensure that these properties hold. In particular, the
855 * {@code cancel} method itself must not throw exceptions.
856 *
857 * <p>This method is designed to be invoked by <em>other</em>
858 * tasks. To terminate the current task, you can just return or
859 * throw an unchecked exception from its computation method, or
860 * invoke {@link #completeExceptionally}.
861 *
862 * @param mayInterruptIfRunning this value has no effect in the
863 * default implementation because interrupts are not used to
864 * control cancellation.
865 *
866 * @return {@code true} if this task is now cancelled
867 */
868 public boolean cancel(boolean mayInterruptIfRunning) {
869 return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED;
870 }
871
872 public final boolean isDone() {
873 return status < 0;
874 }
875
876 public final boolean isCancelled() {
877 return (status & DONE_MASK) == CANCELLED;
878 }
879
880 /**
990
991 /**
992 * Waits if necessary for at most the given time for the computation
993 * to complete, and then retrieves its result, if available.
994 *
995 * @param timeout the maximum time to wait
996 * @param unit the time unit of the timeout argument
997 * @return the computed result
998 * @throws CancellationException if the computation was cancelled
999 * @throws ExecutionException if the computation threw an
1000 * exception
1001 * @throws InterruptedException if the current thread is not a
1002 * member of a ForkJoinPool and was interrupted while waiting
1003 * @throws TimeoutException if the wait timed out
1004 */
1005 public final V get(long timeout, TimeUnit unit)
1006 throws InterruptedException, ExecutionException, TimeoutException {
1007 if (Thread.interrupted())
1008 throw new InterruptedException();
1009 // Messy in part because we measure in nanosecs, but wait in millisecs
1010 int s; long ns, ms;
1011 if ((s = status) >= 0 && (ns = unit.toNanos(timeout)) > 0L) {
1012 long deadline = System.nanoTime() + ns;
1013 ForkJoinPool p = null;
1014 ForkJoinPool.WorkQueue w = null;
1015 Thread t = Thread.currentThread();
1016 if (t instanceof ForkJoinWorkerThread) {
1017 ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
1018 p = wt.pool;
1019 w = wt.workQueue;
1020 p.helpJoinOnce(w, this); // no retries on failure
1021 }
1022 else
1023 ForkJoinPool.externalHelpJoin(this);
1024 boolean canBlock = false;
1025 boolean interrupted = false;
1026 try {
1027 while ((s = status) >= 0) {
1028 if (w != null && w.qlock < 0)
1029 cancelIgnoringExceptions(this);
1030 else if (!canBlock) {
1031 if (p == null || p.tryCompensate())
1087 * exception.
1088 */
1089 public final void quietlyInvoke() {
1090 doInvoke();
1091 }
1092
1093 /**
1094 * Possibly executes tasks until the pool hosting the current task
1095 * {@link ForkJoinPool#isQuiescent is quiescent}. This method may
1096 * be of use in designs in which many tasks are forked, but none
1097 * are explicitly joined, instead executing them until all are
1098 * processed.
1099 */
1100 public static void helpQuiesce() {
1101 Thread t;
1102 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
1103 ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
1104 wt.pool.helpQuiescePool(wt.workQueue);
1105 }
1106 else
1107 ForkJoinPool.externalHelpQuiescePool();
1108 }
1109
1110 /**
1111 * Resets the internal bookkeeping state of this task, allowing a
1112 * subsequent {@code fork}. This method allows repeated reuse of
1113 * this task, but only if reuse occurs when this task has either
1114 * never been forked, or has been forked, then completed and all
1115 * outstanding joins of this task have also completed. Effects
1116 * under any other usage conditions are not guaranteed.
1117 * This method may be useful when executing
1118 * pre-constructed trees of subtasks in loops.
1119 *
1120 * <p>Upon completion of this method, {@code isDone()} reports
1121 * {@code false}, and {@code getException()} reports {@code
1122 * null}. However, the value returned by {@code getRawResult} is
1123 * unaffected. To clear this value, you can invoke {@code
1124 * setRawResult(null)}.
1125 */
1126 public void reinitialize() {
1127 if ((status & DONE_MASK) == EXCEPTIONAL)
1372 public final void run() { invoke(); }
1373 private static final long serialVersionUID = 5232453952276885070L;
1374 }
1375
1376 /**
1377 * Adaptor for Runnables without results
1378 */
1379 static final class AdaptedRunnableAction extends ForkJoinTask<Void>
1380 implements RunnableFuture<Void> {
1381 final Runnable runnable;
1382 AdaptedRunnableAction(Runnable runnable) {
1383 if (runnable == null) throw new NullPointerException();
1384 this.runnable = runnable;
1385 }
1386 public final Void getRawResult() { return null; }
1387 public final void setRawResult(Void v) { }
1388 public final boolean exec() { runnable.run(); return true; }
1389 public final void run() { invoke(); }
1390 private static final long serialVersionUID = 5232453952276885070L;
1391 }
1392
1393 /**
1394 * Adaptor for Callables
1395 */
1396 static final class AdaptedCallable<T> extends ForkJoinTask<T>
1397 implements RunnableFuture<T> {
1398 final Callable<? extends T> callable;
1399 T result;
1400 AdaptedCallable(Callable<? extends T> callable) {
1401 if (callable == null) throw new NullPointerException();
1402 this.callable = callable;
1403 }
1404 public final T getRawResult() { return result; }
1405 public final void setRawResult(T v) { result = v; }
1406 public final boolean exec() {
1407 try {
1408 result = callable.call();
1409 return true;
1410 } catch (Error err) {
1411 throw err;
|
447 expungeStaleExceptions();
448 ExceptionNode[] t = exceptionTable;
449 int i = h & (t.length - 1);
450 for (ExceptionNode e = t[i]; ; e = e.next) {
451 if (e == null) {
452 t[i] = new ExceptionNode(this, ex, t[i]);
453 break;
454 }
455 if (e.get() == this) // already present
456 break;
457 }
458 } finally {
459 lock.unlock();
460 }
461 s = setCompletion(EXCEPTIONAL);
462 }
463 return s;
464 }
465
466 /**
467 * Records exception and possibly propagates.
468 *
469 * @return status on exit
470 */
471 private int setExceptionalCompletion(Throwable ex) {
472 int s = recordExceptionalCompletion(ex);
473 if ((s & DONE_MASK) == EXCEPTIONAL)
474 internalPropagateException(ex);
475 return s;
476 }
477
478 /**
479 * Hook for exception propagation support for tasks with completers.
480 */
481 void internalPropagateException(Throwable ex) {
482 }
483
484 /**
485 * Cancels, ignoring any exceptions thrown by cancel. Used during
486 * worker and pool shutdown. Cancel is spec'ed not to throw any
487 * exceptions, but if it does anyway, we have no recourse during
488 * shutdown, so guard against this case.
489 */
490 static final void cancelIgnoringExceptions(ForkJoinTask<?> t) {
491 if (t != null && t.status >= 0) {
492 try {
493 t.cancel(false);
494 } catch (Throwable ignore) {
495 }
496 }
497 }
498
499 /**
500 * Removes exception node and clears status.
501 */
502 private void clearExceptionalCompletion() {
503 int h = System.identityHashCode(this);
504 final ReentrantLock lock = exceptionTableLock;
505 lock.lock();
506 try {
507 ExceptionNode[] t = exceptionTable;
508 int i = h & (t.length - 1);
509 ExceptionNode e = t[i];
510 ExceptionNode pred = null;
511 while (e != null) {
512 ExceptionNode next = e.next;
513 if (e.get() == this) {
514 if (pred == null)
515 t[i] = next;
516 else
517 pred.next = next;
518 break;
519 }
520 pred = e;
618 static final void helpExpungeStaleExceptions() {
619 final ReentrantLock lock = exceptionTableLock;
620 if (lock.tryLock()) {
621 try {
622 expungeStaleExceptions();
623 } finally {
624 lock.unlock();
625 }
626 }
627 }
628
629 /**
630 * A version of "sneaky throw" to relay exceptions
631 */
632 static void rethrow(final Throwable ex) {
633 if (ex != null) {
634 if (ex instanceof Error)
635 throw (Error)ex;
636 if (ex instanceof RuntimeException)
637 throw (RuntimeException)ex;
638 ForkJoinTask.<RuntimeException>uncheckedThrow(ex);
639 }
640 }
641
642 /**
643 * The sneaky part of sneaky throw, relying on generics
644 * limitations to evade compiler complaints about rethrowing
645 * unchecked exceptions
646 */
647 @SuppressWarnings("unchecked") static <T extends Throwable>
648 void uncheckedThrow(Throwable t) throws T {
649 if (t != null)
650 throw (T)t; // rely on vacuous cast
651 }
652
653 /**
654 * Throws exception, if any, associated with the given status.
655 */
656 private void reportException(int s) {
657 if (s == CANCELLED)
658 throw new CancellationException();
659 if (s == EXCEPTIONAL)
660 rethrow(getThrowableException());
661 }
662
663 // public methods
664
665 /**
666 * Arranges to asynchronously execute this task in the pool the
667 * current task is running in, if applicable, or using the {@link
668 * ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}. While
669 * it is not necessarily enforced, it is a usage error to fork a
670 * task more than once unless it has completed and been
671 * reinitialized. Subsequent modifications to the state of this
672 * task or any data it operates on are not necessarily
673 * consistently observable by any thread other than the one
674 * executing it unless preceded by a call to {@link #join} or
675 * related methods, or a call to {@link #isDone} returning {@code
676 * true}.
677 *
678 * @return {@code this}, to simplify usage
679 */
680 public final ForkJoinTask<V> fork() {
681 Thread t;
682 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
683 ((ForkJoinWorkerThread)t).workQueue.push(this);
684 else
685 ForkJoinPool.common.externalPush(this);
686 return this;
687 }
688
689 /**
690 * Returns the result of the computation when it {@link #isDone is
691 * done}. This method differs from {@link #get()} in that
692 * abnormal completion results in {@code RuntimeException} or
693 * {@code Error}, not {@code ExecutionException}, and that
694 * interrupts of the calling thread do <em>not</em> cause the
695 * method to abruptly return by throwing {@code
696 * InterruptedException}.
697 *
698 * @return the computed result
699 */
700 public final V join() {
701 int s;
702 if ((s = doJoin() & DONE_MASK) != NORMAL)
703 reportException(s);
704 return getRawResult();
705 }
841
842 /**
843 * Attempts to cancel execution of this task. This attempt will
844 * fail if the task has already completed or could not be
845 * cancelled for some other reason. If successful, and this task
846 * has not started when {@code cancel} is called, execution of
847 * this task is suppressed. After this method returns
848 * successfully, unless there is an intervening call to {@link
849 * #reinitialize}, subsequent calls to {@link #isCancelled},
850 * {@link #isDone}, and {@code cancel} will return {@code true}
851 * and calls to {@link #join} and related methods will result in
852 * {@code CancellationException}.
853 *
854 * <p>This method may be overridden in subclasses, but if so, must
855 * still ensure that these properties hold. In particular, the
856 * {@code cancel} method itself must not throw exceptions.
857 *
858 * <p>This method is designed to be invoked by <em>other</em>
859 * tasks. To terminate the current task, you can just return or
860 * throw an unchecked exception from its computation method, or
861 * invoke {@link #completeExceptionally(Throwable)}.
862 *
863 * @param mayInterruptIfRunning this value has no effect in the
864 * default implementation because interrupts are not used to
865 * control cancellation.
866 *
867 * @return {@code true} if this task is now cancelled
868 */
869 public boolean cancel(boolean mayInterruptIfRunning) {
870 return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED;
871 }
872
873 public final boolean isDone() {
874 return status < 0;
875 }
876
877 public final boolean isCancelled() {
878 return (status & DONE_MASK) == CANCELLED;
879 }
880
881 /**
991
992 /**
993 * Waits if necessary for at most the given time for the computation
994 * to complete, and then retrieves its result, if available.
995 *
996 * @param timeout the maximum time to wait
997 * @param unit the time unit of the timeout argument
998 * @return the computed result
999 * @throws CancellationException if the computation was cancelled
1000 * @throws ExecutionException if the computation threw an
1001 * exception
1002 * @throws InterruptedException if the current thread is not a
1003 * member of a ForkJoinPool and was interrupted while waiting
1004 * @throws TimeoutException if the wait timed out
1005 */
1006 public final V get(long timeout, TimeUnit unit)
1007 throws InterruptedException, ExecutionException, TimeoutException {
1008 if (Thread.interrupted())
1009 throw new InterruptedException();
1010 // Messy in part because we measure in nanosecs, but wait in millisecs
1011 int s; long ms;
1012 long ns = unit.toNanos(timeout);
1013 if ((s = status) >= 0 && ns > 0L) {
1014 long deadline = System.nanoTime() + ns;
1015 ForkJoinPool p = null;
1016 ForkJoinPool.WorkQueue w = null;
1017 Thread t = Thread.currentThread();
1018 if (t instanceof ForkJoinWorkerThread) {
1019 ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
1020 p = wt.pool;
1021 w = wt.workQueue;
1022 p.helpJoinOnce(w, this); // no retries on failure
1023 }
1024 else
1025 ForkJoinPool.externalHelpJoin(this);
1026 boolean canBlock = false;
1027 boolean interrupted = false;
1028 try {
1029 while ((s = status) >= 0) {
1030 if (w != null && w.qlock < 0)
1031 cancelIgnoringExceptions(this);
1032 else if (!canBlock) {
1033 if (p == null || p.tryCompensate())
1089 * exception.
1090 */
1091 public final void quietlyInvoke() {
1092 doInvoke();
1093 }
1094
1095 /**
1096 * Possibly executes tasks until the pool hosting the current task
1097 * {@link ForkJoinPool#isQuiescent is quiescent}. This method may
1098 * be of use in designs in which many tasks are forked, but none
1099 * are explicitly joined, instead executing them until all are
1100 * processed.
1101 */
1102 public static void helpQuiesce() {
1103 Thread t;
1104 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
1105 ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
1106 wt.pool.helpQuiescePool(wt.workQueue);
1107 }
1108 else
1109 ForkJoinPool.quiesceCommonPool();
1110 }
1111
1112 /**
1113 * Resets the internal bookkeeping state of this task, allowing a
1114 * subsequent {@code fork}. This method allows repeated reuse of
1115 * this task, but only if reuse occurs when this task has either
1116 * never been forked, or has been forked, then completed and all
1117 * outstanding joins of this task have also completed. Effects
1118 * under any other usage conditions are not guaranteed.
1119 * This method may be useful when executing
1120 * pre-constructed trees of subtasks in loops.
1121 *
1122 * <p>Upon completion of this method, {@code isDone()} reports
1123 * {@code false}, and {@code getException()} reports {@code
1124 * null}. However, the value returned by {@code getRawResult} is
1125 * unaffected. To clear this value, you can invoke {@code
1126 * setRawResult(null)}.
1127 */
1128 public void reinitialize() {
1129 if ((status & DONE_MASK) == EXCEPTIONAL)
1374 public final void run() { invoke(); }
1375 private static final long serialVersionUID = 5232453952276885070L;
1376 }
1377
1378 /**
1379 * Adaptor for Runnables without results
1380 */
1381 static final class AdaptedRunnableAction extends ForkJoinTask<Void>
1382 implements RunnableFuture<Void> {
1383 final Runnable runnable;
1384 AdaptedRunnableAction(Runnable runnable) {
1385 if (runnable == null) throw new NullPointerException();
1386 this.runnable = runnable;
1387 }
1388 public final Void getRawResult() { return null; }
1389 public final void setRawResult(Void v) { }
1390 public final boolean exec() { runnable.run(); return true; }
1391 public final void run() { invoke(); }
1392 private static final long serialVersionUID = 5232453952276885070L;
1393 }
1394
1395 /**
1396 * Adaptor for Runnables in which failure forces worker exception
1397 */
1398 static final class RunnableExecuteAction extends ForkJoinTask<Void> {
1399 final Runnable runnable;
1400 RunnableExecuteAction(Runnable runnable) {
1401 if (runnable == null) throw new NullPointerException();
1402 this.runnable = runnable;
1403 }
1404 public final Void getRawResult() { return null; }
1405 public final void setRawResult(Void v) { }
1406 public final boolean exec() { runnable.run(); return true; }
1407 void internalPropagateException(Throwable ex) {
1408 rethrow(ex); // rethrow outside exec() catches.
1409 }
1410 private static final long serialVersionUID = 5232453952276885070L;
1411 }
1412
1413 /**
1414 * Adaptor for Callables
1415 */
1416 static final class AdaptedCallable<T> extends ForkJoinTask<T>
1417 implements RunnableFuture<T> {
1418 final Callable<? extends T> callable;
1419 T result;
1420 AdaptedCallable(Callable<? extends T> callable) {
1421 if (callable == null) throw new NullPointerException();
1422 this.callable = callable;
1423 }
1424 public final T getRawResult() { return result; }
1425 public final void setRawResult(T v) { result = v; }
1426 public final boolean exec() {
1427 try {
1428 result = callable.call();
1429 return true;
1430 } catch (Error err) {
1431 throw err;
|