1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/licenses/publicdomain
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.io.Serializable;
  39 import java.util.Collection;
  40 import java.util.Collections;
  41 import java.util.List;
  42 import java.util.RandomAccess;
  43 import java.util.Map;
  44 import java.util.WeakHashMap;










  45 
  46 /**
  47  * Abstract base class for tasks that run within a {@link ForkJoinPool}.
  48  * A {@code ForkJoinTask} is a thread-like entity that is much
  49  * lighter weight than a normal thread.  Huge numbers of tasks and
  50  * subtasks may be hosted by a small number of actual threads in a
  51  * ForkJoinPool, at the price of some usage limitations.
  52  *
  53  * <p>A "main" {@code ForkJoinTask} begins execution when submitted
  54  * to a {@link ForkJoinPool}.  Once started, it will usually in turn
  55  * start other subtasks.  As indicated by the name of this class,
  56  * many programs using {@code ForkJoinTask} employ only methods
  57  * {@link #fork} and {@link #join}, or derivatives such as {@link
  58  * #invokeAll(ForkJoinTask...) invokeAll}.  However, this class also
  59  * provides a number of other methods that can come into play in
  60  * advanced usages, as well as extension mechanics that allow
  61  * support of new forms of fork/join processing.
  62  *
  63  * <p>A {@code ForkJoinTask} is a lightweight form of {@link Future}.
  64  * The efficiency of {@code ForkJoinTask}s stems from a set of
  65  * restrictions (that are only partially statically enforceable)
  66  * reflecting their intended use as computational tasks calculating
  67  * pure functions or operating on purely isolated objects.  The
  68  * primary coordination mechanisms are {@link #fork}, that arranges
  69  * asynchronous execution, and {@link #join}, that doesn't proceed
  70  * until the task's result has been computed.  Computations should
  71  * avoid {@code synchronized} methods or blocks, and should minimize
  72  * other blocking synchronization apart from joining other tasks or
  73  * using synchronizers such as Phasers that are advertised to
  74  * cooperate with fork/join scheduling. Tasks should also not perform
  75  * blocking IO, and should ideally access variables that are
  76  * completely independent of those accessed by other running
  77  * tasks. Minor breaches of these restrictions, for example using
  78  * shared output streams, may be tolerable in practice, but frequent
  79  * use may result in poor performance, and the potential to
  80  * indefinitely stall if the number of threads not waiting for IO or
  81  * other external synchronization becomes exhausted. This usage
  82  * restriction is in part enforced by not permitting checked
  83  * exceptions such as {@code IOExceptions} to be thrown. However,
  84  * computations may still encounter unchecked exceptions, that are
  85  * rethrown to callers attempting to join them. These exceptions may
  86  * additionally include {@link RejectedExecutionException} stemming
  87  * from internal resource exhaustion, such as failure to allocate
  88  * internal task queues.
  89  *
  90  * <p>The primary method for awaiting completion and extracting
  91  * results of a task is {@link #join}, but there are several variants:
  92  * The {@link Future#get} methods support interruptible and/or timed
  93  * waits for completion and report results using {@code Future}
  94  * conventions. Method {@link #invoke} is semantically
  95  * equivalent to {@code fork(); join()} but always attempts to begin
  96  * execution in the current thread. The "<em>quiet</em>" forms of
  97  * these methods do not extract results or report exceptions. These
  98  * may be useful when a set of tasks are being executed, and you need
  99  * to delay processing of results or exceptions until all complete.
 100  * Method {@code invokeAll} (available in multiple versions)
 101  * performs the most common form of parallel invocation: forking a set
 102  * of tasks and joining them all.
 103  *
 104  * <p>The execution status of tasks may be queried at several levels
 105  * of detail: {@link #isDone} is true if a task completed in any way
 106  * (including the case where a task was cancelled without executing);
 107  * {@link #isCompletedNormally} is true if a task completed without
 108  * cancellation or encountering an exception; {@link #isCancelled} is
 109  * true if the task was cancelled (in which case {@link #getException}
 110  * returns a {@link java.util.concurrent.CancellationException}); and
 111  * {@link #isCompletedAbnormally} is true if a task was either
 112  * cancelled or encountered an exception, in which case {@link
 113  * #getException} will return either the encountered exception or
 114  * {@link java.util.concurrent.CancellationException}.
 115  *
 116  * <p>The ForkJoinTask class is not usually directly subclassed.
 117  * Instead, you subclass one of the abstract classes that support a
 118  * particular style of fork/join processing, typically {@link
 119  * RecursiveAction} for computations that do not return results, or
 120  * {@link RecursiveTask} for those that do.  Normally, a concrete
 121  * ForkJoinTask subclass declares fields comprising its parameters,
 122  * established in a constructor, and then defines a {@code compute}
 123  * method that somehow uses the control methods supplied by this base
 124  * class. While these methods have {@code public} access (to allow
 125  * instances of different task subclasses to call each other's
 126  * methods), some of them may only be called from within other
 127  * ForkJoinTasks (as may be determined using method {@link
 128  * #inForkJoinPool}).  Attempts to invoke them in other contexts
 129  * result in exceptions or errors, possibly including
 130  * {@code ClassCastException}.
 131  *










 132  * <p>Most base support methods are {@code final}, to prevent
 133  * overriding of implementations that are intrinsically tied to the
 134  * underlying lightweight task scheduling framework.  Developers
 135  * creating new basic styles of fork/join processing should minimally
 136  * implement {@code protected} methods {@link #exec}, {@link
 137  * #setRawResult}, and {@link #getRawResult}, while also introducing
 138  * an abstract computational method that can be implemented in its
 139  * subclasses, possibly relying on other {@code protected} methods
 140  * provided by this class.
 141  *
 142  * <p>ForkJoinTasks should perform relatively small amounts of
 143  * computation. Large tasks should be split into smaller subtasks,
 144  * usually via recursive decomposition. As a very rough rule of thumb,
 145  * a task should perform more than 100 and less than 10000 basic
 146  * computational steps. If tasks are too big, then parallelism cannot
 147  * improve throughput. If too small, then memory and internal task
 148  * maintenance overhead may overwhelm processing.

 149  *
 150  * <p>This class provides {@code adapt} methods for {@link Runnable}
 151  * and {@link Callable}, that may be of use when mixing execution of
 152  * {@code ForkJoinTasks} with other kinds of tasks. When all tasks are
 153  * of this form, consider using a pool constructed in <em>asyncMode</em>.
 154  *
 155  * <p>ForkJoinTasks are {@code Serializable}, which enables them to be
 156  * used in extensions such as remote execution frameworks. It is
 157  * sensible to serialize tasks only before or after, but not during,
 158  * execution. Serialization is not relied on during execution itself.
 159  *
 160  * @since 1.7
 161  * @author Doug Lea
 162  */
 163 public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
 164 
 165     /*
 166      * See the internal documentation of class ForkJoinPool for a
 167      * general implementation overview.  ForkJoinTasks are mainly
 168      * responsible for maintaining their "status" field amidst relays
 169      * to methods in ForkJoinWorkerThread and ForkJoinPool. The
 170      * methods of this class are more-or-less layered into (1) basic
 171      * status maintenance (2) execution and awaiting completion (3)
 172      * user-level methods that additionally report results. This is
 173      * sometimes hard to see because this file orders exported methods
 174      * in a way that flows well in javadocs. In particular, most
 175      * join mechanics are in method quietlyJoin, below.
 176      */
 177 
 178     /*
 179      * The status field holds run control status bits packed into a
 180      * single int to minimize footprint and to ensure atomicity (via
 181      * CAS).  Status is initially zero, and takes on nonnegative
 182      * values until completed, upon which status holds value
 183      * NORMAL, CANCELLED, or EXCEPTIONAL. Tasks undergoing blocking
 184      * waits by other threads have the SIGNAL bit set.  Completion of
 185      * a stolen task with SIGNAL set awakens any waiters via
 186      * notifyAll. Even though suboptimal for some purposes, we use
 187      * basic builtin wait/notify to take advantage of "monitor
 188      * inflation" in JVMs that we would otherwise need to emulate to
 189      * avoid adding further per-task bookkeeping overhead.  We want
 190      * these monitors to be "fat", i.e., not use biasing or thin-lock
 191      * techniques, so use some odd coding idioms that tend to avoid
 192      * them.
 193      */
 194 
    /**
     * The run status of this task. Nonnegative while the task has not
     * completed; once completed it holds one of the negative values
     * NORMAL, CANCELLED, or EXCEPTIONAL.
     */
    volatile int status; // accessed directly by pool and workers

    // Completion codes are negative so that "status < 0" means done, and
    // both abnormal codes sort below NORMAL so that "status < NORMAL"
    // means cancelled-or-exceptional.
    private static final int NORMAL      = -1; // completed without exception or cancellation
    private static final int CANCELLED   = -2; // completed by cancellation
    private static final int EXCEPTIONAL = -3; // completed by a recorded exception
    private static final int SIGNAL      =  1; // set (via CAS) by a thread about to wait() on this task

    /**
     * Table of exceptions thrown by tasks, to enable reporting by
     * callers. Because exceptions are rare, we don't directly keep
     * them with task objects, but instead use a weak ref table.  Note
     * that cancellation exceptions don't appear in the table, but are
     * instead recorded as status values.
     * The table is a WeakHashMap wrapped by Collections.synchronizedMap,
     * so individual put/get calls are synchronized on the wrapper.
     * TODO: Use ConcurrentReferenceHashMap
     */
    static final Map<ForkJoinTask<?>, Throwable> exceptionMap =
        Collections.synchronizedMap
        (new WeakHashMap<ForkJoinTask<?>, Throwable>());
 214 
 215     // Maintaining completion status
 216 
 217     /**
 218      * Marks completion and wakes up threads waiting to join this task,
 219      * also clearing signal request bits.
 220      *
 221      * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
 222      */
 223     private void setCompletion(int completion) {
 224         int s;
 225         while ((s = status) >= 0) {
 226             if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) {
 227                 if (s != 0)
 228                     synchronized (this) { notifyAll(); }
 229                 break;
 230             }
 231         }
 232     }
 233 
 234     /**
 235      * Records exception and sets exceptional completion.
 236      *
 237      * @return status on exit
 238      */
 239     private void setExceptionalCompletion(Throwable rex) {
 240         exceptionMap.put(this, rex);
 241         setCompletion(EXCEPTIONAL);
 242     }
 243 
 244     /**
 245      * Blocks a worker thread until completion. Called only by
 246      * pool. Currently unused -- pool-based waits use timeout
 247      * version below.
 248      */
 249     final void internalAwaitDone() {
 250         int s;         // the odd construction reduces lock bias effects
 251         while ((s = status) >= 0) {
 252             try {
 253                 synchronized (this) {
 254                     if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
 255                         wait();
 256                 }
 257             } catch (InterruptedException ie) {
 258                 cancelIfTerminating();
 259             }
 260         }
 261     }
 262 
 263     /**
 264      * Blocks a worker thread until completed or timed out.  Called
 265      * only by pool.
 266      *
 267      * @return status on exit
 268      */
 269     final int internalAwaitDone(long millis) {
 270         int s;
 271         if ((s = status) >= 0) {
 272             try {


 273                 synchronized (this) {
 274                     if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
 275                         wait(millis, 0);


 276                 }
 277             } catch (InterruptedException ie) {
 278                 cancelIfTerminating();
 279             }
 280             s = status;
 281         }
 282         return s;
 283     }
 284 
 285     /**
 286      * Blocks a non-worker-thread until completion.
 287      */
 288     private void externalAwaitDone() {
 289         int s;
 290         while ((s = status) >= 0) {
 291             synchronized (this) {
 292                 if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)){
 293                     boolean interrupted = false;
 294                     while (status >= 0) {










 295                         try {
 296                             wait();
 297                         } catch (InterruptedException ie) {
 298                             interrupted = true;
 299                         }
 300                     }


 301                     if (interrupted)
 302                         Thread.currentThread().interrupt();





















 303                     break;
 304                 }






 305             }
 306         }
 307     }

 308 
 309     /**
 310      * Unless done, calls exec and records status if completed, but
 311      * doesn't wait for completion otherwise. Primary execution method
 312      * for ForkJoinWorkerThread.
 313      */
 314     final void quietlyExec() {
 315         try {
 316             if (status < 0 || !exec())
 317                 return;
 318         } catch (Throwable rex) {
 319             setExceptionalCompletion(rex);
 320             return;
 321         }
 322         setCompletion(NORMAL); // must be outside try block
 323     }
 324 
 325     // public methods
 326 
 327     /**
 328      * Arranges to asynchronously execute this task.  While it is not
 329      * necessarily enforced, it is a usage error to fork a task more
 330      * than once unless it has completed and been reinitialized.
 331      * Subsequent modifications to the state of this task or any data
 332      * it operates on are not necessarily consistently observable by
 333      * any thread other than the one executing it unless preceded by a
 334      * call to {@link #join} or related methods, or a call to {@link
 335      * #isDone} returning {@code true}.
 336      *
 337      * <p>This method may be invoked only from within {@code
 338      * ForkJoinTask} computations (as may be determined using method
 339      * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
 340      * result in exceptions or errors, possibly including {@code
 341      * ClassCastException}.
 342      *
 343      * @return {@code this}, to simplify usage
 344      */
 345     public final ForkJoinTask<V> fork() {
 346         ((ForkJoinWorkerThread) Thread.currentThread())
 347             .pushTask(this);
 348         return this;
 349     }
 350 
 351     /**
 352      * Returns the result of the computation when it {@link #isDone is done}.
 353      * This method differs from {@link #get()} in that
 354      * abnormal completion results in {@code RuntimeException} or
 355      * {@code Error}, not {@code ExecutionException}.



 356      *
 357      * @return the computed result
 358      */
 359     public final V join() {
 360         quietlyJoin();
 361         Throwable ex;
 362         if (status < NORMAL && (ex = getException()) != null)
 363             UNSAFE.throwException(ex);
 364         return getRawResult();
 365     }
 366 
 367     /**
 368      * Commences performing this task, awaits its completion if
 369      * necessary, and returns its result, or throws an (unchecked)
 370      * {@code RuntimeException} or {@code Error} if the underlying
 371      * computation did so.
 372      *
 373      * @return the computed result
 374      */
 375     public final V invoke() {
 376         quietlyInvoke();
 377         Throwable ex;
 378         if (status < NORMAL && (ex = getException()) != null)
 379             UNSAFE.throwException(ex);
 380         return getRawResult();
 381     }
 382 
 383     /**
 384      * Forks the given tasks, returning when {@code isDone} holds for
 385      * each task or an (unchecked) exception is encountered, in which
 386      * case the exception is rethrown. If more than one task
 387      * encounters an exception, then this method throws any one of
 388      * these exceptions. If any task encounters an exception, the
 389      * other may be cancelled. However, the execution status of
 390      * individual tasks is not guaranteed upon exceptional return. The
 391      * status of each task may be obtained using {@link
 392      * #getException()} and related methods to check if they have been
 393      * cancelled, completed normally or exceptionally, or left
 394      * unprocessed.
 395      *
 396      * <p>This method may be invoked only from within {@code
 397      * ForkJoinTask} computations (as may be determined using method
 398      * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
 399      * result in exceptions or errors, possibly including {@code
 400      * ClassCastException}.
 401      *
 402      * @param t1 the first task
 403      * @param t2 the second task
 404      * @throws NullPointerException if any task is null
 405      */
 406     public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
 407         t2.fork();
 408         t1.invoke();
 409         t2.join();
 410     }
 411 
 412     /**
 413      * Forks the given tasks, returning when {@code isDone} holds for
 414      * each task or an (unchecked) exception is encountered, in which
 415      * case the exception is rethrown. If more than one task
 416      * encounters an exception, then this method throws any one of
 417      * these exceptions. If any task encounters an exception, others
 418      * may be cancelled. However, the execution status of individual
 419      * tasks is not guaranteed upon exceptional return. The status of
 420      * each task may be obtained using {@link #getException()} and
 421      * related methods to check if they have been cancelled, completed
 422      * normally or exceptionally, or left unprocessed.
 423      *
 424      * <p>This method may be invoked only from within {@code
 425      * ForkJoinTask} computations (as may be determined using method
 426      * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
 427      * result in exceptions or errors, possibly including {@code
 428      * ClassCastException}.
 429      *
 430      * @param tasks the tasks
 431      * @throws NullPointerException if any task is null
 432      */
 433     public static void invokeAll(ForkJoinTask<?>... tasks) {
 434         Throwable ex = null;
 435         int last = tasks.length - 1;
 436         for (int i = last; i >= 0; --i) {
 437             ForkJoinTask<?> t = tasks[i];
 438             if (t == null) {
 439                 if (ex == null)
 440                     ex = new NullPointerException();
 441             }
 442             else if (i != 0)
 443                 t.fork();
 444             else {
 445                 t.quietlyInvoke();
 446                 if (ex == null && t.status < NORMAL)
 447                     ex = t.getException();
 448             }
 449         }
 450         for (int i = 1; i <= last; ++i) {
 451             ForkJoinTask<?> t = tasks[i];
 452             if (t != null) {
 453                 if (ex != null)
 454                     t.cancel(false);
 455                 else {
 456                     t.quietlyJoin();
 457                     if (ex == null && t.status < NORMAL)
 458                         ex = t.getException();
 459                 }
 460             }
 461         }
 462         if (ex != null)
 463             UNSAFE.throwException(ex);
 464     }
 465 
 466     /**
 467      * Forks all tasks in the specified collection, returning when
 468      * {@code isDone} holds for each task or an (unchecked) exception
 469      * is encountered, in which case the exception is rethrown. If
 470      * more than one task encounters an exception, then this method
 471      * throws any one of these exceptions. If any task encounters an
 472      * exception, others may be cancelled. However, the execution
 473      * status of individual tasks is not guaranteed upon exceptional
 474      * return. The status of each task may be obtained using {@link
 475      * #getException()} and related methods to check if they have been
 476      * cancelled, completed normally or exceptionally, or left
 477      * unprocessed.
 478      *
 479      * <p>This method may be invoked only from within {@code
 480      * ForkJoinTask} computations (as may be determined using method
 481      * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
 482      * result in exceptions or errors, possibly including {@code
 483      * ClassCastException}.
 484      *
 485      * @param tasks the collection of tasks
 486      * @return the tasks argument, to simplify usage
 487      * @throws NullPointerException if tasks or any element are null
 488      */
 489     public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks) {
 490         if (!(tasks instanceof RandomAccess) || !(tasks instanceof List<?>)) {
 491             invokeAll(tasks.toArray(new ForkJoinTask<?>[tasks.size()]));
 492             return tasks;
 493         }
 494         @SuppressWarnings("unchecked")
 495         List<? extends ForkJoinTask<?>> ts =
 496             (List<? extends ForkJoinTask<?>>) tasks;
 497         Throwable ex = null;
 498         int last = ts.size() - 1;
 499         for (int i = last; i >= 0; --i) {
 500             ForkJoinTask<?> t = ts.get(i);
 501             if (t == null) {
 502                 if (ex == null)
 503                     ex = new NullPointerException();
 504             }
 505             else if (i != 0)
 506                 t.fork();
 507             else {
 508                 t.quietlyInvoke();
 509                 if (ex == null && t.status < NORMAL)
 510                     ex = t.getException();
 511             }
 512         }
 513         for (int i = 1; i <= last; ++i) {
 514             ForkJoinTask<?> t = ts.get(i);
 515             if (t != null) {
 516                 if (ex != null)
 517                     t.cancel(false);
 518                 else {
 519                     t.quietlyJoin();
 520                     if (ex == null && t.status < NORMAL)
 521                         ex = t.getException();
 522                 }
 523             }
 524         }
 525         if (ex != null)
 526             UNSAFE.throwException(ex);
 527         return tasks;
 528     }
 529 
 530     /**
 531      * Attempts to cancel execution of this task. This attempt will
 532      * fail if the task has already completed, has already been
 533      * cancelled, or could not be cancelled for some other reason. If
 534      * successful, and this task has not started when cancel is
 535      * called, execution of this task is suppressed, {@link
 536      * #isCancelled} will report true, and {@link #join} will result
 537      * in a {@code CancellationException} being thrown.



 538      *
 539      * <p>This method may be overridden in subclasses, but if so, must
 540      * still ensure that these minimal properties hold. In particular,
 541      * the {@code cancel} method itself must not throw exceptions.
 542      *
 543      * <p>This method is designed to be invoked by <em>other</em>
 544      * tasks. To terminate the current task, you can just return or
 545      * throw an unchecked exception from its computation method, or
 546      * invoke {@link #completeExceptionally}.
 547      *
 548      * @param mayInterruptIfRunning this value is ignored in the
 549      * default implementation because tasks are not
 550      * cancelled via interruption
 551      *
 552      * @return {@code true} if this task is now cancelled
 553      */
 554     public boolean cancel(boolean mayInterruptIfRunning) {
 555         setCompletion(CANCELLED);
 556         return status == CANCELLED;
 557     }
 558 
 559     /**
 560      * Cancels, ignoring any exceptions thrown by cancel. Used during
 561      * worker and pool shutdown. Cancel is spec'ed not to throw any
 562      * exceptions, but if it does anyway, we have no recourse during
 563      * shutdown, so guard against this case.
 564      */
 565     final void cancelIgnoringExceptions() {
 566         try {
 567             cancel(false);
 568         } catch (Throwable ignore) {
 569         }
 570     }
 571 
 572     /**
 573      * Cancels if current thread is a terminating worker thread,
 574      * ignoring any exceptions thrown by cancel.
 575      */
 576     final void cancelIfTerminating() {
 577         Thread t = Thread.currentThread();
 578         if ((t instanceof ForkJoinWorkerThread) &&
 579             ((ForkJoinWorkerThread) t).isTerminating()) {
 580             try {
 581                 cancel(false);
 582             } catch (Throwable ignore) {
 583             }
 584         }
 585     }
 586 
 587     public final boolean isDone() {
 588         return status < 0;
 589     }
 590 
 591     public final boolean isCancelled() {
 592         return status == CANCELLED;
 593     }
 594 
 595     /**
 596      * Returns {@code true} if this task threw an exception or was cancelled.
 597      *
 598      * @return {@code true} if this task threw an exception or was cancelled
 599      */
 600     public final boolean isCompletedAbnormally() {
 601         return status < NORMAL;
 602     }
 603 
 604     /**
 605      * Returns {@code true} if this task completed without throwing an
 606      * exception and was not cancelled.
 607      *
 608      * @return {@code true} if this task completed without throwing an
 609      * exception and was not cancelled
 610      */
 611     public final boolean isCompletedNormally() {
 612         return status == NORMAL;
 613     }
 614 
 615     /**
 616      * Returns the exception thrown by the base computation, or a
 617      * {@code CancellationException} if cancelled, or {@code null} if
 618      * none or if the method has not yet completed.
 619      *
 620      * @return the exception, or {@code null} if none
 621      */
 622     public final Throwable getException() {
 623         int s = status;
 624         return ((s >= NORMAL)    ? null :
 625                 (s == CANCELLED) ? new CancellationException() :
 626                 exceptionMap.get(this));
 627     }
 628 
 629     /**
 630      * Completes this task abnormally, and if not already aborted or
 631      * cancelled, causes it to throw the given exception upon
 632      * {@code join} and related operations. This method may be used
 633      * to induce exceptions in asynchronous tasks, or to force
 634      * completion of tasks that would not otherwise complete.  Its use
 635      * in other situations is discouraged.  This method is
 636      * overridable, but overridden versions must invoke {@code super}
 637      * implementation to maintain guarantees.
 638      *
 639      * @param ex the exception to throw. If this exception is not a
 640      * {@code RuntimeException} or {@code Error}, the actual exception
 641      * thrown will be a {@code RuntimeException} with cause {@code ex}.
 642      */
 643     public void completeExceptionally(Throwable ex) {
 644         setExceptionalCompletion((ex instanceof RuntimeException) ||
 645                                  (ex instanceof Error) ? ex :
 646                                  new RuntimeException(ex));
 647     }
 648 
 649     /**
 650      * Completes this task, and if not already aborted or cancelled,
 651      * returning the given value as the result of subsequent
 652      * invocations of {@code join} and related operations. This method
 653      * may be used to provide results for asynchronous tasks, or to
 654      * provide alternative handling for tasks that would not otherwise
 655      * complete normally. Its use in other situations is
 656      * discouraged. This method is overridable, but overridden
 657      * versions must invoke {@code super} implementation to maintain
 658      * guarantees.
 659      *
 660      * @param value the result value for this task
 661      */
 662     public void complete(V value) {
 663         try {
 664             setRawResult(value);
 665         } catch (Throwable rex) {
 666             setExceptionalCompletion(rex);
 667             return;
 668         }
 669         setCompletion(NORMAL);
 670     }
 671 
 672     /**
 673      * Waits if necessary for the computation to complete, and then
 674      * retrieves its result.
 675      *
 676      * @return the computed result
 677      * @throws CancellationException if the computation was cancelled
 678      * @throws ExecutionException if the computation threw an
 679      * exception
 680      * @throws InterruptedException if the current thread is not a
 681      * member of a ForkJoinPool and was interrupted while waiting
 682      */
 683     public final V get() throws InterruptedException, ExecutionException {
 684         int s;
 685         if (Thread.currentThread() instanceof ForkJoinWorkerThread) {
 686             quietlyJoin();
 687             s = status;
 688         }
 689         else {
 690             while ((s = status) >= 0) {
 691                 synchronized (this) { // interruptible form of awaitDone
 692                     if (UNSAFE.compareAndSwapInt(this, statusOffset,
 693                                                  s, SIGNAL)) {
 694                         while (status >= 0)
 695                             wait();
 696                     }
 697                 }
 698             }
 699         }
 700         if (s < NORMAL) {
 701             Throwable ex;
 702             if (s == CANCELLED)
 703                 throw new CancellationException();
 704             if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
 705                 throw new ExecutionException(ex);
 706         }
 707         return getRawResult();
 708     }
 709 
 710     /**
 711      * Waits if necessary for at most the given time for the computation
 712      * to complete, and then retrieves its result, if available.
 713      *
 714      * @param timeout the maximum time to wait
 715      * @param unit the time unit of the timeout argument
 716      * @return the computed result
 717      * @throws CancellationException if the computation was cancelled
 718      * @throws ExecutionException if the computation threw an
 719      * exception
 720      * @throws InterruptedException if the current thread is not a
 721      * member of a ForkJoinPool and was interrupted while waiting
 722      * @throws TimeoutException if the wait timed out
 723      */
 724     public final V get(long timeout, TimeUnit unit)
 725         throws InterruptedException, ExecutionException, TimeoutException {

 726         Thread t = Thread.currentThread();
 727         ForkJoinPool pool;
 728         if (t instanceof ForkJoinWorkerThread) {
 729             ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
 730             if (status >= 0 && w.unpushTask(this))
 731                 quietlyExec();
 732             pool = w.pool;
 733         }
 734         else
 735             pool = null;
 736         /*
 737          * Timed wait loop intermixes cases for FJ (pool != null) and
 738          * non FJ threads. For FJ, decrement pool count but don't try
 739          * for replacement; increment count on completion. For non-FJ,
 740          * deal with interrupts. This is messy, but a little less so
 741          * than is splitting the FJ and nonFJ cases.
 742          */
 743         boolean interrupted = false;
 744         boolean dec = false; // true if pool count decremented
 745         long nanos = unit.toNanos(timeout);
 746         for (;;) {
 747             if (pool == null && Thread.interrupted()) {
 748                 interrupted = true;
 749                 break;
 750             }
 751             int s = status;
 752             if (s < 0)
 753                 break;
 754             if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)) {
 755                 long startTime = System.nanoTime();
 756                 long nt; // wait time
 757                 while (status >= 0 &&
 758                        (nt = nanos - (System.nanoTime() - startTime)) > 0) {
 759                     if (pool != null && !dec)
 760                         dec = pool.tryDecrementRunningCount();
 761                     else {
 762                         long ms = nt / 1000000;
 763                         int ns = (int) (nt % 1000000);
 764                         try {
 765                             synchronized (this) {
 766                                 if (status >= 0)
 767                                     wait(ms, ns);
 768                             }
 769                         } catch (InterruptedException ie) {
 770                             if (pool != null)
 771                                 cancelIfTerminating();
 772                             else {
 773                                 interrupted = true;
 774                                 break;
 775                             }
 776                         }
 777                     }
 778                 }
 779                 break;
 780             }
 781         }
 782         if (pool != null && dec)
 783             pool.incrementRunningCount();
 784         if (interrupted)
 785             throw new InterruptedException();
 786         int es = status;
 787         if (es != NORMAL) {
 788             Throwable ex;
 789             if (es == CANCELLED)
 790                 throw new CancellationException();
 791             if (es == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
 792                 throw new ExecutionException(ex);
 793             throw new TimeoutException();
 794         }
 795         return getRawResult();
 796     }
 797 
 798     /**
 799      * Joins this task, without returning its result or throwing its
 800      * exception. This method may be useful when processing
 801      * collections of tasks when some have been cancelled or otherwise
 802      * known to have aborted.
 803      */
 804     public final void quietlyJoin() {
 805         Thread t;
 806         if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
 807             ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
 808             if (status >= 0) {
 809                 if (w.unpushTask(this)) {
 810                     boolean completed;
 811                     try {
 812                         completed = exec();
 813                     } catch (Throwable rex) {
 814                         setExceptionalCompletion(rex);
 815                         return;
 816                     }
 817                     if (completed) {
 818                         setCompletion(NORMAL);
 819                         return;
 820                     }
 821                 }
 822                 w.joinTask(this);
 823             }
 824         }
 825         else
 826             externalAwaitDone();
 827     }
 828 
 829     /**
 830      * Commences performing this task and awaits its completion if
 831      * necessary, without returning its result or throwing its
 832      * exception.
 833      */
 834     public final void quietlyInvoke() {
 835         if (status >= 0) {
 836             boolean completed;
 837             try {
 838                 completed = exec();
 839             } catch (Throwable rex) {
 840                 setExceptionalCompletion(rex);
 841                 return;
 842             }
 843             if (completed)
 844                 setCompletion(NORMAL);
 845             else
 846                 quietlyJoin();
 847         }
 848     }
 849 
 850     /**
 851      * Possibly executes tasks until the pool hosting the current task
 852      * {@link ForkJoinPool#isQuiescent is quiescent}. This method may
 853      * be of use in designs in which many tasks are forked, but none
 854      * are explicitly joined, instead executing them until all are
 855      * processed.
 856      *
 857      * <p>This method may be invoked only from within {@code
 858      * ForkJoinTask} computations (as may be determined using method
 859      * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
 860      * result in exceptions or errors, possibly including {@code
 861      * ClassCastException}.
 862      */
 863     public static void helpQuiesce() {
 864         ((ForkJoinWorkerThread) Thread.currentThread())
 865             .helpQuiescePool();
 866     }
 867 
 868     /**
 869      * Resets the internal bookkeeping state of this task, allowing a
 870      * subsequent {@code fork}. This method allows repeated reuse of
 871      * this task, but only if reuse occurs when this task has either
 872      * never been forked, or has been forked, then completed and all
 873      * outstanding joins of this task have also completed. Effects
 874      * under any other usage conditions are not guaranteed.
 875      * This method may be useful when executing
 876      * pre-constructed trees of subtasks in loops.






 877      */
 878     public void reinitialize() {
 879         if (status == EXCEPTIONAL)
 880             exceptionMap.remove(this);
 881         status = 0;
 882     }
 883 
 884     /**
 885      * Returns the pool hosting the current task execution, or null
 886      * if this task is executing outside of any ForkJoinPool.
 887      *
 888      * @see #inForkJoinPool
 889      * @return the pool, or {@code null} if none
 890      */
 891     public static ForkJoinPool getPool() {
 892         Thread t = Thread.currentThread();
 893         return (t instanceof ForkJoinWorkerThread) ?
 894             ((ForkJoinWorkerThread) t).pool : null;
 895     }
 896 
 897     /**
 898      * Returns {@code true} if the current thread is executing as a
 899      * ForkJoinPool computation.
 900      *
 901      * @return {@code true} if the current thread is executing as a
 902      * ForkJoinPool computation, or false otherwise

 903      */
 904     public static boolean inForkJoinPool() {
 905         return Thread.currentThread() instanceof ForkJoinWorkerThread;
 906     }
 907 
 908     /**
 909      * Tries to unschedule this task for execution. This method will
 910      * typically succeed if this task is the most recently forked task
 911      * by the current thread, and has not commenced executing in
 912      * another thread.  This method may be useful when arranging
 913      * alternative local processing of tasks that could have been, but
 914      * were not, stolen.
 915      *
 916      * <p>This method may be invoked only from within {@code
 917      * ForkJoinTask} computations (as may be determined using method
 918      * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
 919      * result in exceptions or errors, possibly including {@code
 920      * ClassCastException}.
 921      *
 922      * @return {@code true} if unforked
 923      */
 924     public boolean tryUnfork() {
 925         return ((ForkJoinWorkerThread) Thread.currentThread())
 926             .unpushTask(this);
 927     }
 928 
 929     /**
 930      * Returns an estimate of the number of tasks that have been
 931      * forked by the current worker thread but not yet executed. This
 932      * value may be useful for heuristic decisions about whether to
 933      * fork other tasks.
 934      *
 935      * <p>This method may be invoked only from within {@code
 936      * ForkJoinTask} computations (as may be determined using method
 937      * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
 938      * result in exceptions or errors, possibly including {@code
 939      * ClassCastException}.
 940      *
 941      * @return the number of tasks
 942      */
 943     public static int getQueuedTaskCount() {
 944         return ((ForkJoinWorkerThread) Thread.currentThread())
 945             .getQueueSize();
 946     }
 947 
 948     /**
 949      * Returns an estimate of how many more locally queued tasks are
 950      * held by the current worker thread than there are other worker
 951      * threads that might steal them.  This value may be useful for
 952      * heuristic decisions about whether to fork other tasks. In many
 953      * usages of ForkJoinTasks, at steady state, each worker should
 954      * aim to maintain a small constant surplus (for example, 3) of
 955      * tasks, and to process computations locally if this threshold is
 956      * exceeded.
 957      *
 958      * <p>This method may be invoked only from within {@code
 959      * ForkJoinTask} computations (as may be determined using method
 960      * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
 961      * result in exceptions or errors, possibly including {@code
 962      * ClassCastException}.
 963      *
 964      * @return the surplus number of tasks, which may be negative
 965      */
 966     public static int getSurplusQueuedTaskCount() {
 967         return ((ForkJoinWorkerThread) Thread.currentThread())
 968             .getEstimatedSurplusTaskCount();
 969     }
 970 
 971     // Extension methods
 972 
    /**
     * Returns the result that would be returned by {@link #join}, even
     * if this task completed abnormally, or {@code null} if this task
     * is not known to have been completed.  This method is designed
     * to aid debugging, as well as to support extensions. Its use in
     * any other context is discouraged.
     *
     * <p>Within this class, both forms of {@code get} return the value
     * of this method once they have found the task completed and have
     * not thrown a cancellation or execution exception.
     *
     * @return the result, or {@code null} if not completed
     */
    public abstract V getRawResult();
 983 
    /**
     * Forces the given value to be returned as a result.  This method
     * is designed to support extensions, and should not in general be
     * called otherwise.
     *
     * <p>{@link #complete} calls this method before marking the task
     * as normally completed; anything thrown by an implementation is
     * caught there and recorded as the task's exceptional completion.
     *
     * @param value the value
     */
    protected abstract void setRawResult(V value);
 992 
    /**
     * Immediately performs the base action of this task.  This method
     * is designed to support extensions, and should not in general be
     * called otherwise. The return value controls whether this task
     * is considered to be done normally. It may return false in
     * asynchronous actions that require explicit invocations of
     * {@link #complete} to become joinable. It may also throw an
     * (unchecked) exception to indicate abnormal exit.
     *
     * <p>Callers within this class (for example {@link #quietlyJoin}
     * and {@link #quietlyInvoke}) mark the task as normally completed
     * when this method returns {@code true}, and record any
     * {@code Throwable} it throws as the task's exceptional completion.
     *
     * @return {@code true} if completed normally
     */
    protected abstract boolean exec();
