< prev index next >

src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java

Print this page
8259800: timeout in tck test testForkJoin(ForkJoinPool8Test)
Reviewed-by: martin


 259      * control bits occupy only (some of) the upper half (16 bits) of
 260      * status field. The lower bits are used for user-defined tags.
 261      */
 262     private static final int DONE         = 1 << 31; // must be negative: completion is tested as status < 0
 263     private static final int ABNORMAL     = 1 << 16; // set together with DONE by trySetCancelled
 264     private static final int THROWN       = 1 << 17; // NOTE(review): not set in the code visible here; presumably marks a thrown exception
 265     private static final int SMASK        = 0xffff;  // short bits for tags
         // Same numeric value as ABNORMAL. awaitJoin compares it (==) against a
         // helpJoin result that has already been checked to be non-negative.
 266     private static final int UNCOMPENSATE = 1 << 16; // helpJoin return sentinel
 267
 268     // Fields
 269     volatile int status;                // accessed directly by pool and workers
 270     private transient volatile Aux aux; // either waiters or thrown Exception
 271
 272     // Support for atomic operations
         // STATUS and AUX are used below for atomic updates of the status
         // and aux fields respectively; their initialization is not shown here.
 273     private static final VarHandle STATUS;
 274     private static final VarHandle AUX;
 275     private int getAndBitwiseOrStatus(int v) {
 276         return (int)STATUS.getAndBitwiseOr(this, v);
 277     }
 278     private boolean casStatus(int c, int v) {
             // Strong CAS on purpose. weakCompareAndSet may fail spuriously
             // even when status == c, which silently drops the update for
             // callers that attempt the CAS only once (for example the
             // cancel-on-allocation-failure path in awaitDone). Returns true
             // only if status was c and has been replaced by v.
             return STATUS.compareAndSet(this, c, v);
 280     }
 281     private boolean casAux(Aux c, Aux v) {
 282         return AUX.compareAndSet(this, c, v);
 283     }
 284 
 285     /**
          * Detaches the whole waiter list with a single CAS of aux to null
          * and unparks every recorded thread except the caller. Does
          * nothing if aux is null or its head node carries an exception.
          */
 286     private void signalWaiters() {
             Aux head;
             while ((head = aux) != null && head.ex == null) {
                 if (!casAux(head, null))
                     continue;                  // lost a race; re-read aux
                 for (Aux n = head; n != null; n = n.next) {
                     Thread waiter = n.thread;
                     if (waiter != null && waiter != Thread.currentThread())
                         LockSupport.unpark(waiter); // don't self-signal
                 }
                 return;
             }
 296     }
 297 
 298     /**
 299      * Possibly blocks until task is done or interrupted or timed out.
 300      *
          * <p>While status is non-negative, each pass of the loop checks, in
          * this order: whether a previous park ended with the interrupt flag
          * set (polling clears the flag); whether the caller is already
          * queued, in which case it parks (timed when deadline is non-zero);
          * whether a node exists, in which case it tries to CAS the node onto
          * the front of the aux list; otherwise it allocates the node.
          * After the loop the pool (if any) is uncompensated, and a queued
          * caller either unlinks its own node (task still not done) or helps
          * signal the remaining waiters (task done).
          *
 301      * @param interruptible true if wait can be cancelled by interrupt
 302      * @param deadline if non-zero use timed waits and possibly timeout
 303      * @param pool if nonnull pool to uncompensate after unblocking
 304      * @return status on exit, or ABNORMAL if interrupted while waiting
          *         (on timeout, the last non-negative status that was read)
 305      */
 306     private int awaitDone(boolean interruptible, long deadline,
 307                           ForkJoinPool pool) {
 308         int s;
 309         boolean interrupted = false, queued = false, parked = false;
 310         Aux node = null;
 311         while ((s = status) >= 0) {
 312             Aux a; long ns;
 313             if (parked && Thread.interrupted()) {
 314                 if (interruptible) {
                         // ABNORMAL without DONE: a non-negative "interrupted" result
 315                     s = ABNORMAL;
 316                     break;
 317                 }
                     // not interruptible: remember the interrupt and keep waiting
 318                 interrupted = true;
 319             }
 320             else if (queued) {
 321                 if (deadline != 0L) {
                         // timed out: leave the loop with s still non-negative
 322                     if ((ns = deadline - System.nanoTime()) <= 0L)
 323                         break;
 324                     LockSupport.parkNanos(ns);
 325                 }
 326                 else
 327                     LockSupport.park();
 328                 parked = true;
 329             }
 330             else if (node != null) {
 331                 if ((a = aux) != null && a.ex != null)
 332                     Thread.onSpinWait();     // exception in progress
                     // link node in front of the current head; a failed CAS is
                     // simply retried on the next pass
 333                 else if (queued = casAux(node.next = a, node))
 334                     LockSupport.setCurrentBlocker(this);
 335             }
 336             else {
 337                 try {
 338                     node = new Aux(Thread.currentThread(), null);
 339                 } catch (Throwable ex) {     // try to cancel if cannot create
                         // single CAS attempt; if it fails the loop re-reads
                         // status and tries the allocation again
 340                     casStatus(s, s | (DONE | ABNORMAL));
 341                 }
 342             }
 343         }
 344         if (pool != null)
 345             pool.uncompensate();
 346
 347         if (queued) {
 348             LockSupport.setCurrentBlocker(null);
 349             if (s >= 0) { // cancellation similar to AbstractQueuedSynchronizer
                     // Timed out or interrupted: walk the list from aux to find
                     // and unlink this caller's node. Stops if aux becomes null
                     // or turns into an exception node.
 350                 outer: for (Aux a; (a = aux) != null && a.ex == null; ) {
 351                     for (Aux trail = null;;) {
 352                         Aux next = a.next;
 353                         if (a == node) {
                                 // NOTE(review): the expected value passed here is
                                 // trail itself rather than a. Aux.casNext is not
                                 // visible in this excerpt; if it is a CAS on
                                 // trail.next this can only succeed when
                                 // trail.next == trail -- confirm against Aux.
 354                             if (trail != null)
 355                                 trail.casNext(trail, next);
 356                             else if (casAux(a, next))
 357                                 break outer; // cannot be re-encountered
 358                             break;           // restart
 359                         } else {
 360                             trail = a;
 361                             if ((a = next) == null)
 362                                 break outer;
 363                         }
 364                     }
 365                 }
 366             }
 367             else {
 368                 signalWaiters();             // help clean or signal
                     // NOTE(review): the interrupt flag is re-asserted only on this
                     // (task done) branch; a non-interruptible wait that was
                     // interrupted and then timed out leaves via the s >= 0 branch
                     // without restoring it -- confirm this is intended.
 369                 if (interrupted)
 370                     Thread.currentThread().interrupt();
 371             }
 372         }
 373         return s;
 374     }
 375 
 376     /**
 377      * Sets DONE status and wakes up threads waiting to join this task.
 378      * @return status on exit
 379      */
 380     private int setDone() {
 381         int s = getAndBitwiseOrStatus(DONE) | DONE;
 382         signalWaiters();
 383         return s;
 384     }
 385 
 386     /**
 387      * Sets ABNORMAL DONE status unless already done, and wakes up threads
 388      * waiting to join this task.
 389      * @return status on exit
 390      */
 391     private int trySetCancelled() {
 392         int s;
 393         do {} while ((s = status) >= 0 && !casStatus(s, s |= (DONE | ABNORMAL)));
 394         signalWaiters();
 395         return s;
 396     }


 446      */
 447     final int doExec() {
             int s = status;
             if (s < 0)
                 return s;                      // already done: exec() is skipped
             boolean completed;
             try {
                 completed = exec();
             } catch (Throwable rex) {
                 s = trySetException(rex);
                 completed = false;
             }
             // When exec() returns false without throwing, the status read
             // before the call is what gets returned.
             return completed ? setDone() : s;
 460     }
 461 
 462     /**
 463      * Helps and/or waits for completion from join, get, or invoke;
 464      * called from either internal or external threads.
 465      *

          * <p>Order of work: pick the pool and queue for the calling thread;
          * return early if the task is already done or no time remains; try
          * to help run the task; finally fall back to blocking in awaitDone.
          *
 466      * @param ran true if task known to have been exec'd
 467      * @param interruptible true if park interruptibly when external
 468      * @param timed true if use timed wait
 469      * @param nanos if timed, timeout value
 470      * @return ABNORMAL if interrupted, else status on exit
          *         (0 when timed and nanos is not positive)
 471      */
 472     private int awaitJoin(boolean ran, boolean interruptible, boolean timed,

 473                           long nanos) {
             // NOTE(review): 'ran' is never read in this method body.
 474         boolean internal; ForkJoinPool p; ForkJoinPool.WorkQueue q; int s;
 475         Thread t; ForkJoinWorkerThread wt;
             // internal == the caller is a ForkJoinWorkerThread; then its own
             // pool and work queue are used
 476         if (internal = ((t = Thread.currentThread())
 477                         instanceof ForkJoinWorkerThread)) {
 478             p = (wt = (ForkJoinWorkerThread)t).pool;



 479             q = wt.workQueue;
 480         }
 481         else {

                 // external caller: use the common pool and its commonQueue()
 482             p = ForkJoinPool.common;
 483             q = ForkJoinPool.commonQueue();




                 // external callers only: a pending interrupt aborts before any
                 // helping or waiting (polling clears the flag)
 484             if (interruptible && Thread.interrupted())
 485                 return ABNORMAL;
 486         }
 487         if ((s = status) < 0)
 488             return s;
 489         long deadline = 0L;
 490         if (timed) {
                 // no time left: report "not done" (0 is non-negative)
 491             if (nanos <= 0L)
 492                 return 0;
                 // awaitDone treats deadline 0 as "untimed", so avoid it
 493             else if ((deadline = nanos + System.nanoTime()) == 0L)
 494                 deadline = 1L;
 495         }
 496         ForkJoinPool uncompensate = null;
 497         if (q != null && p != null) {            // try helping
                 // In timed mode this step is attempted only when
                 // p.isSaturated(). CountedCompleters go through helpComplete;
                 // other tasks are removed from q and run here via doExec.
 498             if ((!timed || p.isSaturated()) &&
 499                 ((this instanceof CountedCompleter) ?
 500                  (s = p.helpComplete(this, q, internal)) < 0 :
 501                  (q.tryRemove(this, internal) && (s = doExec()) < 0)))




 502                 return s;

 503             if (internal) {
 504                 if ((s = p.helpJoin(this, q)) < 0)
 505                     return s;
                     // helpJoin asked for an uncompensate after unblocking
 506                 if (s == UNCOMPENSATE)
 507                     uncompensate = p;
                     // worker threads always wait non-interruptibly
 508                 interruptible = false;





































 509             }
 510         }
 511         return awaitDone(interruptible, deadline, uncompensate);






























 512     }
 513 
 514     /**
          * Calls {@code t.cancel(true)} on a non-null future and discards
          * anything it throws. Cancel is spec'ed not to throw, but if it
          * does anyway there is no recourse, so the failure is ignored.
 518      */
 519     static final void cancelIgnoringExceptions(Future<?> t) {
             if (t == null)
                 return;
             try {
                 t.cancel(true);
             } catch (Throwable ignore) {
                 // deliberately ignored, see method comment
             }
 526     }
 527 
 528     /**
 529      * Returns a rethrowable exception for this task, if available.
 530      * To provide accurate stack traces, if the exception was not
 531      * thrown by the current thread, we try to create a new exception


 647             (w = (ForkJoinWorkerThread)t).workQueue.push(this, w.pool);
 648         else
 649             ForkJoinPool.common.externalPush(this);
 650         return this;
 651     }
 652 
 653     /**
 654      * Returns the result of the computation when it
 655      * {@linkplain #isDone is done}.
 656      * This method differs from {@link #get()} in that abnormal
 657      * completion results in {@code RuntimeException} or {@code Error},
 658      * not {@code ExecutionException}, and that interrupts of the
 659      * calling thread do <em>not</em> cause the method to abruptly
 660      * return by throwing {@code InterruptedException}.
 661      *
 662      * @return the computed result
 663      */
 664     public final V join() {
             int outcome = status;
             if (outcome >= 0) {
                 // not done yet: help or wait, non-interruptibly and untimed
                 outcome = awaitJoin(false, false, false, 0L);
             }
             if ((outcome & ABNORMAL) != 0) {
                 reportException(outcome);
             }
 670         return getRawResult();
 671     }
 672 
 673     /**
 674      * Commences performing this task, awaits its completion if
 675      * necessary, and returns its result, or throws an (unchecked)
 676      * {@code RuntimeException} or {@code Error} if the underlying
 677      * computation did so.
 678      *
 679      * @return the computed result
 680      */
 681     public final V invoke() {
             int outcome = doExec();
             if (outcome >= 0) {
                 // ran but not complete: wait, non-interruptibly and untimed
                 outcome = awaitJoin(true, false, false, 0L);
             }
             if ((outcome & ABNORMAL) != 0) {
                 reportException(outcome);
             }
 687         return getRawResult();
 688     }
 689 
 690     /**
 691      * Forks the given tasks, returning when {@code isDone} holds for
 692      * each task or an (unchecked) exception is encountered, in which
 693      * case the exception is rethrown. If more than one task
 694      * encounters an exception, then this method throws any one of
 695      * these exceptions. If any task encounters an exception, the
 696      * other may be cancelled. However, the execution status of
 697      * individual tasks is not guaranteed upon exceptional return. The
 698      * status of each task may be obtained using {@link
 699      * #getException()} and related methods to check if they have been
 700      * cancelled, completed normally or exceptionally, or left
 701      * unprocessed.
 702      *
 703      * @param t1 the first task
 704      * @param t2 the second task
 705      * @throws NullPointerException if any task is null
 706      */
 707     public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
 708         int s1, s2;
 709         if (t1 == null || t2 == null)
 710             throw new NullPointerException();
 711         t2.fork();
 712         if ((s1 = t1.doExec()) >= 0)
 713             s1 = t1.awaitJoin(true, false, false, 0L);
 714         if ((s1 & ABNORMAL) != 0) {
 715             cancelIgnoringExceptions(t2);
 716             t1.reportException(s1);
 717         }
 718         else if (((s2 = t2.awaitJoin(false, false, false, 0L)) & ABNORMAL) != 0)
 719             t2.reportException(s2);
 720     }
 721 
 722     /**
 723      * Forks the given tasks, returning when {@code isDone} holds for
 724      * each task or an (unchecked) exception is encountered, in which
 725      * case the exception is rethrown. If more than one task
 726      * encounters an exception, then this method throws any one of
 727      * these exceptions. If any task encounters an exception, others
 728      * may be cancelled. However, the execution status of individual
 729      * tasks is not guaranteed upon exceptional return. The status of
 730      * each task may be obtained using {@link #getException()} and
 731      * related methods to check if they have been cancelled, completed
 732      * normally or exceptionally, or left unprocessed.
 733      *
 734      * @param tasks the tasks
 735      * @throws NullPointerException if any task is null
 736      */
 737     public static void invokeAll(ForkJoinTask<?>... tasks) {
             final int last = tasks.length - 1;
             Throwable failure = null;
             // Descending pass: fork tasks[last..1], run tasks[0] in the caller.
             for (int i = last; i >= 0; --i) {
                 final ForkJoinTask<?> task = tasks[i];
                 if (task == null) {
                     failure = new NullPointerException();
                     break;
                 }
                 if (i != 0) {
                     task.fork();
                     continue;
                 }
                 int st = task.doExec();
                 if (st >= 0)
                     st = task.awaitJoin(true, false, false, 0L);
                 if ((st & ABNORMAL) != 0)
                     failure = task.getException(st);
                 break;
             }
             // Ascending pass: join the forked tasks until one yields an exception.
             if (failure == null) {
                 for (int i = 1; i <= last; ++i) {
                     final ForkJoinTask<?> task = tasks[i];
                     if (task == null)
                         continue;
                     int st = task.status;
                     if (st >= 0)
                         st = task.awaitJoin(false, false, false, 0L);
                     if ((st & ABNORMAL) != 0) {
                         failure = task.getException(st);
                         if (failure != null)
                             break;
                     }
                 }
             }
             // On failure: cancel tasks[1..last] and propagate the exception.
             if (failure != null) {
                 for (int i = 1; i <= last; ++i)
                     cancelIgnoringExceptions(tasks[i]);
                 rethrow(failure);
             }
 773     }
 774 
 775     /**
 776      * Forks all tasks in the specified collection, returning when
 777      * {@code isDone} holds for each task or an (unchecked) exception
 778      * is encountered, in which case the exception is rethrown. If
 779      * more than one task encounters an exception, then this method
 780      * throws any one of these exceptions. If any task encounters an
 781      * exception, others may be cancelled. However, the execution
 782      * status of individual tasks is not guaranteed upon exceptional


 792      */
 793     public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks) {
             // Anything that is not a random-access List is copied to an
             // array and handled by the varargs overload.
             if (!(tasks instanceof RandomAccess && tasks instanceof List<?>)) {
 795             invokeAll(tasks.toArray(new ForkJoinTask<?>[0]));
 796             return tasks;
 797         }
 798         @SuppressWarnings("unchecked")
             List<? extends ForkJoinTask<?>> list =
 800             (List<? extends ForkJoinTask<?>>) tasks;
             final int last = list.size() - 1;  // nearly same as array version
             Throwable failure = null;
             // Descending pass: fork elements last..1, run element 0 in the caller.
             for (int i = last; i >= 0; --i) {
                 final ForkJoinTask<?> task = list.get(i);
                 if (task == null) {
                     failure = new NullPointerException();
                     break;
                 }
                 if (i != 0) {
                     task.fork();
                     continue;
                 }
                 int st = task.doExec();
                 if (st >= 0)
                     st = task.awaitJoin(true, false, false, 0L);
                 if ((st & ABNORMAL) != 0)
                     failure = task.getException(st);
                 break;
             }
             // Ascending pass: join the forked tasks until one yields an exception.
             if (failure == null) {
                 for (int i = 1; i <= last; ++i) {
                     final ForkJoinTask<?> task = list.get(i);
                     if (task == null)
                         continue;
                     int st = task.status;
                     if (st >= 0)
                         st = task.awaitJoin(false, false, false, 0L);
                     if ((st & ABNORMAL) != 0) {
                         failure = task.getException(st);
                         if (failure != null)
                             break;
                     }
                 }
             }
             // On failure: cancel elements 1..last and propagate the exception.
             if (failure != null) {
                 for (int i = 1; i <= last; ++i)
                     cancelIgnoringExceptions(list.get(i));
                 rethrow(failure);
             }
 836         return tasks;
 837     }
 838 
 839     /**
 840      * Attempts to cancel execution of this task. This attempt will
 841      * fail if the task has already completed or could not be
 842      * cancelled for some other reason. If successful, and this task
 843      * has not started when {@code cancel} is called, execution of
 844      * this task is suppressed. After this method returns
 845      * successfully, unless there is an intervening call to {@link


 956      * invocations of {@code join} and related operations.
 957      *
 958      * @since 1.8
 959      */
 960     public final void quietlyComplete() {
             // Sets DONE and signals waiters; the status returned by
             // setDone() is deliberately discarded.
 961         setDone();
 962     }
 963 
 964     /**
 965      * Waits if necessary for the computation to complete, and then
 966      * retrieves its result.
 967      *
 968      * @return the computed result
 969      * @throws CancellationException if the computation was cancelled
 970      * @throws ExecutionException if the computation threw an
 971      * exception
 972      * @throws InterruptedException if the current thread is not a
 973      * member of a ForkJoinPool and was interrupted while waiting
 974      */
 975     public final V get() throws InterruptedException, ExecutionException {
             // interruptible, untimed wait
             final int outcome = awaitJoin(false, true, false, 0L);
             if ((outcome & ABNORMAL) != 0) {
                 reportExecutionException(outcome);
             }
 979         return getRawResult();
 980     }
 981 
 982     /**
 983      * Waits if necessary for at most the given time for the computation
 984      * to complete, and then retrieves its result, if available.
 985      *
 986      * @param timeout the maximum time to wait
 987      * @param unit the time unit of the timeout argument
 988      * @return the computed result
 989      * @throws CancellationException if the computation was cancelled
 990      * @throws ExecutionException if the computation threw an
 991      * exception
 992      * @throws InterruptedException if the current thread is not a
 993      * member of a ForkJoinPool and was interrupted while waiting
 994      * @throws TimeoutException if the wait timed out
 995      */
 996     public final V get(long timeout, TimeUnit unit)
 997         throws InterruptedException, ExecutionException, TimeoutException {
             // interruptible, timed wait
             final int outcome = awaitJoin(false, true, true, unit.toNanos(timeout));
             // A non-negative outcome means the task did not complete within
             // the wait; an ABNORMAL one means it did not complete normally.
             // Both are handed to reportExecutionException.
             final boolean notDone = outcome >= 0;
             if (notDone || (outcome & ABNORMAL) != 0) {
                 reportExecutionException(outcome);
             }
1002         return getRawResult();
1003     }
1004 
1005     /**
1006      * Joins this task, without returning its result or throwing its
1007      * exception. This method may be useful when processing
1008      * collections of tasks when some have been cancelled or otherwise
1009      * known to have aborted.
1010      */
1011     public final void quietlyJoin() {
             if (status < 0)
                 return;                        // already done: nothing to wait for
             // non-interruptible, untimed wait; the resulting status is ignored
             awaitJoin(false, false, false, 0L);
1014     }
1015 

