1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.util.concurrent.locks.LockSupport;
  39 
  40 /**
  41  * A cancellable asynchronous computation.  This class provides a base
  42  * implementation of {@link Future}, with methods to start and cancel
  43  * a computation, query to see if the computation is complete, and
  44  * retrieve the result of the computation.  The result can only be
  45  * retrieved when the computation has completed; the {@code get}
  46  * methods will block if the computation has not yet completed.  Once
  47  * the computation has completed, the computation cannot be restarted
  48  * or cancelled (unless the computation is invoked using
  49  * {@link #runAndReset}).
  50  *
  51  * <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
  52  * {@link Runnable} object.  Because {@code FutureTask} implements
  53  * {@code Runnable}, a {@code FutureTask} can be submitted to an
  54  * {@link Executor} for execution.
  55  *
  56  * <p>In addition to serving as a standalone class, this class provides
  57  * {@code protected} functionality that may be useful when creating
  58  * customized task classes.
  59  *
  60  * @since 1.5
  61  * @author Doug Lea
  62  * @param <V> The result type returned by this FutureTask's {@code get} methods
  63  */
  64 public class FutureTask<V> implements RunnableFuture<V> {
  65     /*
  66      * Revision notes: This differs from previous versions of this
  67      * class that relied on AbstractQueuedSynchronizer, mainly to
  68      * avoid surprising users about retaining interrupt status during
  69      * cancellation races. Sync control in the current design relies
  70      * on a "state" field updated via CAS to track completion, along
  71      * with a simple Treiber stack to hold waiting threads.
  72      *
  73      * Style note: As usual, we bypass overhead of using
  74      * AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.
  75      */
  76 
  77     /**
  78      * The run state of this task, initially NEW.  The run state
  79      * transitions to a terminal state only in methods set,
  80      * setException, and cancel.  During completion, state may take on
  81      * transient values of COMPLETING (while outcome is being set) or
  82      * INTERRUPTING (only while interrupting the runner to satisfy a
  83      * cancel(true)). Transitions from these intermediate to final
  84      * states use cheaper ordered/lazy writes because values are unique
  85      * and cannot be further modified.
  86      *
  87      * Possible state transitions:
  88      * NEW -> COMPLETING -> NORMAL
  89      * NEW -> COMPLETING -> EXCEPTIONAL
  90      * NEW -> CANCELLED
  91      * NEW -> INTERRUPTING -> INTERRUPTED
  92      */
  93     private volatile int state;
         // The numeric order of the constants below is significant: this
         // class compares states with relational operators, e.g.
         //   s <= COMPLETING    outcome not yet available (NEW or COMPLETING)
         //   s >  COMPLETING    settled in a final or cancelling state
         //   s >= CANCELLED     CANCELLED, INTERRUPTING or INTERRUPTED
         //   s >= INTERRUPTING  a cancel(true) is in progress or finished
         // Do not renumber them without revisiting every such comparison.
  94     private static final int NEW          = 0;
  95     private static final int COMPLETING   = 1;
  96     private static final int NORMAL       = 2;
  97     private static final int EXCEPTIONAL  = 3;
  98     private static final int CANCELLED    = 4;
  99     private static final int INTERRUPTING = 5;
 100     private static final int INTERRUPTED  = 6;
 101 
 102     /** The underlying callable; nulled out after running */
 103     private Callable<V> callable;
 104     /** The result to return or exception to throw from get() */
 105     private Object outcome; // non-volatile, protected by state reads/writes
 106     /** The thread running the callable; CASed during run() */
 107     private volatile Thread runner;
 108     /** Treiber stack of waiting threads */
 109     private volatile WaitNode waiters;
 110 
 111     /**
 112      * Returns result or throws exception for completed task.
 113      *
 114      * @param s completed state value
          * @return the stored outcome, cast to {@code V}, when {@code s}
          *         is NORMAL
          * @throws CancellationException if {@code s} is CANCELLED,
          *         INTERRUPTING or INTERRUPTED
          * @throws ExecutionException wrapping the stored outcome for any
          *         other value of {@code s}
 115      */
 116     @SuppressWarnings("unchecked")
 117     private V report(int s) throws ExecutionException {
 118         Object x = outcome;
 119         if (s == NORMAL)
 120             return (V)x;
 121         if (s >= CANCELLED)
 122             throw new CancellationException();
             // Both get() methods only call report with s > COMPLETING, so
             // the remaining case is EXCEPTIONAL: outcome is the Throwable
             // that setException stored.
 123         throw new ExecutionException((Throwable)x);
 124     }
 125 
 126     /**
 127      * Creates a {@code FutureTask} that will, upon running, execute the
 128      * given {@code Callable}.
 129      *
 130      * @param  callable the callable task
 131      * @throws NullPointerException if the callable is null
 132      */
 133     public FutureTask(Callable<V> callable) {
 134         if (callable == null)
 135             throw new NullPointerException();
 136         this.callable = callable;
 137         this.state = NEW;       // ensure visibility of callable
 138     }
 139 
 140     /**
 141      * Creates a {@code FutureTask} that will, upon running, execute the
 142      * given {@code Runnable}, and arrange that {@code get} will return the
 143      * given result on successful completion.
 144      *
 145      * @param runnable the runnable task
 146      * @param result the result to return on successful completion. If
 147      * you don't need a particular result, consider using
 148      * constructions of the form:
 149      * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
 150      * @throws NullPointerException if the runnable is null
 151      */
 152     public FutureTask(Runnable runnable, V result) {
             // No explicit null check here; the documented
             // NullPointerException presumably comes from
             // Executors.callable (defined elsewhere -- confirm there).
 153         this.callable = Executors.callable(runnable, result);
 154         this.state = NEW;       // ensure visibility of callable
 155     }
 156 
         /**
          * Returns {@code true} if the state is CANCELLED, INTERRUPTING or
          * INTERRUPTED, i.e. a call to {@link #cancel} won the race to
          * leave state NEW.
          */
 157     public boolean isCancelled() {
 158         return state >= CANCELLED;
 159     }
 160 
         /**
          * Returns {@code true} as soon as the state is anything other
          * than NEW.  This includes the transient COMPLETING and
          * INTERRUPTING states, which is why {@code awaitDone} never
          * returns empty-handed while in COMPLETING.
          */
 161     public boolean isDone() {
 162         return state != NEW;
 163     }
 164 
         /**
          * Attempts to cancel this task.  Succeeds only if a single CAS
          * moves the state from NEW to INTERRUPTING (when
          * {@code mayInterruptIfRunning} is true) or to CANCELLED
          * (otherwise).  After a successful CAS the runner thread, if
          * any, is interrupted when requested, and waiting threads are
          * released via {@code finishCompletion}.
          *
          * @param mayInterruptIfRunning whether the thread currently
          *        recorded in {@code runner} should be interrupted
          * @return {@code false} if the state was not NEW or the CAS
          *         failed; {@code true} otherwise
          */
 165     public boolean cancel(boolean mayInterruptIfRunning) {
 166         if (!(state == NEW &&
 167               U.compareAndSwapInt(this, STATE, NEW,
 168                   mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
 169             return false;
 170         try {    // in case call to interrupt throws exception
 171             if (mayInterruptIfRunning) {
 172                 try {
 173                     Thread t = runner;
 174                     if (t != null)
 175                         t.interrupt();
 176                 } finally { // final state
                         // Written even if interrupt() threw, so a runner
                         // spinning in handlePossibleCancellationInterrupt
                         // is always released from INTERRUPTING.
 177                     U.putOrderedInt(this, STATE, INTERRUPTED);
 178                 }
 179             }
 180         } finally {
 181             finishCompletion();
 182         }
 183         return true;
 184     }
 185 
 186     /**
 187      * @throws CancellationException {@inheritDoc}
 188      */
 189     public V get() throws InterruptedException, ExecutionException {
 190         int s = state;
             // NEW or COMPLETING: the outcome is not available yet, so
             // block (untimed) until the state is past COMPLETING.
 191         if (s <= COMPLETING)
 192             s = awaitDone(false, 0L);
 193         return report(s);
 194     }
 195 
 196     /**
 197      * @throws CancellationException {@inheritDoc}
 198      */
 199     public V get(long timeout, TimeUnit unit)
 200         throws InterruptedException, ExecutionException, TimeoutException {
 201         if (unit == null)
 202             throw new NullPointerException();
 203         int s = state;
             // awaitDone returns the state observed at completion or at
             // timeout; if that is still NEW or COMPLETING the wait
             // elapsed without an outcome.
 204         if (s <= COMPLETING &&
 205             (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
 206             throw new TimeoutException();
 207         return report(s);
 208     }
 209 
 210     /**
 211      * Protected method invoked when this task transitions to state
 212      * {@code isDone} (whether normally or via cancellation). The
 213      * default implementation does nothing.  Subclasses may override
 214      * this method to invoke completion callbacks or perform
 215      * bookkeeping. Note that you can query status inside the
 216      * implementation of this method to determine whether this task
 217      * has been cancelled.
 218      */
 219     protected void done() { }
 220 
 221     /**
 222      * Sets the result of this future to the given value unless
 223      * this future has already been set or has been cancelled.
 224      *
 225      * <p>This method is invoked internally by the {@link #run} method
 226      * upon successful completion of the computation.
 227      *
 228      * @param v the value
 229      */
 230     protected void set(V v) {
             // Only the caller that wins NEW -> COMPLETING may store the
             // outcome; if the state already left NEW this is a no-op.
 231         if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
                 // outcome is a plain field: it is written before the
                 // final state and read only after observing that state
                 // (see the comment on the outcome field).
 232             outcome = v;
 233             U.putOrderedInt(this, STATE, NORMAL); // final state
 234             finishCompletion();
 235         }
 236     }
 237 
 238     /**
 239      * Causes this future to report an {@link ExecutionException}
 240      * with the given throwable as its cause, unless this future has
 241      * already been set or has been cancelled.
 242      *
 243      * <p>This method is invoked internally by the {@link #run} method
 244      * upon failure of the computation.
 245      *
 246      * @param t the cause of failure
 247      */
 248     protected void setException(Throwable t) {
             // Same protocol as set(): claim COMPLETING, store the
             // outcome, then write the final state and release waiters.
 249         if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
 250             outcome = t;
 251             U.putOrderedInt(this, STATE, EXCEPTIONAL); // final state
 252             finishCompletion();
 253         }
 254     }
 255 
         /**
          * Runs the callable on the calling thread, unless the state is
          * no longer NEW or another thread is already recorded as the
          * runner.  The value returned by the callable is passed to
          * {@link #set}; any {@code Throwable} it throws is passed to
          * {@link #setException} instead of propagating to the caller.
          */
 256     public void run() {
             // The CAS on runner (null -> current thread) admits at most
             // one thread into the body below.
 257         if (state != NEW ||
 258             !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
 259             return;
 260         try {
 261             Callable<V> c = callable;
                 // Re-check state: a cancel may have won between the
                 // first check and the runner CAS.
 262             if (c != null && state == NEW) {
 263                 V result;
 264                 boolean ran;
 265                 try {
 266                     result = c.call();
 267                     ran = true;
 268                 } catch (Throwable ex) {
 269                     result = null;
 270                     ran = false;
 271                     setException(ex);
 272                 }
                     // set() is called outside the try so that an
                     // exception thrown from set() itself (e.g. from an
                     // overridden done()) is not fed to setException.
 273                 if (ran)
 274                     set(result);
 275             }
 276         } finally {
 277             // runner must be non-null until state is settled to
 278             // prevent concurrent calls to run()
 279             runner = null;
 280             // state must be re-read after nulling runner to prevent
 281             // leaked interrupts
 282             int s = state;
 283             if (s >= INTERRUPTING)
 284                 handlePossibleCancellationInterrupt(s);
 285         }
 286     }
 287 
 288     /**
 289      * Executes the computation without setting its result, and then
 290      * resets this future to initial state, failing to do so if the
 291      * computation encounters an exception or is cancelled.  This is
 292      * designed for use with tasks that intrinsically execute more
 293      * than once.
 294      *
 295      * @return {@code true} if successfully run and reset
 296      */
 297     protected boolean runAndReset() {
 298         if (state != NEW ||
 299             !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
 300             return false;
 301         boolean ran = false;
 302         int s = state;
 303         try {
 304             Callable<V> c = callable;
 305             if (c != null && s == NEW) {
 306                 try {
 307                     c.call(); // don't set result
 308                     ran = true;
 309                 } catch (Throwable ex) {
 310                     setException(ex);
 311                 }
 312             }
 313         } finally {
 314             // runner must be non-null until state is settled to
 315             // prevent concurrent calls to run()
 316             runner = null;
 317             // state must be re-read after nulling runner to prevent
 318             // leaked interrupts
 319             s = state;
 320             if (s >= INTERRUPTING)
 321                 handlePossibleCancellationInterrupt(s);
 322         }
             // "Reset" is implicit: nothing was written to state on the
             // success path, so the task is reusable only if the call
             // returned normally and the state is still NEW (no exception
             // was set and no cancel intervened).
 323         return ran && s == NEW;
 324     }
 325 
 326     /**
 327      * Ensures that any interrupt from a possible cancel(true) is only
 328      * delivered to a task while in run or runAndReset.
          *
          * @param s the state read by the caller after it nulled
          *        {@code runner}; callers invoke this only when
          *        {@code s >= INTERRUPTING}
 329      */
 330     private void handlePossibleCancellationInterrupt(int s) {
 331         // It is possible for our interrupter to stall before getting a
 332         // chance to interrupt us.  Let's spin-wait patiently.
 333         if (s == INTERRUPTING)
 334             while (state == INTERRUPTING)
 335                 Thread.yield(); // wait out pending interrupt
 336 
 337         // assert state == INTERRUPTED;
 338 
 339         // We want to clear any interrupt we may have received from
 340         // cancel(true).  However, it is permissible to use interrupts
 341         // as an independent mechanism for a task to communicate with
 342         // its caller, and there is no way to clear only the
 343         // cancellation interrupt.
 344         //
 345         // Thread.interrupted();
 346     }
 347 
 348     /**
 349      * Simple linked list nodes to record waiting threads in a Treiber
 350      * stack.  See other classes such as Phaser and SynchronousQueue
 351      * for more detailed explanation.
 352      */
 353     static final class WaitNode {
             // Set to null by the waiter itself (on completion, timeout or
             // interrupt) or by finishCompletion once it has been unparked;
             // a null thread marks the node as removable.
 354         volatile Thread thread;
 355         volatile WaitNode next;
             // Records the thread that constructs the node as the waiter.
 356         WaitNode() { thread = Thread.currentThread(); }
 357     }
 358 
 359     /**
 360      * Removes and signals all waiting threads, invokes done(), and
 361      * nulls out callable.
 362      */
 363     private void finishCompletion() {
 364         // assert state > COMPLETING;
             // Detach the whole stack with one CAS of waiters to null; if
             // the CAS fails (the head changed underneath us) re-read the
             // head and try again.
 365         for (WaitNode q; (q = waiters) != null;) {
 366             if (U.compareAndSwapObject(this, WAITERS, q, null)) {
                     // We now own the detached list: wake every node that
                     // still has a thread, unlinking as we go.
 367                 for (;;) {
 368                     Thread t = q.thread;
 369                     if (t != null) {
 370                         q.thread = null;
 371                         LockSupport.unpark(t);
 372                     }
 373                     WaitNode next = q.next;
 374                     if (next == null)
 375                         break;
 376                     q.next = null; // unlink to help gc
 377                     q = next;
 378                 }
 379                 break;
 380             }
 381         }
 382 
             // done() runs after waiters have been signalled and before
             // callable is cleared.
 383         done();
 384 
 385         callable = null;        // to reduce footprint
 386     }
 387 
 388     /**
 389      * Awaits completion or aborts on interrupt or timeout.
 390      *
 391      * @param timed true if use timed waits
 392      * @param nanos time to wait, if timed
 393      * @return state upon completion or at timeout
          * @throws InterruptedException if the calling thread's interrupt
          *         status is found set while the state is still NEW; the
          *         status is cleared and the wait node, if any, removed
 394      */
 395     private int awaitDone(boolean timed, long nanos)
 396         throws InterruptedException {
 397         // The code below is very delicate, to achieve these goals:
 398         // - call nanoTime exactly once for each call to park
 399         // - if nanos <= 0L, return promptly without allocation or nanoTime
 400         // - if nanos == Long.MIN_VALUE, don't underflow
 401         // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
 402         //   and we suffer a spurious wakeup, we will do no worse than
 403         //   to park-spin for a while
 404         long startTime = 0L;    // Special value 0L means not yet parked
 405         WaitNode q = null;
 406         boolean queued = false;
             // Each iteration re-reads state and performs exactly one step
             // of: return, yield, abort, allocate node, enqueue node, park.
 407         for (;;) {
 408             int s = state;
 409             if (s > COMPLETING) {
                     // Settled.  Clear our node's thread so releasers and
                     // removeWaiter treat the node as dead.
 410                 if (q != null)
 411                     q.thread = null;
 412                 return s;
 413             }
 414             else if (s == COMPLETING)
 415                 // We may have already promised (via isDone) that we are done
 416                 // so never return empty-handed or throw InterruptedException
 417                 Thread.yield();
                 // From here on s == NEW.
 418             else if (Thread.interrupted()) {
 419                 removeWaiter(q);
 420                 throw new InterruptedException();
 421             }
 422             else if (q == null) {
                     // Nothing allocated yet: a timed wait with no time
                     // left returns the (unsettled) state immediately.
 423                 if (timed && nanos <= 0L)
 424                     return s;
 425                 q = new WaitNode();
 426             }
 427             else if (!queued)
                     // Push our node onto the head of the waiters stack;
                     // if the CAS fails, queued stays false and the push
                     // is retried on the next iteration.
 428                 queued = U.compareAndSwapObject(this, WAITERS,
 429                                                 q.next = waiters, q);
 430             else if (timed) {
 431                 final long parkNanos;
 432                 if (startTime == 0L) { // first time
 433                     startTime = System.nanoTime();
                         // 0L is reserved as the "not yet parked" marker,
                         // so nudge a genuine 0 reading to 1.
 434                     if (startTime == 0L)
 435                         startTime = 1L;
 436                     parkNanos = nanos;
 437                 } else {
 438                     long elapsed = System.nanoTime() - startTime;
 439                     if (elapsed >= nanos) {
                             // Timed out: unlink our node and hand the
                             // caller the latest state.
 440                         removeWaiter(q);
 441                         return state;
 442                     }
 443                     parkNanos = nanos - elapsed;
 444                 }
 445                 // nanoTime may be slow; recheck before parking
 446                 if (state < COMPLETING)
 447                     LockSupport.parkNanos(this, parkNanos);
 448             }
 449             else
 450                 LockSupport.park(this);
 451         }
 452     }
 453 
 454     /**
 455      * Tries to unlink a timed-out or interrupted wait node to avoid
 456      * accumulating garbage.  Internal nodes are simply unspliced
 457      * without CAS since it is harmless if they are traversed anyway
 458      * by releasers.  To avoid effects of unsplicing from already
 459      * removed nodes, the list is retraversed in case of an apparent
 460      * race.  This is slow when there are a lot of nodes, but we don't
 461      * expect lists to be long enough to outweigh higher-overhead
 462      * schemes.
          *
          * @param node the node to remove; may be null, in which case
          *        this method does nothing
 463      */
 464     private void removeWaiter(WaitNode node) {
 465         if (node != null) {
                 // A null thread is the removal mark: the traversal below
                 // unlinks every node whose thread is null, not just this
                 // one.
 466             node.thread = null;
 467             retry:
 468             for (;;) {          // restart on removeWaiter race
                     // pred = last node seen with a live thread,
                     // q = current node, s = q's successor.
 469                 for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
 470                     s = q.next;
 471                     if (q.thread != null)
 472                         pred = q;
 473                     else if (pred != null) {
                             // Dead interior node: splice it out with a
                             // plain write.
 474                         pred.next = s;
 475                         if (pred.thread == null) // check for race
 476                             continue retry;
 477                     }
                         // Dead node at the head: the head pointer must
                         // be moved with CAS; restart if it changed.
 478                     else if (!U.compareAndSwapObject(this, WAITERS, q, s))
 479                         continue retry;
 480                 }
 481                 break;
 482             }
 483         }
 484     }
 485 
 486     // Unsafe mechanics
 487     private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
         // Field offsets of state, runner and waiters, passed to the
         // compareAndSwap* / putOrderedInt calls in this class.
 488     private static final long STATE;
 489     private static final long RUNNER;
 490     private static final long WAITERS;
 491     static {
 492         try {
 493             STATE = U.objectFieldOffset
 494                 (FutureTask.class.getDeclaredField("state"));
 495             RUNNER = U.objectFieldOffset
 496                 (FutureTask.class.getDeclaredField("runner"));
 497             WAITERS = U.objectFieldOffset
 498                 (FutureTask.class.getDeclaredField("waiters"));
 499         } catch (ReflectiveOperationException e) {
                 // A missing field here means the class itself is broken;
                 // surface it as an unrecoverable Error.
 500             throw new Error(e);
 501         }
 502 
 503         // Reduce the risk of rare disastrous classloading in first call to
 504         // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
 505         Class<?> ensureLoaded = LockSupport.class;
 506     }
 507 
 508 }