1005 
1006     /**
1007      * Returns, but does not unschedule or execute, a task queued by
1008      * the current thread but not yet executed, if one is immediately
1009      * available. There is no guarantee that this task will actually
1010      * be polled or executed next. Conversely, this method may return
1011      * null even if a task exists but cannot be accessed without
1012      * contention with other threads.  This method is designed
1013      * primarily to support extensions, and is unlikely to be useful
1014      * otherwise.
1015      *
1016      * <p>This method may be invoked only from within {@code
1017      * ForkJoinTask} computations (as may be determined using method
1018      * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
1019      * result in exceptions or errors, possibly including {@code
1020      * ClassCastException}.
1021      *
1022      * @return the next task, or {@code null} if none are available
1023      */
1024     protected static ForkJoinTask<?> peekNextLocalTask() {
1025         return ((ForkJoinWorkerThread) Thread.currentThread())
1026             .peekTask();
1027     }
1028 
1029     /**
1030      * Unschedules and returns, without executing, the next task
1031      * queued by the current thread but not yet executed.  This method
1032      * is designed primarily to support extensions, and is unlikely to
1033      * be useful otherwise.
1034      *
1035      * <p>This method may be invoked only from within {@code
1036      * ForkJoinTask} computations (as may be determined using method
1037      * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
1038      * result in exceptions or errors, possibly including {@code
1039      * ClassCastException}.
1040      *
1041      * @return the next task, or {@code null} if none are available
1042      */
1043     protected static ForkJoinTask<?> pollNextLocalTask() {
1044         return ((ForkJoinWorkerThread) Thread.currentThread())
1045             .pollLocalTask();
1046     }
1047 
1048     /**
1049      * Unschedules and returns, without executing, the next task
1050      * queued by the current thread but not yet executed, if one is
1051      * available, or if not available, a task that was forked by some
1052      * other thread, if available. Availability may be transient, so a
1053      * {@code null} result does not necessarily imply quiescence
1054      * of the pool this task is operating in.  This method is designed
1055      * primarily to support extensions, and is unlikely to be useful
1056      * otherwise.
1057      *
1058      * <p>This method may be invoked only from within {@code
1059      * ForkJoinTask} computations (as may be determined using method
1060      * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
1061      * result in exceptions or errors, possibly including {@code
1062      * ClassCastException}.
1063      *
1064      * @return a task, or {@code null} if none are available
1065      */
1066     protected static ForkJoinTask<?> pollTask() {
1067         return ((ForkJoinWorkerThread) Thread.currentThread())
1068             .pollTask();
1069     }
1070 
1071     /**
1072      * Adaptor for Runnables. This implements RunnableFuture
1073      * to be compliant with AbstractExecutorService constraints
1074      * when used in ForkJoinPool.
1075      */
1076     static final class AdaptedRunnable<T> extends ForkJoinTask<T>
1077         implements RunnableFuture<T> {
1078         final Runnable runnable;
1079         final T resultOnCompletion;
1080         T result;
1081         AdaptedRunnable(Runnable runnable, T result) {
1082             if (runnable == null) throw new NullPointerException();
1083             this.runnable = runnable;
1084             this.resultOnCompletion = result;
1085         }
1086         public T getRawResult() { return result; }
1087         public void setRawResult(T v) { result = v; }
1088         public boolean exec() {
1089             runnable.run();
1090             result = resultOnCompletion;
1091             return true;
1092         }
1093         public void run() { invoke(); }
1094         private static final long serialVersionUID = 5232453952276885070L;
1095     }
1096 
1097     /**
1098      * Adaptor for Callables
1099      */
1100     static final class AdaptedCallable<T> extends ForkJoinTask<T>
1101         implements RunnableFuture<T> {
1102         final Callable<? extends T> callable;
1103         T result;
1104         AdaptedCallable(Callable<? extends T> callable) {
1105             if (callable == null) throw new NullPointerException();
1106             this.callable = callable;
1107         }
1108         public T getRawResult() { return result; }
1109         public void setRawResult(T v) { result = v; }
1110         public boolean exec() {
1111             try {
1112                 result = callable.call();
1113                 return true;
1114             } catch (Error err) {
1115                 throw err;
1116             } catch (RuntimeException rex) {
1117                 throw rex;
1118             } catch (Exception ex) {
1119                 throw new RuntimeException(ex);
1120             }
1121         }
1122         public void run() { invoke(); }
1123         private static final long serialVersionUID = 2838392045355241008L;
1124     }
1125 
1126     /**
1127      * Returns a new {@code ForkJoinTask} that performs the {@code run}
1128      * method of the given {@code Runnable} as its action, and returns
1129      * a null result upon {@link #join}.
1130      *
1131      * @param runnable the runnable action
1132      * @return the task
1133      */
1134     public static ForkJoinTask<?> adapt(Runnable runnable) {
1135         return new AdaptedRunnable<Void>(runnable, null);
1136     }
1137 
1138     /**
1139      * Returns a new {@code ForkJoinTask} that performs the {@code run}
1140      * method of the given {@code Runnable} as its action, and returns
1141      * the given result upon {@link #join}.
1142      *
1143      * @param runnable the runnable action
1144      * @param result the result upon completion
1145      * @return the task
1146      */
1147     public static <T> ForkJoinTask<T> adapt(Runnable runnable, T result) {
1148         return new AdaptedRunnable<T>(runnable, result);
1149     }
1150 
1151     /**
1152      * Returns a new {@code ForkJoinTask} that performs the {@code call}
1153      * method of the given {@code Callable} as its action, and returns
1154      * its result upon {@link #join}, translating any checked exceptions
1155      * encountered into {@code RuntimeException}.
1156      *
1157      * @param callable the callable action
1158      * @return the task
1159      */
1160     public static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable) {
1161         return new AdaptedCallable<T>(callable);
1162     }
1163 
1164     // Serialization support
1165 
    /** Serialization version identifier for {@code ForkJoinTask}. */
    private static final long serialVersionUID = -7721805057305804111L;