1016     /**
1017      * Commences performing this task and awaits its completion if
1018      * necessary, without returning its result or throwing its
1019      * exception.
1020      */
1021     public final void quietlyInvoke() {
             final int afterExec = doExec();
             if (afterExec >= 0) {
                 // ran but not complete: wait; the resulting status is ignored
                 awaitJoin(true, false, false, 0L);
             }
1024     }
1025 
1026     /**
1027      * Possibly executes tasks until the pool hosting the current task
1028      * {@linkplain ForkJoinPool#isQuiescent is quiescent}.  This
1029      * method may be of use in designs in which many tasks are forked,
1030      * but none are explicitly joined, instead executing them until
1031      * all are processed.
1032      */
1033     public static void helpQuiesce() {
1034         Thread t; ForkJoinWorkerThread w; ForkJoinPool p;
1035         if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread &&
1036             (p = (w = (ForkJoinWorkerThread)t).pool) != null)
1037             p.helpQuiescePool(w.workQueue, Long.MAX_VALUE, false);
1038         else
1039             ForkJoinPool.common.externalHelpQuiescePool(Long.MAX_VALUE, false);
1040     }
1041 
1042     /**
1043      * Resets the internal bookkeeping state of this task, allowing a




 259      * control bits occupy only (some of) the upper half (16 bits) of
 260      * status field. The lower bits are used for user-defined tags.
 261      */
 262     private static final int DONE         = 1 << 31; // must be negative: completion is tested as status < 0
 263     private static final int ABNORMAL     = 1 << 16; // set together with DONE by trySetCancelled
 264     private static final int THROWN       = 1 << 17; // NOTE(review): not set in the code visible here; presumably marks a thrown exception
 265     private static final int SMASK        = 0xffff;  // short bits for tags
         // Same numeric value as ABNORMAL. awaitDone compares it (==) against a
         // helpJoin result that has already been checked to be non-negative.
 266     private static final int UNCOMPENSATE = 1 << 16; // helpJoin return sentinel
 267
 268     // Fields
 269     volatile int status;                // accessed directly by pool and workers
 270     private transient volatile Aux aux; // either waiters or thrown Exception
 271
 272     // Support for atomic operations
         // STATUS and AUX are used below for atomic updates of the status
         // and aux fields respectively; their initialization is not shown here.
 273     private static final VarHandle STATUS;
 274     private static final VarHandle AUX;
 275     private int getAndBitwiseOrStatus(int v) {
 276         return (int)STATUS.getAndBitwiseOr(this, v);
 277     }
 278     private boolean casStatus(int c, int v) {
 279         return STATUS.compareAndSet(this, c, v);
 280     }
 281     private boolean casAux(Aux c, Aux v) {
 282         return AUX.compareAndSet(this, c, v);
 283     }
 284 
 285     /**
          * Detaches the whole waiter list with a single CAS of aux to null
          * and unparks every recorded thread except the caller. Does
          * nothing if aux is null or its head node carries an exception.
          */
 286     private void signalWaiters() {
             Aux head;
             while ((head = aux) != null && head.ex == null) {
                 if (!casAux(head, null))
                     continue;                  // lost a race; re-read aux
                 for (Aux n = head; n != null; n = n.next) {
                     Thread waiter = n.thread;
                     if (waiter != null && waiter != Thread.currentThread())
                         LockSupport.unpark(waiter); // don't self-signal
                 }
                 return;
             }
 296     }
 297 
 298     /**














































































 299      * Sets DONE status and wakes up threads waiting to join this task.
 300      * @return status on exit
 301      */
 302     private int setDone() {
 303         int s = getAndBitwiseOrStatus(DONE) | DONE;
 304         signalWaiters();
 305         return s;
 306     }
 307 
 308     /**
 309      * Sets ABNORMAL DONE status unless already done, and wakes up threads
 310      * waiting to join this task.
 311      * @return status on exit
 312      */
 313     private int trySetCancelled() {
 314         int s;
 315         do {} while ((s = status) >= 0 && !casStatus(s, s |= (DONE | ABNORMAL)));
 316         signalWaiters();
 317         return s;
 318     }


 368      */
 369     final int doExec() {
             int s = status;
             if (s < 0)
                 return s;                      // already done: exec() is skipped
             boolean completed;
             try {
                 completed = exec();
             } catch (Throwable rex) {
                 s = trySetException(rex);
                 completed = false;
             }
             // When exec() returns false without throwing, the status read
             // before the call is what gets returned.
             return completed ? setDone() : s;
 382     }
 383 
 384     /**
 385      * Helps and/or waits for completion from join, get, or invoke;
 386      * called from either internal or external threads.
 387      *
          * <p>Order of work: resolve the pool and queue for the caller; return
          * early on a pending interrupt (interruptible mode), a completed task,
          * or an exhausted timeout; try to help run the task; then block in a
          * loop that creates a wait node, CASes it onto the aux list and parks
          * until status turns negative, the deadline passes, or an interrupt
          * arrives in interruptible mode. On exit a queued caller either
          * unlinks its own node (task still not done) or helps signal the
          * remaining waiters (task done).
          *
 388      * @param pool if nonnull, known submitted pool, else assumes current pool
 389      * @param ran true if task known to have been exec'd
 390      * @param interruptible true if park interruptibly when external
 391      * @param timed true if use timed wait
 392      * @param nanos if timed, timeout value
 393      * @return ABNORMAL if interrupted, else status on exit
          *         (0 when timed and nanos is not positive; on timeout, the
          *         last non-negative status that was read)
 394      */
 395     private int awaitDone(ForkJoinPool pool, boolean ran,
 396                           boolean interruptible, boolean timed,
 397                           long nanos) {
 398         ForkJoinPool p; boolean internal; int s; Thread t;
 399         ForkJoinPool.WorkQueue q = null;
 400         if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
 401             ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
 402             p = wt.pool;
 403             if (pool == null)
 404                 pool = p;
                 // internal only when the worker belongs to the task's pool;
                 // otherwise q stays null and no helping is attempted
 405             if (internal = (pool == p))
 406                 q = wt.workQueue;
 407         }
 408         else {
 409             internal = false;
 410             p = ForkJoinPool.common;
 411             if (pool == null)
 412                 pool = p;
                 // external caller: a queue is used only for the common pool
 413             if (pool == p && p != null)
 414                 q = p.externalQueue();
 415         }
             // applies to every caller: a pending interrupt aborts before any
             // helping or waiting (polling clears the flag)
 416         if (interruptible && Thread.interrupted())
 417             return ABNORMAL;

 418         if ((s = status) < 0)
 419             return s;
 420         long deadline = 0L;
 421         if (timed) {
                 // no time left: report "not done" (0 is non-negative)
 422             if (nanos <= 0L)
 423                 return 0;
                 // deadline 0 means "untimed" in the wait loop, so avoid it
 424             else if ((deadline = nanos + System.nanoTime()) == 0L)
 425                 deadline = 1L;
 426         }
 427         boolean uncompensate = false;
 428         if (q != null && p != null) {  // try helping
 429             // help even in timed mode if pool has no parallelism
 430             boolean canHelp = !timed || (p.mode & SMASK) == 0;
 431             if (canHelp) {
 432                 if ((this instanceof CountedCompleter) &&
 433                     (s = p.helpComplete(this, q, internal)) < 0)
 434                     return s;
                     // only a task not already exec'd by the caller (!ran) is
                     // taken back from the queue and run here
 435                 if (!ran && ((!internal && q.externalTryUnpush(this)) ||
 436                              q.tryRemove(this, internal)) && (s = doExec()) < 0)
 437                     return s;
 438             }
 439             if (internal) {
 440                 if ((s = p.helpJoin(this, q, canHelp)) < 0)
 441                     return s;
                     // helpJoin asked for an uncompensate after unblocking
 442                 if (s == UNCOMPENSATE)
 443                     uncompensate = true;
 444             }
 445         }
 446         // block until done or cancelled wait
 447         boolean interrupted = false, queued = false;
 448         boolean parked = false, fail = false;
 449         Aux node = null;
 450         while ((s = status) >= 0) {
 451             Aux a; long ns;
                 // Once fail is set (node allocation failed, or pool.mode < 0 --
                 // NOTE(review): presumably a shut-down pool, confirm in
                 // ForkJoinPool) every pass retries the cancelling CAS until
                 // status is seen negative.
 452             if (fail || (fail = (pool != null && pool.mode < 0)))
 453                 casStatus(s, s | (DONE | ABNORMAL)); // try to cancel
 454             else if (parked && Thread.interrupted()) {
 455                 if (interruptible) {
                         // ABNORMAL without DONE: a non-negative "interrupted" result
 456                     s = ABNORMAL;
 457                     break;
 458                 }
                     // not interruptible: remember the interrupt and keep waiting
 459                 interrupted = true;
 460             }
 461             else if (queued) {
 462                 if (deadline != 0L) {
                         // timed out: leave the loop with s still non-negative
 463                     if ((ns = deadline - System.nanoTime()) <= 0L)
 464                         break;
 465                     LockSupport.parkNanos(ns);
 466                 }
 467                 else
 468                     LockSupport.park();
 469                 parked = true;
 470             }
 471             else if (node != null) {
 472                 if ((a = aux) != null && a.ex != null)
 473                     Thread.onSpinWait();     // exception in progress
                     // link node in front of the current head; a failed CAS is
                     // simply retried on the next pass
 474                 else if (queued = casAux(node.next = a, node))
 475                     LockSupport.setCurrentBlocker(this);
 476             }
 477             else {
 478                 try {
 479                     node = new Aux(Thread.currentThread(), null);
 480                 } catch (Throwable ex) {     // cannot create
 481                     fail = true;
 482                 }
 483             }
 484         }
 485         if (pool != null && uncompensate)
 486             pool.uncompensate();
 487
 488         if (queued) {
 489             LockSupport.setCurrentBlocker(null);
 490             if (s >= 0) { // cancellation similar to AbstractQueuedSynchronizer
                     // Timed out or interrupted: walk the list from aux to find
                     // and unlink this caller's node. Stops if aux becomes null
                     // or turns into an exception node.
 491                 outer: for (Aux a; (a = aux) != null && a.ex == null; ) {
 492                     for (Aux trail = null;;) {
 493                         Aux next = a.next;
 494                         if (a == node) {
                                 // NOTE(review): the expected value passed here is
                                 // trail itself rather than a. Aux.casNext is not
                                 // visible in this excerpt; if it is a CAS on
                                 // trail.next this can only succeed when
                                 // trail.next == trail -- confirm against Aux.
 495                             if (trail != null)
 496                                 trail.casNext(trail, next);
 497                             else if (casAux(a, next))
 498                                 break outer; // cannot be re-encountered
 499                             break;           // restart
 500                         } else {
 501                             trail = a;
 502                             if ((a = next) == null)
 503                                 break outer;
 504                         }
 505                     }
 506                 }
 507             }
 508             else {
 509                 signalWaiters();             // help clean or signal
                     // NOTE(review): the interrupt flag is re-asserted only on this
                     // (task done) branch; a non-interruptible wait that was
                     // interrupted and then timed out leaves via the s >= 0 branch
                     // without restoring it -- confirm this is intended.
 510                 if (interrupted)
 511                     Thread.currentThread().interrupt();
 512             }
 513         }
 514         return s;
 515     }
 516 
 517     /**
 518      * Cancels, ignoring any exceptions thrown by cancel.  Cancel is
 519      * spec'ed not to throw any exceptions, but if it does anyway, we
 520      * have no recourse, so guard against this case.
 521      */
 522     static final void cancelIgnoringExceptions(Future<?> t) {
             if (t == null)
                 return;
             try {
                 t.cancel(true);
             } catch (Throwable ignore) {
                 // deliberately ignored: cancel is not supposed to throw and
                 // there is no recourse if it does
             }
 529     }
 530 
 531     /**
 532      * Returns a rethrowable exception for this task, if available.
 533      * To provide accurate stack traces, if the exception was not
 534      * thrown by the current thread, we try to create a new exception


 650             (w = (ForkJoinWorkerThread)t).workQueue.push(this, w.pool);
 651         else
 652             ForkJoinPool.common.externalPush(this);
 653         return this;
 654     }
 655 
 656     /**
 657      * Returns the result of the computation when it
 658      * {@linkplain #isDone is done}.
 659      * This method differs from {@link #get()} in that abnormal
 660      * completion results in {@code RuntimeException} or {@code Error},
 661      * not {@code ExecutionException}, and that interrupts of the
 662      * calling thread do <em>not</em> cause the method to abruptly
 663      * return by throwing {@code InterruptedException}.
 664      *
 665      * @return the computed result
 666      */
 667     public final V join() {
             int outcome = status;
             if (outcome >= 0) {
                 // not done yet: help or wait, non-interruptibly and untimed,
                 // assuming the current pool (null pool argument)
                 outcome = awaitDone(null, false, false, false, 0L);
             }
             if ((outcome & ABNORMAL) != 0) {
                 reportException(outcome);
             }
 673         return getRawResult();
 674     }
 675 
 676     /**
 677      * Commences performing this task, awaits its completion if
 678      * necessary, and returns its result, or throws an (unchecked)
 679      * {@code RuntimeException} or {@code Error} if the underlying
 680      * computation did so.
 681      *
 682      * @return the computed result
 683      */
 684     public final V invoke() {
 685         int s;
 686         if ((s = doExec()) >= 0)             // doExec() result is tested like status: >= 0 means DONE not set
 687             s = awaitDone(null, true, false, false, 0L);   // second arg is true here, false in join() - presumably a "ran" flag; confirm
 688         if ((s & ABNORMAL) != 0)             // ABNORMAL bit set in the final status
 689             reportException(s);              // NOTE(review): presumably always throws - confirm
 690         return getRawResult();
 691     }
 692 
 693     /**
 694      * Forks the given tasks, returning when {@code isDone} holds for
 695      * each task or an (unchecked) exception is encountered, in which
 696      * case the exception is rethrown. If more than one task
 697      * encounters an exception, then this method throws any one of
 698      * these exceptions. If any task encounters an exception, the
 699      * other may be cancelled. However, the execution status of
 700      * individual tasks is not guaranteed upon exceptional return. The
 701      * status of each task may be obtained using {@link
 702      * #getException()} and related methods to check if they have been
 703      * cancelled, completed normally or exceptionally, or left
 704      * unprocessed.
 705      *
 706      * @param t1 the first task
 707      * @param t2 the second task
 708      * @throws NullPointerException if any task is null
 709      */
 710     public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
 711         int s1, s2;
 712         if (t1 == null || t2 == null)
 713             throw new NullPointerException();   // both checked before anything is forked
 714         t2.fork();                              // t2 is forked before t1.doExec() is called
 715         if ((s1 = t1.doExec()) >= 0)            // >= 0: DONE (sign bit) not set yet
 716             s1 = t1.awaitDone(null, true, false, false, 0L);
 717         if ((s1 & ABNORMAL) != 0) {
 718             cancelIgnoringExceptions(t2);       // t1 ended abnormally: best-effort cancel of t2 first
 719             t1.reportException(s1);
 720         }
 721         else if (((s2 = t2.awaitDone(null, false, false, false, 0L)) & ABNORMAL) != 0)   // t2 is awaited only when t1 ended normally
 722             t2.reportException(s2);
 723     }
 724 
 725     /**
 726      * Forks the given tasks, returning when {@code isDone} holds for
 727      * each task or an (unchecked) exception is encountered, in which
 728      * case the exception is rethrown. If more than one task
 729      * encounters an exception, then this method throws any one of
 730      * these exceptions. If any task encounters an exception, others
 731      * may be cancelled. However, the execution status of individual
 732      * tasks is not guaranteed upon exceptional return. The status of
 733      * each task may be obtained using {@link #getException()} and
 734      * related methods to check if they have been cancelled, completed
 735      * normally or exceptionally, or left unprocessed.
 736      *
 737      * @param tasks the tasks
 738      * @throws NullPointerException if any task is null
 739      */
 740     public static void invokeAll(ForkJoinTask<?>... tasks) {
 741         Throwable ex = null;                 // first failure seen; rethrown at the end
 742         int last = tasks.length - 1;         // -1 for an empty array, so both loops below are skipped
 743         for (int i = last; i >= 0; --i) {    // walk backwards: fork tasks[last..1], handle tasks[0] in this thread
 744             ForkJoinTask<?> t;
 745             if ((t = tasks[i]) == null) {
 746                 ex = new NullPointerException();   // recorded rather than thrown so the cancel loop below still runs
 747                 break;
 748             }
 749             if (i == 0) {
 750                 int s;
 751                 if ((s = t.doExec()) >= 0)   // >= 0: DONE (sign bit) not set yet
 752                     s = t.awaitDone(null, true, false, false, 0L);
 753                 if ((s & ABNORMAL) != 0)
 754                     ex = t.getException(s);
 755                 break;
 756             }
 757             t.fork();
 758         }
 759         if (ex == null) {
 760             for (int i = 1; i <= last; ++i) {    // wait for the forked tasks in index order
 761                 ForkJoinTask<?> t;
 762                 if ((t = tasks[i]) != null) {
 763                     int s;
 764                     if ((s = t.status) >= 0)     // skip awaitDone when the task is already done
 765                         s = t.awaitDone(null, false, false, false, 0L);
 766                     if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null)
 767                         break;                   // stop waiting at the first task that yields an exception
 768                 }
 769             }
 770         }
 771         if (ex != null) {
 772             for (int i = 1; i <= last; ++i)      // only indices 1..last are cancelled; tasks[0] is left alone
 773                 cancelIgnoringExceptions(tasks[i]);
 774             rethrow(ex);
 775         }
 776     }
 777 
 778     /**
 779      * Forks all tasks in the specified collection, returning when
 780      * {@code isDone} holds for each task or an (unchecked) exception
 781      * is encountered, in which case the exception is rethrown. If
 782      * more than one task encounters an exception, then this method
 783      * throws any one of these exceptions. If any task encounters an
 784      * exception, others may be cancelled. However, the execution
 785      * status of individual tasks is not guaranteed upon exceptional


 795      */
 796     public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks) {
 797         if (!(tasks instanceof RandomAccess) || !(tasks instanceof List<?>)) {   // not a random-access List: no indexed path
 798             invokeAll(tasks.toArray(new ForkJoinTask<?>[0]));   // copy to an array and delegate to the varargs overload
 799             return tasks;
 800         }
 801         @SuppressWarnings("unchecked")       // the cast below is unchecked; List-ness was tested above
 802         List<? extends ForkJoinTask<?>> ts =
 803             (List<? extends ForkJoinTask<?>>) tasks;
 804         Throwable ex = null;                 // first failure seen; rethrown at the end
 805         int last = ts.size() - 1;  // nearly same as array version
 806         for (int i = last; i >= 0; --i) {    // walk backwards: fork elements last..1, handle element 0 in this thread
 807             ForkJoinTask<?> t;
 808             if ((t = ts.get(i)) == null) {
 809                 ex = new NullPointerException();   // recorded rather than thrown so the cancel loop below still runs
 810                 break;
 811             }
 812             if (i == 0) {
 813                 int s;
 814                 if ((s = t.doExec()) >= 0)   // >= 0: DONE (sign bit) not set yet
 815                     s = t.awaitDone(null, true, false, false, 0L);
 816                 if ((s & ABNORMAL) != 0)
 817                     ex = t.getException(s);
 818                 break;
 819             }
 820             t.fork();
 821         }
 822         if (ex == null) {
 823             for (int i = 1; i <= last; ++i) {    // wait for the forked tasks in index order
 824                 ForkJoinTask<?> t;
 825                 if ((t = ts.get(i)) != null) {
 826                     int s;
 827                     if ((s = t.status) >= 0)     // skip awaitDone when the task is already done
 828                         s = t.awaitDone(null, false, false, false, 0L);
 829                     if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null)
 830                         break;                   // stop waiting at the first task that yields an exception
 831                 }
 832             }
 833         }
 834         if (ex != null) {
 835             for (int i = 1; i <= last; ++i)      // only indices 1..last are cancelled; element 0 is left alone
 836                 cancelIgnoringExceptions(ts.get(i));
 837             rethrow(ex);
 838         }
 839         return tasks;                        // the caller's own collection is returned, not the List view
 840     }
 841 
 842     /**
 843      * Attempts to cancel execution of this task. This attempt will
 844      * fail if the task has already completed or could not be
 845      * cancelled for some other reason. If successful, and this task
 846      * has not started when {@code cancel} is called, execution of
 847      * this task is suppressed. After this method returns
 848      * successfully, unless there is an intervening call to {@link


 959      * invocations of {@code join} and related operations.
 960      *
 961      * @since 1.8
 962      */
 963     public final void quietlyComplete() {
 964         setDone();   // NOTE(review): setDone() is outside this view; presumably sets DONE and signals waiters - confirm
 965     }
 966 
 967     /**
 968      * Waits if necessary for the computation to complete, and then
 969      * retrieves its result.
 970      *
 971      * @return the computed result
 972      * @throws CancellationException if the computation was cancelled
 973      * @throws ExecutionException if the computation threw an
 974      * exception
 975      * @throws InterruptedException if the current thread is not a
 976      * member of a ForkJoinPool and was interrupted while waiting
 977      */
 978     public final V get() throws InterruptedException, ExecutionException {
 979         int s = awaitDone(null, false, true, false, 0L);   // called unconditionally; third arg is true here, false in join() - presumably "interruptible", confirm
 980         if ((s & ABNORMAL) != 0)             // ABNORMAL bit set in the returned status
 981             reportExecutionException(s);     // NOTE(review): presumably always throws - confirm
 982         return getRawResult();
 983     }
 984 
 985     /**
 986      * Waits if necessary for at most the given time for the computation
 987      * to complete, and then retrieves its result, if available.
 988      *
 989      * @param timeout the maximum time to wait
 990      * @param unit the time unit of the timeout argument
 991      * @return the computed result
 992      * @throws CancellationException if the computation was cancelled
 993      * @throws ExecutionException if the computation threw an
 994      * exception
 995      * @throws InterruptedException if the current thread is not a
 996      * member of a ForkJoinPool and was interrupted while waiting
 997      * @throws TimeoutException if the wait timed out
 998      */
 999     public final V get(long timeout, TimeUnit unit)
