src/share/classes/java/util/concurrent/ForkJoinTask.java

Print this page




 447                 expungeStaleExceptions();
 448                 ExceptionNode[] t = exceptionTable;
 449                 int i = h & (t.length - 1);
 450                 for (ExceptionNode e = t[i]; ; e = e.next) {
 451                     if (e == null) {
 452                         t[i] = new ExceptionNode(this, ex, t[i]);
 453                         break;
 454                     }
 455                     if (e.get() == this) // already present
 456                         break;
 457                 }
 458             } finally {
 459                 lock.unlock();
 460             }
 461             s = setCompletion(EXCEPTIONAL);
 462         }
 463         return s;
 464     }
 465 
 466     /**
 467      * Records exception and possibly propagates
 468      *
 469      * @return status on exit
 470      */
 471     private int setExceptionalCompletion(Throwable ex) {
 472         int s = recordExceptionalCompletion(ex);
 473         if ((s & DONE_MASK) == EXCEPTIONAL)
 474             internalPropagateException(ex);
 475         return s;
 476     }
 477 
 478     /**
 479      * Hook for exception propagation support for tasks with completers.
 480      */
 481     void internalPropagateException(Throwable ex) {
 482     }
 483 
 484     /**
 485      * Cancels, ignoring any exceptions thrown by cancel. Used during
 486      * worker and pool shutdown. Cancel is spec'ed not to throw any
 487      * exceptions, but if it does anyway, we have no recourse during
 488      * shutdown, so guard against this case.
 489      */
 490     static final void cancelIgnoringExceptions(ForkJoinTask<?> t) {
 491         if (t != null && t.status >= 0) {
 492             try {
 493                 t.cancel(false);
 494             } catch (Throwable ignore) {
 495             }
 496         }
 497     }
 498 
 499     /**
 500      * Removes exception node and clears status
 501      */
 502     private void clearExceptionalCompletion() {
 503         int h = System.identityHashCode(this);
 504         final ReentrantLock lock = exceptionTableLock;
 505         lock.lock();
 506         try {
 507             ExceptionNode[] t = exceptionTable;
 508             int i = h & (t.length - 1);
 509             ExceptionNode e = t[i];
 510             ExceptionNode pred = null;
 511             while (e != null) {
 512                 ExceptionNode next = e.next;
 513                 if (e.get() == this) {
 514                     if (pred == null)
 515                         t[i] = next;
 516                     else
 517                         pred.next = next;
 518                     break;
 519                 }
 520                 pred = e;


 618     static final void helpExpungeStaleExceptions() {
 619         final ReentrantLock lock = exceptionTableLock;
 620         if (lock.tryLock()) {
 621             try {
 622                 expungeStaleExceptions();
 623             } finally {
 624                 lock.unlock();
 625             }
 626         }
 627     }
 628 
 629     /**
 630      * A version of "sneaky throw" to relay exceptions
 631      */
 632     static void rethrow(final Throwable ex) {
 633         if (ex != null) {
 634             if (ex instanceof Error)
 635                 throw (Error)ex;
 636             if (ex instanceof RuntimeException)
 637                 throw (RuntimeException)ex;
 638             throw uncheckedThrowable(ex, RuntimeException.class);
 639         }
 640     }
 641 
 642     /**
 643      * The sneaky part of sneaky throw, relying on generics
 644      * limitations to evade compiler complaints about rethrowing
 645      * unchecked exceptions
 646      */
 647     @SuppressWarnings("unchecked") static <T extends Throwable>
 648         T uncheckedThrowable(final Throwable t, final Class<T> c) {
 649         return (T)t; // rely on vacuous cast

 650     }
 651 
 652     /**
 653      * Throws exception, if any, associated with the given status.
 654      */
 655     private void reportException(int s) {
 656         if (s == CANCELLED)
 657             throw new CancellationException();
 658         if (s == EXCEPTIONAL)
 659             rethrow(getThrowableException());
 660     }
 661 
 662     // public methods
 663 
 664     /**
 665      * Arranges to asynchronously execute this task in the pool the
 666      * current task is running in, if applicable, or using the {@link
 667      * ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}.  While
 668      * it is not necessarily enforced, it is a usage error to fork a
 669      * task more than once unless it has completed and been
 670      * reinitialized.  Subsequent modifications to the state of this
 671      * task or any data it operates on are not necessarily
 672      * consistently observable by any thread other than the one
 673      * executing it unless preceded by a call to {@link #join} or
 674      * related methods, or a call to {@link #isDone} returning {@code
 675      * true}.
 676      *
 677      * @return {@code this}, to simplify usage
 678      */
 679     public final ForkJoinTask<V> fork() {
 680         Thread t;
 681         if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
 682             ((ForkJoinWorkerThread)t).workQueue.push(this);
 683         else
 684             ForkJoinPool.commonPool.externalPush(this);
 685         return this;
 686     }
 687 
 688     /**
 689      * Returns the result of the computation when it {@link #isDone is
 690      * done}.  This method differs from {@link #get()} in that
 691      * abnormal completion results in {@code RuntimeException} or
 692      * {@code Error}, not {@code ExecutionException}, and that
 693      * interrupts of the calling thread do <em>not</em> cause the
 694      * method to abruptly return by throwing {@code
 695      * InterruptedException}.
 696      *
 697      * @return the computed result
 698      */
 699     public final V join() {
 700         int s;
 701         if ((s = doJoin() & DONE_MASK) != NORMAL)
 702             reportException(s);
 703         return getRawResult();
 704     }


 840 
 841     /**
 842      * Attempts to cancel execution of this task. This attempt will
 843      * fail if the task has already completed or could not be
 844      * cancelled for some other reason. If successful, and this task
 845      * has not started when {@code cancel} is called, execution of
 846      * this task is suppressed. After this method returns
 847      * successfully, unless there is an intervening call to {@link
 848      * #reinitialize}, subsequent calls to {@link #isCancelled},
 849      * {@link #isDone}, and {@code cancel} will return {@code true}
 850      * and calls to {@link #join} and related methods will result in
 851      * {@code CancellationException}.
 852      *
 853      * <p>This method may be overridden in subclasses, but if so, must
 854      * still ensure that these properties hold. In particular, the
 855      * {@code cancel} method itself must not throw exceptions.
 856      *
 857      * <p>This method is designed to be invoked by <em>other</em>
 858      * tasks. To terminate the current task, you can just return or
 859      * throw an unchecked exception from its computation method, or
 860      * invoke {@link #completeExceptionally}.
 861      *
 862      * @param mayInterruptIfRunning this value has no effect in the
 863      * default implementation because interrupts are not used to
 864      * control cancellation.
 865      *
 866      * @return {@code true} if this task is now cancelled
 867      */
 868     public boolean cancel(boolean mayInterruptIfRunning) {
 869         return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED;
 870     }
 871 
 872     public final boolean isDone() {
 873         return status < 0;
 874     }
 875 
 876     public final boolean isCancelled() {
 877         return (status & DONE_MASK) == CANCELLED;
 878     }
 879 
 880     /**


 990 
 991     /**
 992      * Waits if necessary for at most the given time for the computation
 993      * to complete, and then retrieves its result, if available.
 994      *
 995      * @param timeout the maximum time to wait
 996      * @param unit the time unit of the timeout argument
 997      * @return the computed result
 998      * @throws CancellationException if the computation was cancelled
 999      * @throws ExecutionException if the computation threw an
1000      * exception
1001      * @throws InterruptedException if the current thread is not a
1002      * member of a ForkJoinPool and was interrupted while waiting
1003      * @throws TimeoutException if the wait timed out
1004      */
1005     public final V get(long timeout, TimeUnit unit)
1006         throws InterruptedException, ExecutionException, TimeoutException {
1007         if (Thread.interrupted())
1008             throw new InterruptedException();
1009         // Messy in part because we measure in nanosecs, but wait in millisecs
1010         int s; long ns, ms;
1011         if ((s = status) >= 0 && (ns = unit.toNanos(timeout)) > 0L) {

1012             long deadline = System.nanoTime() + ns;
1013             ForkJoinPool p = null;
1014             ForkJoinPool.WorkQueue w = null;
1015             Thread t = Thread.currentThread();
1016             if (t instanceof ForkJoinWorkerThread) {
1017                 ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
1018                 p = wt.pool;
1019                 w = wt.workQueue;
1020                 p.helpJoinOnce(w, this); // no retries on failure
1021             }
1022             else
1023                 ForkJoinPool.externalHelpJoin(this);
1024             boolean canBlock = false;
1025             boolean interrupted = false;
1026             try {
1027                 while ((s = status) >= 0) {
1028                     if (w != null && w.qlock < 0)
1029                         cancelIgnoringExceptions(this);
1030                     else if (!canBlock) {
1031                         if (p == null || p.tryCompensate())


1087      * exception.
1088      */
1089     public final void quietlyInvoke() {
1090         doInvoke();
1091     }
1092 
1093     /**
1094      * Possibly executes tasks until the pool hosting the current task
1095      * {@link ForkJoinPool#isQuiescent is quiescent}. This method may
1096      * be of use in designs in which many tasks are forked, but none
1097      * are explicitly joined, instead executing them until all are
1098      * processed.
1099      */
1100     public static void helpQuiesce() {
1101         Thread t;
1102         if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
1103             ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
1104             wt.pool.helpQuiescePool(wt.workQueue);
1105         }
1106         else
1107             ForkJoinPool.externalHelpQuiescePool();
1108     }
1109 
1110     /**
1111      * Resets the internal bookkeeping state of this task, allowing a
1112      * subsequent {@code fork}. This method allows repeated reuse of
1113      * this task, but only if reuse occurs when this task has either
1114      * never been forked, or has been forked, then completed and all
1115      * outstanding joins of this task have also completed. Effects
1116      * under any other usage conditions are not guaranteed.
1117      * This method may be useful when executing
1118      * pre-constructed trees of subtasks in loops.
1119      *
1120      * <p>Upon completion of this method, {@code isDone()} reports
1121      * {@code false}, and {@code getException()} reports {@code
1122      * null}. However, the value returned by {@code getRawResult} is
1123      * unaffected. To clear this value, you can invoke {@code
1124      * setRawResult(null)}.
1125      */
1126     public void reinitialize() {
1127         if ((status & DONE_MASK) == EXCEPTIONAL)


1372         public final void run() { invoke(); }
1373         private static final long serialVersionUID = 5232453952276885070L;
1374     }
1375 
1376     /**
1377      * Adaptor for Runnables without results
1378      */
1379     static final class AdaptedRunnableAction extends ForkJoinTask<Void>
1380         implements RunnableFuture<Void> {
1381         final Runnable runnable;
1382         AdaptedRunnableAction(Runnable runnable) {
1383             if (runnable == null) throw new NullPointerException();
1384             this.runnable = runnable;
1385         }
1386         public final Void getRawResult() { return null; }
1387         public final void setRawResult(Void v) { }
1388         public final boolean exec() { runnable.run(); return true; }
1389         public final void run() { invoke(); }
1390         private static final long serialVersionUID = 5232453952276885070L;
1391     }


















1392 
1393     /**
1394      * Adaptor for Callables
1395      */
1396     static final class AdaptedCallable<T> extends ForkJoinTask<T>
1397         implements RunnableFuture<T> {
1398         final Callable<? extends T> callable;
1399         T result;
1400         AdaptedCallable(Callable<? extends T> callable) {
1401             if (callable == null) throw new NullPointerException();
1402             this.callable = callable;
1403         }
1404         public final T getRawResult() { return result; }
1405         public final void setRawResult(T v) { result = v; }
1406         public final boolean exec() {
1407             try {
1408                 result = callable.call();
1409                 return true;
1410             } catch (Error err) {
1411                 throw err;




 447                 expungeStaleExceptions();
 448                 ExceptionNode[] t = exceptionTable;
 449                 int i = h & (t.length - 1);
 450                 for (ExceptionNode e = t[i]; ; e = e.next) {
 451                     if (e == null) {
 452                         t[i] = new ExceptionNode(this, ex, t[i]);
 453                         break;
 454                     }
 455                     if (e.get() == this) // already present
 456                         break;
 457                 }
 458             } finally {
 459                 lock.unlock();
 460             }
 461             s = setCompletion(EXCEPTIONAL);
 462         }
 463         return s;
 464     }
 465 
 466     /**
 467      * Records exception and possibly propagates.
 468      *
 469      * @return status on exit
 470      */
 471     private int setExceptionalCompletion(Throwable ex) {
 472         int s = recordExceptionalCompletion(ex);
 473         if ((s & DONE_MASK) == EXCEPTIONAL)
 474             internalPropagateException(ex);
 475         return s;
 476     }
 477 
 478     /**
 479      * Hook for exception propagation support for tasks with completers.
 480      */
 481     void internalPropagateException(Throwable ex) {
 482     }
 483 
 484     /**
 485      * Cancels, ignoring any exceptions thrown by cancel. Used during
 486      * worker and pool shutdown. Cancel is spec'ed not to throw any
 487      * exceptions, but if it does anyway, we have no recourse during
 488      * shutdown, so guard against this case.
 489      */
 490     static final void cancelIgnoringExceptions(ForkJoinTask<?> t) {
 491         if (t != null && t.status >= 0) {
 492             try {
 493                 t.cancel(false);
 494             } catch (Throwable ignore) {
 495             }
 496         }
 497     }
 498 
 499     /**
 500      * Removes exception node and clears status.
 501      */
 502     private void clearExceptionalCompletion() {
 503         int h = System.identityHashCode(this);
 504         final ReentrantLock lock = exceptionTableLock;
 505         lock.lock();
 506         try {
 507             ExceptionNode[] t = exceptionTable;
 508             int i = h & (t.length - 1);
 509             ExceptionNode e = t[i];
 510             ExceptionNode pred = null;
 511             while (e != null) {
 512                 ExceptionNode next = e.next;
 513                 if (e.get() == this) {
 514                     if (pred == null)
 515                         t[i] = next;
 516                     else
 517                         pred.next = next;
 518                     break;
 519                 }
 520                 pred = e;


 618     static final void helpExpungeStaleExceptions() {
 619         final ReentrantLock lock = exceptionTableLock;
 620         if (lock.tryLock()) {
 621             try {
 622                 expungeStaleExceptions();
 623             } finally {
 624                 lock.unlock();
 625             }
 626         }
 627     }
 628 
 629     /**
 630      * A version of "sneaky throw" to relay exceptions
 631      */
 632     static void rethrow(final Throwable ex) {
 633         if (ex != null) {
 634             if (ex instanceof Error)
 635                 throw (Error)ex;
 636             if (ex instanceof RuntimeException)
 637                 throw (RuntimeException)ex;
 638             ForkJoinTask.<RuntimeException>uncheckedThrow(ex);
 639         }
 640     }
 641 
 642     /**
 643      * The sneaky part of sneaky throw, relying on generics
 644      * limitations to evade compiler complaints about rethrowing
 645      * unchecked exceptions
 646      */
 647     @SuppressWarnings("unchecked") static <T extends Throwable>
 648         void uncheckedThrow(Throwable t) throws T {
 649         if (t != null)
 650             throw (T)t; // rely on vacuous cast
 651     }
 652 
 653     /**
 654      * Throws exception, if any, associated with the given status.
 655      */
 656     private void reportException(int s) {
 657         if (s == CANCELLED)
 658             throw new CancellationException();
 659         if (s == EXCEPTIONAL)
 660             rethrow(getThrowableException());
 661     }
 662 
 663     // public methods
 664 
 665     /**
 666      * Arranges to asynchronously execute this task in the pool the
 667      * current task is running in, if applicable, or using the {@link
 668      * ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}.  While
 669      * it is not necessarily enforced, it is a usage error to fork a
 670      * task more than once unless it has completed and been
 671      * reinitialized.  Subsequent modifications to the state of this
 672      * task or any data it operates on are not necessarily
 673      * consistently observable by any thread other than the one
 674      * executing it unless preceded by a call to {@link #join} or
 675      * related methods, or a call to {@link #isDone} returning {@code
 676      * true}.
 677      *
 678      * @return {@code this}, to simplify usage
 679      */
 680     public final ForkJoinTask<V> fork() {
 681         Thread t;
 682         if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
 683             ((ForkJoinWorkerThread)t).workQueue.push(this);
 684         else
 685             ForkJoinPool.common.externalPush(this);
 686         return this;
 687     }
 688 
 689     /**
 690      * Returns the result of the computation when it {@link #isDone is
 691      * done}.  This method differs from {@link #get()} in that
 692      * abnormal completion results in {@code RuntimeException} or
 693      * {@code Error}, not {@code ExecutionException}, and that
 694      * interrupts of the calling thread do <em>not</em> cause the
 695      * method to abruptly return by throwing {@code
 696      * InterruptedException}.
 697      *
 698      * @return the computed result
 699      */
 700     public final V join() {
 701         int s;
 702         if ((s = doJoin() & DONE_MASK) != NORMAL)
 703             reportException(s);
 704         return getRawResult();
 705     }


 841 
 842     /**
 843      * Attempts to cancel execution of this task. This attempt will
 844      * fail if the task has already completed or could not be
 845      * cancelled for some other reason. If successful, and this task
 846      * has not started when {@code cancel} is called, execution of
 847      * this task is suppressed. After this method returns
 848      * successfully, unless there is an intervening call to {@link
 849      * #reinitialize}, subsequent calls to {@link #isCancelled},
 850      * {@link #isDone}, and {@code cancel} will return {@code true}
 851      * and calls to {@link #join} and related methods will result in
 852      * {@code CancellationException}.
 853      *
 854      * <p>This method may be overridden in subclasses, but if so, must
 855      * still ensure that these properties hold. In particular, the
 856      * {@code cancel} method itself must not throw exceptions.
 857      *
 858      * <p>This method is designed to be invoked by <em>other</em>
 859      * tasks. To terminate the current task, you can just return or
 860      * throw an unchecked exception from its computation method, or
 861      * invoke {@link #completeExceptionally(Throwable)}.
 862      *
 863      * @param mayInterruptIfRunning this value has no effect in the
 864      * default implementation because interrupts are not used to
 865      * control cancellation.
 866      *
 867      * @return {@code true} if this task is now cancelled
 868      */
 869     public boolean cancel(boolean mayInterruptIfRunning) {
 870         return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED;
 871     }
 872 
 873     public final boolean isDone() {
 874         return status < 0;
 875     }
 876 
 877     public final boolean isCancelled() {
 878         return (status & DONE_MASK) == CANCELLED;
 879     }
 880 
 881     /**


 991 
 992     /**
 993      * Waits if necessary for at most the given time for the computation
 994      * to complete, and then retrieves its result, if available.
 995      *
 996      * @param timeout the maximum time to wait
 997      * @param unit the time unit of the timeout argument
 998      * @return the computed result
 999      * @throws CancellationException if the computation was cancelled
1000      * @throws ExecutionException if the computation threw an
1001      * exception
1002      * @throws InterruptedException if the current thread is not a
1003      * member of a ForkJoinPool and was interrupted while waiting
1004      * @throws TimeoutException if the wait timed out
1005      */
1006     public final V get(long timeout, TimeUnit unit)
1007         throws InterruptedException, ExecutionException, TimeoutException {
1008         if (Thread.interrupted())
1009             throw new InterruptedException();
1010         // Messy in part because we measure in nanosecs, but wait in millisecs
1011         int s; long ms;
1012         long ns = unit.toNanos(timeout);
1013         if ((s = status) >= 0 && ns > 0L) {
1014             long deadline = System.nanoTime() + ns;
1015             ForkJoinPool p = null;
1016             ForkJoinPool.WorkQueue w = null;
1017             Thread t = Thread.currentThread();
1018             if (t instanceof ForkJoinWorkerThread) {
1019                 ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
1020                 p = wt.pool;
1021                 w = wt.workQueue;
1022                 p.helpJoinOnce(w, this); // no retries on failure
1023             }
1024             else
1025                 ForkJoinPool.externalHelpJoin(this);
1026             boolean canBlock = false;
1027             boolean interrupted = false;
1028             try {
1029                 while ((s = status) >= 0) {
1030                     if (w != null && w.qlock < 0)
1031                         cancelIgnoringExceptions(this);
1032                     else if (!canBlock) {
1033                         if (p == null || p.tryCompensate())


1089      * exception.
1090      */
1091     public final void quietlyInvoke() {
1092         doInvoke();
1093     }
1094 
1095     /**
1096      * Possibly executes tasks until the pool hosting the current task
1097      * {@link ForkJoinPool#isQuiescent is quiescent}. This method may
1098      * be of use in designs in which many tasks are forked, but none
1099      * are explicitly joined, instead executing them until all are
1100      * processed.
1101      */
1102     public static void helpQuiesce() {
1103         Thread t;
1104         if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
1105             ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
1106             wt.pool.helpQuiescePool(wt.workQueue);
1107         }
1108         else
1109             ForkJoinPool.quiesceCommonPool();
1110     }
1111 
1112     /**
1113      * Resets the internal bookkeeping state of this task, allowing a
1114      * subsequent {@code fork}. This method allows repeated reuse of
1115      * this task, but only if reuse occurs when this task has either
1116      * never been forked, or has been forked, then completed and all
1117      * outstanding joins of this task have also completed. Effects
1118      * under any other usage conditions are not guaranteed.
1119      * This method may be useful when executing
1120      * pre-constructed trees of subtasks in loops.
1121      *
1122      * <p>Upon completion of this method, {@code isDone()} reports
1123      * {@code false}, and {@code getException()} reports {@code
1124      * null}. However, the value returned by {@code getRawResult} is
1125      * unaffected. To clear this value, you can invoke {@code
1126      * setRawResult(null)}.
1127      */
1128     public void reinitialize() {
1129         if ((status & DONE_MASK) == EXCEPTIONAL)


1374         public final void run() { invoke(); }
1375         private static final long serialVersionUID = 5232453952276885070L;
1376     }
1377 
1378     /**
1379      * Adaptor for Runnables without results
1380      */
1381     static final class AdaptedRunnableAction extends ForkJoinTask<Void>
1382         implements RunnableFuture<Void> {
1383         final Runnable runnable;
1384         AdaptedRunnableAction(Runnable runnable) {
1385             if (runnable == null) throw new NullPointerException();
1386             this.runnable = runnable;
1387         }
1388         public final Void getRawResult() { return null; }
1389         public final void setRawResult(Void v) { }
1390         public final boolean exec() { runnable.run(); return true; }
1391         public final void run() { invoke(); }
1392         private static final long serialVersionUID = 5232453952276885070L;
1393     }
1394 
1395     /**
1396      * Adaptor for Runnables in which failure forces worker exception
1397      */
1398     static final class RunnableExecuteAction extends ForkJoinTask<Void> {
1399         final Runnable runnable;
1400         RunnableExecuteAction(Runnable runnable) {
1401             if (runnable == null) throw new NullPointerException();
1402             this.runnable = runnable;
1403         }
1404         public final Void getRawResult() { return null; }
1405         public final void setRawResult(Void v) { }
1406         public final boolean exec() { runnable.run(); return true; }
1407         void internalPropagateException(Throwable ex) {
1408             rethrow(ex); // rethrow outside exec() catches.
1409         }
1410         private static final long serialVersionUID = 5232453952276885070L;
1411     }
1412 
1413     /**
1414      * Adaptor for Callables
1415      */
1416     static final class AdaptedCallable<T> extends ForkJoinTask<T>
1417         implements RunnableFuture<T> {
1418         final Callable<? extends T> callable;
1419         T result;
1420         AdaptedCallable(Callable<? extends T> callable) {
1421             if (callable == null) throw new NullPointerException();
1422             this.callable = callable;
1423         }
1424         public final T getRawResult() { return result; }
1425         public final void setRawResult(T v) { result = v; }
1426         public final boolean exec() {
1427             try {
1428                 result = callable.call();
1429                 return true;
1430             } catch (Error err) {
1431                 throw err;