1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 import java.util.concurrent.locks.LockSupport;
  38 
  39 /**
  40  * A cancellable asynchronous computation.  This class provides a base
  41  * implementation of {@link Future}, with methods to start and cancel
  42  * a computation, query to see if the computation is complete, and
  43  * retrieve the result of the computation.  The result can only be
  44  * retrieved when the computation has completed; the {@code get}
  45  * methods will block if the computation has not yet completed.  Once
  46  * the computation has completed, the computation cannot be restarted
  47  * or cancelled (unless the computation is invoked using
  48  * {@link #runAndReset}).
  49  *
  50  * <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
  51  * {@link Runnable} object.  Because {@code FutureTask} implements
  52  * {@code Runnable}, a {@code FutureTask} can be submitted to an
  53  * {@link Executor} for execution.
  54  *
  55  * <p>In addition to serving as a standalone class, this class provides
  56  * {@code protected} functionality that may be useful when creating
  57  * customized task classes.
  58  *
  59  * @since 1.5
  60  * @author Doug Lea
  61  * @param <V> The result type returned by this FutureTask's {@code get} methods
  62  */
  63 public class FutureTask<V> implements RunnableFuture<V> {
  64     /*
  65      * Revision notes: This differs from previous versions of this
  66      * class that relied on AbstractQueuedSynchronizer, mainly to
  67      * avoid surprising users about retaining interrupt status during
  68      * cancellation races. Sync control in the current design relies
  69      * on a "state" field updated via CAS to track completion, along
  70      * with a simple Treiber stack to hold waiting threads.
  71      *
  72      * Style note: As usual, we bypass overhead of using
  73      * AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.
  74      */
  75 
    /**
     * The run state of this task, initially NEW.  The run state
     * transitions to a terminal state only in methods set,
     * setException, and cancel.  During completion, state may take on
     * transient values of COMPLETING (while outcome is being set) or
     * INTERRUPTING (only while interrupting the runner to satisfy a
     * cancel(true)). Transitions from these intermediate to final
     * states use cheaper ordered/lazy writes because values are unique
     * and cannot be further modified.
     *
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     *
     * The numeric ordering of the constants is relied upon by range
     * checks elsewhere in this class: {@code s <= COMPLETING} means
     * "no result available yet", {@code s > COMPLETING} means "finished",
     * {@code s >= CANCELLED} means "cancelled (with or without
     * interrupt)", and {@code s >= INTERRUPTING} means "cancelled via
     * cancel(true)".  Do not renumber them.
     */
    private volatile int state;
    // Initial state; the only state from which any transition may start.
    private static final int NEW          = 0;
    // Transient: set()/setException() won the CAS and is storing outcome.
    private static final int COMPLETING   = 1;
    // Final: outcome holds the computed result.
    private static final int NORMAL       = 2;
    // Final: outcome holds the Throwable passed to setException().
    private static final int EXCEPTIONAL  = 3;
    // Final: cancel(false) succeeded.
    private static final int CANCELLED    = 4;
    // Transient: cancel(true) succeeded and is interrupting the runner.
    private static final int INTERRUPTING = 5;
    // Final: cancel(true) has finished its interrupt attempt.
    private static final int INTERRUPTED  = 6;

    /**
     * The underlying callable; nulled out after running
     * (specifically, at the end of finishCompletion()).
     */
    private Callable<V> callable;
    /**
     * The result to return or exception to throw from get().
     * Written only between the CAS to COMPLETING and the write of the
     * final state; read by report().
     */
    private Object outcome; // non-volatile, protected by state reads/writes
    /**
     * The thread running the callable; CASed from null during run() and
     * runAndReset(), and reset to null when they finish.
     */
    private volatile Thread runner;
    /** Treiber stack of waiting threads (head of the list). */
    private volatile WaitNode waiters;
 109 
 110     /**
 111      * Returns result or throws exception for completed task.
 112      *
 113      * @param s completed state value
 114      */
 115     @SuppressWarnings("unchecked")
 116     private V report(int s) throws ExecutionException {
 117         Object x = outcome;
 118         if (s == NORMAL)
 119             return (V)x;
 120         if (s >= CANCELLED)
 121             throw new CancellationException();
 122         throw new ExecutionException((Throwable)x);
 123     }
 124 
 125     /**
 126      * Creates a {@code FutureTask} that will, upon running, execute the
 127      * given {@code Callable}.
 128      *
 129      * @param  callable the callable task
 130      * @throws NullPointerException if the callable is null
 131      */
 132     public FutureTask(Callable<V> callable) {
 133         if (callable == null)
 134             throw new NullPointerException();
 135         this.callable = callable;
 136         this.state = NEW;       // ensure visibility of callable
 137     }
 138 
 139     /**
 140      * Creates a {@code FutureTask} that will, upon running, execute the
 141      * given {@code Runnable}, and arrange that {@code get} will return the
 142      * given result on successful completion.
 143      *
 144      * @param runnable the runnable task
 145      * @param result the result to return on successful completion. If
 146      * you don't need a particular result, consider using
 147      * constructions of the form:
 148      * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
 149      * @throws NullPointerException if the runnable is null
 150      */
 151     public FutureTask(Runnable runnable, V result) {
 152         this.callable = Executors.callable(runnable, result);
 153         this.state = NEW;       // ensure visibility of callable
 154     }
 155 
 156     public boolean isCancelled() {
 157         return state >= CANCELLED;
 158     }
 159 
 160     public boolean isDone() {
 161         return state != NEW;
 162     }
 163 
 164     public boolean cancel(boolean mayInterruptIfRunning) {
 165         if (!(state == NEW &&
 166               UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
 167                   mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
 168             return false;
 169         try {    // in case call to interrupt throws exception
 170             if (mayInterruptIfRunning) {
 171                 try {
 172                     Thread t = runner;
 173                     if (t != null)
 174                         t.interrupt();
 175                 } finally { // final state
 176                     UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
 177                 }
 178             }
 179         } finally {
 180             finishCompletion();
 181         }
 182         return true;
 183     }
 184 
 185     /**
 186      * @throws CancellationException {@inheritDoc}
 187      */
 188     public V get() throws InterruptedException, ExecutionException {
 189         int s = state;
 190         if (s <= COMPLETING)
 191             s = awaitDone(false, 0L);
 192         return report(s);
 193     }
 194 
 195     /**
 196      * @throws CancellationException {@inheritDoc}
 197      */
 198     public V get(long timeout, TimeUnit unit)
 199         throws InterruptedException, ExecutionException, TimeoutException {
 200         if (unit == null)
 201             throw new NullPointerException();
 202         int s = state;
 203         if (s <= COMPLETING &&
 204             (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
 205             throw new TimeoutException();
 206         return report(s);
 207     }
 208 
    /**
     * Protected method invoked when this task transitions to state
     * {@code isDone} (whether normally or via cancellation). The
     * default implementation does nothing.  Subclasses may override
     * this method to invoke completion callbacks or perform
     * bookkeeping. Note that you can query status inside the
     * implementation of this method to determine whether this task
     * has been cancelled.
     *
     * <p>Within this class the hook is called from
     * {@code finishCompletion()}, after waiting threads have been
     * unparked, on whichever thread completed or cancelled the task.
     */
    protected void done() { }
 219 
 220     /**
 221      * Sets the result of this future to the given value unless
 222      * this future has already been set or has been cancelled.
 223      *
 224      * <p>This method is invoked internally by the {@link #run} method
 225      * upon successful completion of the computation.
 226      *
 227      * @param v the value
 228      */
 229     protected void set(V v) {
 230         if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
 231             outcome = v;
 232             UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
 233             finishCompletion();
 234         }
 235     }
 236 
 237     /**
 238      * Causes this future to report an {@link ExecutionException}
 239      * with the given throwable as its cause, unless this future has
 240      * already been set or has been cancelled.
 241      *
 242      * <p>This method is invoked internally by the {@link #run} method
 243      * upon failure of the computation.
 244      *
 245      * @param t the cause of failure
 246      */
 247     protected void setException(Throwable t) {
 248         if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
 249             outcome = t;
 250             UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
 251             finishCompletion();
 252         }
 253     }
 254 
 255     public void run() {
 256         if (state != NEW ||
 257             !UNSAFE.compareAndSwapObject(this, runnerOffset,
 258                                          null, Thread.currentThread()))
 259             return;
 260         try {
 261             Callable<V> c = callable;
 262             if (c != null && state == NEW) {
 263                 V result;
 264                 boolean ran;
 265                 try {
 266                     result = c.call();
 267                     ran = true;
 268                 } catch (Throwable ex) {
 269                     result = null;
 270                     ran = false;
 271                     setException(ex);
 272                 }
 273                 if (ran)
 274                     set(result);
 275             }
 276         } finally {
 277             // runner must be non-null until state is settled to
 278             // prevent concurrent calls to run()
 279             runner = null;
 280             // state must be re-read after nulling runner to prevent
 281             // leaked interrupts
 282             int s = state;
 283             if (s >= INTERRUPTING)
 284                 handlePossibleCancellationInterrupt(s);
 285         }
 286     }
 287 
 288     /**
 289      * Executes the computation without setting its result, and then
 290      * resets this future to initial state, failing to do so if the
 291      * computation encounters an exception or is cancelled.  This is
 292      * designed for use with tasks that intrinsically execute more
 293      * than once.
 294      *
 295      * @return {@code true} if successfully run and reset
 296      */
 297     protected boolean runAndReset() {
 298         if (state != NEW ||
 299             !UNSAFE.compareAndSwapObject(this, runnerOffset,
 300                                          null, Thread.currentThread()))
 301             return false;
 302         boolean ran = false;
 303         int s = state;
 304         try {
 305             Callable<V> c = callable;
 306             if (c != null && s == NEW) {
 307                 try {
 308                     c.call(); // don't set result
 309                     ran = true;
 310                 } catch (Throwable ex) {
 311                     setException(ex);
 312                 }
 313             }
 314         } finally {
 315             // runner must be non-null until state is settled to
 316             // prevent concurrent calls to run()
 317             runner = null;
 318             // state must be re-read after nulling runner to prevent
 319             // leaked interrupts
 320             s = state;
 321             if (s >= INTERRUPTING)
 322                 handlePossibleCancellationInterrupt(s);
 323         }
 324         return ran && s == NEW;
 325     }
 326 
 327     /**
 328      * Ensures that any interrupt from a possible cancel(true) is only
 329      * delivered to a task while in run or runAndReset.
 330      */
 331     private void handlePossibleCancellationInterrupt(int s) {
 332         // It is possible for our interrupter to stall before getting a
 333         // chance to interrupt us.  Let's spin-wait patiently.
 334         if (s == INTERRUPTING)
 335             while (state == INTERRUPTING)
 336                 Thread.yield(); // wait out pending interrupt
 337 
 338         // assert state == INTERRUPTED;
 339 
 340         // We want to clear any interrupt we may have received from
 341         // cancel(true).  However, it is permissible to use interrupts
 342         // as an independent mechanism for a task to communicate with
 343         // its caller, and there is no way to clear only the
 344         // cancellation interrupt.
 345         //
 346         // Thread.interrupted();
 347     }
 348 
 349     /**
 350      * Simple linked list nodes to record waiting threads in a Treiber
 351      * stack.  See other classes such as Phaser and SynchronousQueue
 352      * for more detailed explanation.
 353      */
 354     static final class WaitNode {
 355         volatile Thread thread;
 356         volatile WaitNode next;
 357         WaitNode() { thread = Thread.currentThread(); }
 358     }
 359 
    /**
     * Removes and signals all waiting threads, invokes done(), and
     * nulls out callable.
     *
     * <p>Called by set, setException and cancel once they have moved the
     * state out of NEW.
     */
    private void finishCompletion() {
        // assert state > COMPLETING;
        // Detach the entire waiter stack with a single CAS; retry if the
        // head changed between the read and the CAS.
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                // We now own the detached list: walk it and wake each waiter.
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        // Clear the slot before unparking the thread.
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        // Subclass hook; runs after the waiters have been unparked.
        done();

        callable = null;        // to reduce footprint
    }
 388 
 389     /**
 390      * Awaits completion or aborts on interrupt or timeout.
 391      *
 392      * @param timed true if use timed waits
 393      * @param nanos time to wait, if timed
 394      * @return state upon completion
 395      */
 396     private int awaitDone(boolean timed, long nanos)
 397         throws InterruptedException {
 398         final long deadline = timed ? System.nanoTime() + nanos : 0L;
 399         WaitNode q = null;
 400         boolean queued = false;
 401         for (;;) {
 402             if (Thread.interrupted()) {
 403                 removeWaiter(q);
 404                 throw new InterruptedException();
 405             }
 406 
 407             int s = state;
 408             if (s > COMPLETING) {
 409                 if (q != null)
 410                     q.thread = null;
 411                 return s;
 412             }
 413             else if (s == COMPLETING) // cannot time out yet
 414                 Thread.yield();
 415             else if (q == null)
 416                 q = new WaitNode();
 417             else if (!queued)
 418                 queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
 419                                                      q.next = waiters, q);
 420             else if (timed) {
 421                 nanos = deadline - System.nanoTime();
 422                 if (nanos <= 0L) {
 423                     removeWaiter(q);
 424                     return state;
 425                 }
 426                 LockSupport.parkNanos(this, nanos);
 427             }
 428             else
 429                 LockSupport.park(this);
 430         }
 431     }
 432 
    /**
     * Tries to unlink a timed-out or interrupted wait node to avoid
     * accumulating garbage.  Internal nodes are simply unspliced
     * without CAS since it is harmless if they are traversed anyway
     * by releasers.  To avoid effects of unsplicing from already
     * removed nodes, the list is retraversed in case of an apparent
     * race.  This is slow when there are a lot of nodes, but we don't
     * expect lists to be long enough to outweigh higher-overhead
     * schemes.
     *
     * @param node the node to remove; may be null, in which case this
     *        method does nothing
     */
    private void removeWaiter(WaitNode node) {
        if (node != null) {
            // A null thread marks the node as dead; the sweep below unlinks
            // every dead node it encounters, not just this one.
            node.thread = null;
            retry:
            for (;;) {          // restart on removeWaiter race
                // pred is the last live node seen so far, q the node under
                // inspection, and s its successor.
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        pred = q;
                    else if (pred != null) {
                        // Dead interior node: splice it out with a plain write.
                        pred.next = s;
                        if (pred.thread == null) // check for race
                            continue retry;
                    }
                    // Dead node at the head: the head must be moved by CAS.
                    else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                          q, s))
                        continue retry;
                }
                break;
            }
        }
    }
 465 
 466     // Unsafe mechanics
 467     private static final sun.misc.Unsafe UNSAFE;
 468     private static final long stateOffset;
 469     private static final long runnerOffset;
 470     private static final long waitersOffset;
 471     static {
 472         try {
 473             UNSAFE = sun.misc.Unsafe.getUnsafe();
 474             Class<?> k = FutureTask.class;
 475             stateOffset = UNSAFE.objectFieldOffset
 476                 (k.getDeclaredField("state"));
 477             runnerOffset = UNSAFE.objectFieldOffset
 478                 (k.getDeclaredField("runner"));
 479             waitersOffset = UNSAFE.objectFieldOffset
 480                 (k.getDeclaredField("waiters"));
 481         } catch (Exception e) {
 482             throw new Error(e);
 483         }
 484     }
 485 
 486 }