1000         throws InterruptedException, ExecutionException, TimeoutException {
1001         long nanos = unit.toNanos(timeout);  // dereferences unit, so a null unit throws NullPointerException here
1002         int s = awaitDone(null, false, true, true, nanos);   // fourth arg true plus nanos - presumably the timed wait; confirm
1003         if (s >= 0 || (s & ABNORMAL) != 0)   // s >= 0: DONE (sign bit) still not set when awaitDone returned
1004             reportExecutionException(s);     // NOTE(review): presumably maps s >= 0 to TimeoutException - confirm
1005         return getRawResult();
1006     }
1007 
1008     /**
1009      * Joins this task, without returning its result or throwing its
1010      * exception. This method may be useful when processing
1011      * collections of tasks when some have been cancelled or otherwise
1012      * known to have aborted.
1013      */
1014     public final void quietlyJoin() {
1015         if (status >= 0)                     // DONE is the sign bit, so >= 0 means not yet done
1016             awaitDone(null, false, false, false, 0L);   // returned status is discarded: nothing is reported or thrown from it
1017     }
1018 
1019 
1020     /**
1021      * Commences performing this task and awaits its completion if
1022      * necessary, without returning its result or throwing its
1023      * exception.
1024      */
1025     public final void quietlyInvoke() {
1026         if (doExec() >= 0)                   // doExec() result is tested like status: >= 0 means DONE not set
1027             awaitDone(null, true, false, false, 0L);   // returned status is discarded: nothing is reported or thrown from it
1028     }
1029 
1030     // Versions of join/get for pool.invoke* methods that use external,
1031     // possibly-non-commonPool submits
1032 
1033     final void awaitPoolInvoke(ForkJoinPool pool) {
1034         awaitDone(pool, false, false, false, 0L);   // same flags as quietlyJoin() but passes the given pool instead of null; status discarded
1035     }
1036     final void awaitPoolInvoke(ForkJoinPool pool, long nanos) {
1037         awaitDone(pool, false, true, true, nanos);   // same flags as get(timeout, unit) but with the given pool; status discarded, so nothing is thrown from it here
1038     }
1039     final V joinForPoolInvoke(ForkJoinPool pool) {
1040         int s = awaitDone(pool, false, false, false, 0L);   // like join(), but with the given pool and no status pre-check
1041         if ((s & ABNORMAL) != 0)             // ABNORMAL bit set in the returned status
1042             reportException(s);              // NOTE(review): presumably always throws - confirm
1043         return getRawResult();
1044     }
1045     final V getForPoolInvoke(ForkJoinPool pool)
1046         throws InterruptedException, ExecutionException {
1047         int s = awaitDone(pool, false, true, false, 0L);   // same flags as get(), but with the given pool instead of null
1048         if ((s & ABNORMAL) != 0)             // ABNORMAL bit set in the returned status
1049             reportExecutionException(s);     // NOTE(review): presumably always throws - confirm
1050         return getRawResult();
1051     }
1052     final V getForPoolInvoke(ForkJoinPool pool, long nanos)
1053         throws InterruptedException, ExecutionException, TimeoutException {
1054         int s = awaitDone(pool, false, true, true, nanos);   // same flags as get(timeout, unit); nanos is passed through as given
1055         if (s >= 0 || (s & ABNORMAL) != 0)   // s >= 0: DONE (sign bit) still not set when awaitDone returned
1056             reportExecutionException(s);     // NOTE(review): presumably maps s >= 0 to TimeoutException - confirm
1057         return getRawResult();
1058     }
1059 
1060     /**
1061      * Possibly executes tasks until the pool hosting the current task
1062      * {@linkplain ForkJoinPool#isQuiescent is quiescent}.  This
1063      * method may be of use in designs in which many tasks are forked,
1064      * but none are explicitly joined, instead executing them until
1065      * all are processed.
1066      */
1067     public static void helpQuiesce() {
1068         Thread t; ForkJoinWorkerThread w; ForkJoinPool p;
1069         if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread &&   // t, w and p are assigned inside the condition
1070             (p = (w = (ForkJoinWorkerThread)t).pool) != null)                 // a worker whose pool field is null takes the else branch
1071             p.helpQuiescePool(w.workQueue, Long.MAX_VALUE, false);   // worker thread: use its own pool and work queue
1072         else
1073             ForkJoinPool.common.externalHelpQuiescePool(Long.MAX_VALUE, false);   // any other caller goes through the common pool
1074     }
1075 
1076     /**
1077      * Resets the internal bookkeeping state of this task, allowing a


< prev index next >