1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 import java.util.concurrent.locks.LockSupport;
  38 
  39 /**
  40  * A cancellable asynchronous computation.  This class provides a base
  41  * implementation of {@link Future}, with methods to start and cancel
  42  * a computation, query to see if the computation is complete, and
  43  * retrieve the result of the computation.  The result can only be
  44  * retrieved when the computation has completed; the {@code get}
  45  * methods will block if the computation has not yet completed.  Once
  46  * the computation has completed, the computation cannot be restarted
  47  * or cancelled (unless the computation is invoked using
  48  * {@link #runAndReset}).
  49  *
  50  * <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
  51  * {@link Runnable} object.  Because {@code FutureTask} implements
  52  * {@code Runnable}, a {@code FutureTask} can be submitted to an
  53  * {@link Executor} for execution.
  54  *
  55  * <p>In addition to serving as a standalone class, this class provides
  56  * {@code protected} functionality that may be useful when creating
  57  * customized task classes.
  58  *
  59  * @since 1.5
  60  * @author Doug Lea
  61  * @param <V> The result type returned by this FutureTask's {@code get} methods
  62  */
  63 public class FutureTask<V> implements RunnableFuture<V> {
  64     /*
  65      * Revision notes: This differs from previous versions of this
  66      * class that relied on AbstractQueuedSynchronizer, mainly to
  67      * avoid surprising users about retaining interrupt status during
  68      * cancellation races. Sync control in the current design relies
  69      * on a "state" field updated via CAS to track completion, along
  70      * with a simple Treiber stack to hold waiting threads.
  71      *
  72      * Style note: As usual, we bypass overhead of using
  73      * AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.
  74      */
  75 
    /**
     * The run state of this task, initially NEW.  The run state
     * transitions to a terminal state only in methods set,
     * setException, and cancel.  During completion, state may take on
     * transient values of COMPLETING (while outcome is being set) or
     * INTERRUPTING (only while interrupting the runner to satisfy a
     * cancel(true)). Transitions from these intermediate to final
     * states use cheaper ordered/lazy writes because values are unique
     * and cannot be further modified.
     *
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     *
     * The numeric ordering of the constants below is significant:
     * code in this class uses range comparisons such as
     * {@code s <= COMPLETING} (not yet finished),
     * {@code s >= CANCELLED} (cancelled in some form) and
     * {@code s >= INTERRUPTING} (a cancel(true) is or was in
     * progress), so the values must not be renumbered independently.
     */
    private volatile int state;
    // Not started, or still running without a published outcome.
    private static final int NEW          = 0;
    // Transient: set/setException has won the CAS and is storing outcome.
    private static final int COMPLETING   = 1;
    // Terminal: outcome holds the computed result.
    private static final int NORMAL       = 2;
    // Terminal: outcome holds the Throwable raised by the computation.
    private static final int EXCEPTIONAL  = 3;
    // Terminal: cancelled without interrupting the runner.
    private static final int CANCELLED    = 4;
    // Transient: cancel(true) is about to interrupt the runner.
    private static final int INTERRUPTING = 5;
    // Terminal: cancel(true) has finished its interrupt attempt.
    private static final int INTERRUPTED  = 6;

    /** The underlying callable; nulled out after running */
    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;
 109 
 110     /**
 111      * Returns result or throws exception for completed task.
 112      *
 113      * @param s completed state value
 114      */
 115     @SuppressWarnings("unchecked")
 116     private V report(int s) throws ExecutionException {
 117         Object x = outcome;
 118         if (s == NORMAL)
 119             return (V)x;
 120         if (s >= CANCELLED)
 121             throw new CancellationException();
 122         throw new ExecutionException((Throwable)x);
 123     }
 124 
 125     /**
 126      * Creates a {@code FutureTask} that will, upon running, execute the
 127      * given {@code Callable}.
 128      *
 129      * @param  callable the callable task
 130      * @throws NullPointerException if the callable is null
 131      */
 132     public FutureTask(Callable<V> callable) {
 133         if (callable == null)
 134             throw new NullPointerException();
 135         this.callable = callable;
 136         this.state = NEW;       // ensure visibility of callable
 137     }
 138 
 139     /**
 140      * Creates a {@code FutureTask} that will, upon running, execute the
 141      * given {@code Runnable}, and arrange that {@code get} will return the
 142      * given result on successful completion.
 143      *
 144      * @param runnable the runnable task
 145      * @param result the result to return on successful completion. If
 146      * you don't need a particular result, consider using
 147      * constructions of the form:
 148      * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
 149      * @throws NullPointerException if the runnable is null
 150      */
 151     public FutureTask(Runnable runnable, V result) {
 152         this.callable = Executors.callable(runnable, result);
 153         this.state = NEW;       // ensure visibility of callable
 154     }
 155 
 156     public boolean isCancelled() {
 157         return state >= CANCELLED;
 158     }
 159 
 160     public boolean isDone() {
 161         return state != NEW;
 162     }
 163 
 164     public boolean cancel(boolean mayInterruptIfRunning) {
 165         if (!(state == NEW &&
 166               U.compareAndSwapInt(this, STATE, NEW,
 167                   mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
 168             return false;
 169         try {    // in case call to interrupt throws exception
 170             if (mayInterruptIfRunning) {
 171                 try {
 172                     Thread t = runner;
 173                     if (t != null)
 174                         t.interrupt();
 175                 } finally { // final state
 176                     U.putOrderedInt(this, STATE, INTERRUPTED);
 177                 }
 178             }
 179         } finally {
 180             finishCompletion();
 181         }
 182         return true;
 183     }
 184 
 185     /**
 186      * @throws CancellationException {@inheritDoc}
 187      */
 188     public V get() throws InterruptedException, ExecutionException {
 189         int s = state;
 190         if (s <= COMPLETING)
 191             s = awaitDone(false, 0L);
 192         return report(s);
 193     }
 194 
 195     /**
 196      * @throws CancellationException {@inheritDoc}
 197      */
 198     public V get(long timeout, TimeUnit unit)
 199         throws InterruptedException, ExecutionException, TimeoutException {
 200         if (unit == null)
 201             throw new NullPointerException();
 202         int s = state;
 203         if (s <= COMPLETING &&
 204             (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
 205             throw new TimeoutException();
 206         return report(s);
 207     }
 208 
 209     /**
 210      * Protected method invoked when this task transitions to state
 211      * {@code isDone} (whether normally or via cancellation). The
 212      * default implementation does nothing.  Subclasses may override
 213      * this method to invoke completion callbacks or perform
 214      * bookkeeping. Note that you can query status inside the
 215      * implementation of this method to determine whether this task
 216      * has been cancelled.
 217      */
 218     protected void done() { }
 219 
 220     /**
 221      * Sets the result of this future to the given value unless
 222      * this future has already been set or has been cancelled.
 223      *
 224      * <p>This method is invoked internally by the {@link #run} method
 225      * upon successful completion of the computation.
 226      *
 227      * @param v the value
 228      */
 229     protected void set(V v) {
 230         if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
 231             outcome = v;
 232             U.putOrderedInt(this, STATE, NORMAL); // final state
 233             finishCompletion();
 234         }
 235     }
 236 
 237     /**
 238      * Causes this future to report an {@link ExecutionException}
 239      * with the given throwable as its cause, unless this future has
 240      * already been set or has been cancelled.
 241      *
 242      * <p>This method is invoked internally by the {@link #run} method
 243      * upon failure of the computation.
 244      *
 245      * @param t the cause of failure
 246      */
 247     protected void setException(Throwable t) {
 248         if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
 249             outcome = t;
 250             U.putOrderedInt(this, STATE, EXCEPTIONAL); // final state
 251             finishCompletion();
 252         }
 253     }
 254 
 255     public void run() {
 256         if (state != NEW ||
 257             !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
 258             return;
 259         try {
 260             Callable<V> c = callable;
 261             if (c != null && state == NEW) {
 262                 V result;
 263                 boolean ran;
 264                 try {
 265                     result = c.call();
 266                     ran = true;
 267                 } catch (Throwable ex) {
 268                     result = null;
 269                     ran = false;
 270                     setException(ex);
 271                 }
 272                 if (ran)
 273                     set(result);
 274             }
 275         } finally {
 276             // runner must be non-null until state is settled to
 277             // prevent concurrent calls to run()
 278             runner = null;
 279             // state must be re-read after nulling runner to prevent
 280             // leaked interrupts
 281             int s = state;
 282             if (s >= INTERRUPTING)
 283                 handlePossibleCancellationInterrupt(s);
 284         }
 285     }
 286 
 287     /**
 288      * Executes the computation without setting its result, and then
 289      * resets this future to initial state, failing to do so if the
 290      * computation encounters an exception or is cancelled.  This is
 291      * designed for use with tasks that intrinsically execute more
 292      * than once.
 293      *
 294      * @return {@code true} if successfully run and reset
 295      */
 296     protected boolean runAndReset() {
 297         if (state != NEW ||
 298             !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
 299             return false;
 300         boolean ran = false;
 301         int s = state;
 302         try {
 303             Callable<V> c = callable;
 304             if (c != null && s == NEW) {
 305                 try {
 306                     c.call(); // don't set result
 307                     ran = true;
 308                 } catch (Throwable ex) {
 309                     setException(ex);
 310                 }
 311             }
 312         } finally {
 313             // runner must be non-null until state is settled to
 314             // prevent concurrent calls to run()
 315             runner = null;
 316             // state must be re-read after nulling runner to prevent
 317             // leaked interrupts
 318             s = state;
 319             if (s >= INTERRUPTING)
 320                 handlePossibleCancellationInterrupt(s);
 321         }
 322         return ran && s == NEW;
 323     }
 324 
 325     /**
 326      * Ensures that any interrupt from a possible cancel(true) is only
 327      * delivered to a task while in run or runAndReset.
 328      */
 329     private void handlePossibleCancellationInterrupt(int s) {
 330         // It is possible for our interrupter to stall before getting a
 331         // chance to interrupt us.  Let's spin-wait patiently.
 332         if (s == INTERRUPTING)
 333             while (state == INTERRUPTING)
 334                 Thread.yield(); // wait out pending interrupt
 335 
 336         // assert state == INTERRUPTED;
 337 
 338         // We want to clear any interrupt we may have received from
 339         // cancel(true).  However, it is permissible to use interrupts
 340         // as an independent mechanism for a task to communicate with
 341         // its caller, and there is no way to clear only the
 342         // cancellation interrupt.
 343         //
 344         // Thread.interrupted();
 345     }
 346 
 347     /**
 348      * Simple linked list nodes to record waiting threads in a Treiber
 349      * stack.  See other classes such as Phaser and SynchronousQueue
 350      * for more detailed explanation.
 351      */
 352     static final class WaitNode {
 353         volatile Thread thread;
 354         volatile WaitNode next;
 355         WaitNode() { thread = Thread.currentThread(); }
 356     }
 357 
 358     /**
 359      * Removes and signals all waiting threads, invokes done(), and
 360      * nulls out callable.
 361      */
 362     private void finishCompletion() {
 363         // assert state > COMPLETING;
 364         for (WaitNode q; (q = waiters) != null;) {
 365             if (U.compareAndSwapObject(this, WAITERS, q, null)) {
 366                 for (;;) {
 367                     Thread t = q.thread;
 368                     if (t != null) {
 369                         q.thread = null;
 370                         LockSupport.unpark(t);
 371                     }
 372                     WaitNode next = q.next;
 373                     if (next == null)
 374                         break;
 375                     q.next = null; // unlink to help gc
 376                     q = next;
 377                 }
 378                 break;
 379             }
 380         }
 381 
 382         done();
 383 
 384         callable = null;        // to reduce footprint
 385     }
 386 
 387     /**
 388      * Awaits completion or aborts on interrupt or timeout.
 389      *
 390      * @param timed true if use timed waits
 391      * @param nanos time to wait, if timed
 392      * @return state upon completion or at timeout
 393      */
 394     private int awaitDone(boolean timed, long nanos)
 395         throws InterruptedException {
 396         // The code below is very delicate, to achieve these goals:
 397         // - call nanoTime exactly once for each call to park
 398         // - if nanos <= 0, return promptly without allocation or nanoTime
 399         // - if nanos == Long.MIN_VALUE, don't underflow
 400         // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
 401         //   and we suffer a spurious wakeup, we will do no worse than
 402         //   to park-spin for a while
 403         long startTime = 0L;    // Special value 0L means not yet parked
 404         WaitNode q = null;
 405         boolean queued = false;
 406         for (;;) {
 407             if (Thread.interrupted()) {
 408                 removeWaiter(q);
 409                 throw new InterruptedException();
 410             }
 411 
 412             int s = state;
 413             if (s > COMPLETING) {
 414                 if (q != null)
 415                     q.thread = null;
 416                 return s;
 417             }
 418             else if (s == COMPLETING) // cannot time out yet
 419                 Thread.yield();
 420             else if (q == null) {
 421                 if (timed && nanos <= 0L)
 422                     return s;
 423                 q = new WaitNode();
 424             }
 425             else if (!queued)
 426                 queued = U.compareAndSwapObject(this, WAITERS,
 427                                                 q.next = waiters, q);
 428             else if (timed) {
 429                 final long parkNanos;
 430                 if (startTime == 0L) { // first time
 431                     startTime = System.nanoTime();
 432                     if (startTime == 0L)
 433                         startTime = 1L;
 434                     parkNanos = nanos;
 435                 } else {
 436                     long elapsed = System.nanoTime() - startTime;
 437                     if (elapsed >= nanos) {
 438                         removeWaiter(q);
 439                         return state;
 440                     }
 441                     parkNanos = nanos - elapsed;
 442                 }
 443                 LockSupport.parkNanos(this, parkNanos);
 444             }
 445             else
 446                 LockSupport.park(this);
 447         }
 448     }
 449 
    /**
     * Tries to unlink a timed-out or interrupted wait node to avoid
     * accumulating garbage.  Internal nodes are simply unspliced
     * without CAS since it is harmless if they are traversed anyway
     * by releasers.  To avoid effects of unsplicing from already
     * removed nodes, the list is retraversed in case of an apparent
     * race.  This is slow when there are a lot of nodes, but we don't
     * expect lists to be long enough to outweigh higher-overhead
     * schemes.
     *
     * @param node the node to remove; a null argument is ignored
     */
    private void removeWaiter(WaitNode node) {
        if (node != null) {
            // A null thread is the marker that identifies a node as
            // removable during the traversal below.
            node.thread = null;
            retry:
            for (;;) {          // restart on removeWaiter race
                // pred: most recent node seen whose thread is non-null
                // q:    node under inspection; s: its successor
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        pred = q;
                    else if (pred != null) {
                        // q is marked and has a live predecessor:
                        // splice it out with a plain write.
                        pred.next = s;
                        // If pred was itself marked in the meantime,
                        // the write above may have gone to an already
                        // removed node; start over from the head.
                        if (pred.thread == null) // check for race
                            continue retry;
                    }
                    // q is marked and no live node precedes it, so it is
                    // treated as the stack head: swing waiters past it,
                    // restarting if the head changed concurrently.
                    else if (!U.compareAndSwapObject(this, WAITERS, q, s))
                        continue retry;
                }
                break;
            }
        }
    }
 481 
    // Unsafe mechanics
    // U is the Unsafe instance used by this class for compare-and-swap
    // and ordered writes.  STATE, RUNNER and WAITERS are the offsets of
    // the correspondingly named instance fields, as passed to those
    // Unsafe calls.
    private static final sun.misc.Unsafe U;
    private static final long STATE;
    private static final long RUNNER;
    private static final long WAITERS;
    static {
        try {
            U = sun.misc.Unsafe.getUnsafe();
            Class<?> k = FutureTask.class;
            // Fields are looked up by name, so these string literals must
            // stay in sync with the field declarations in this class.
            STATE   = U.objectFieldOffset(k.getDeclaredField("state"));
            RUNNER  = U.objectFieldOffset(k.getDeclaredField("runner"));
            WAITERS = U.objectFieldOffset(k.getDeclaredField("waiters"));
        } catch (Exception e) {
            // Any failure here is rethrown as an Error from the static
            // initializer, with the original exception as its cause.
            throw new Error(e);
        }
    }
 498 
 499 }