1167 
1168     /**
1169      * Saves the state to a stream (that is, serializes it).
1170      *
1171      * @serialData the current run status and the exception thrown
1172      * during execution, or {@code null} if none
1173      * @param s the stream
1174      */
1175     private void writeObject(java.io.ObjectOutputStream s)
1176         throws java.io.IOException {
1177         s.defaultWriteObject();
1178         s.writeObject(getException());
1179     }
1180 
1181     /**
1182      * Reconstitutes the instance from a stream (that is, deserializes it).
1183      *
1184      * @param s the stream
1185      */
1186     private void readObject(java.io.ObjectInputStream s)
1187         throws java.io.IOException, ClassNotFoundException {
1188         s.defaultReadObject();
1189         Object ex = s.readObject();
1190         if (ex != null)
1191             setExceptionalCompletion((Throwable) ex);
1192     }
1193 
1194     // Unsafe mechanics
1195 
    /** Unsafe instance used for compare-and-swap updates of {@code status}. */
    private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
    /**
     * Offset of the {@code status} field, as passed to
     * {@code UNSAFE.compareAndSwapInt}. Declared after {@code UNSAFE}
     * because computing it uses that field.
     */
    private static final long statusOffset =
        objectFieldOffset("status", ForkJoinTask.class);
1199 
1200     private static long objectFieldOffset(String field, Class<?> klazz) {
1201         try {
1202             return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
1203         } catch (NoSuchFieldException e) {
1204             // Convert Exception to corresponding Error
1205             NoSuchFieldError error = new NoSuchFieldError(field);
1206             error.initCause(e);
1207             throw error;
1208         }
1209     }
1210 }
--- EOF ---