1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 import java.util.concurrent.atomic.*;
  38 import java.util.concurrent.locks.*;
  39 import java.util.*;
  40 
  41 /**
  42  * A {@link ThreadPoolExecutor} that can additionally schedule
  43  * commands to run after a given delay, or to execute
  44  * periodically. This class is preferable to {@link java.util.Timer}
  45  * when multiple worker threads are needed, or when the additional
  46  * flexibility or capabilities of {@link ThreadPoolExecutor} (which
  47  * this class extends) are required.
  48  *
  49  * <p>Delayed tasks execute no sooner than they are enabled, but
  50  * without any real-time guarantees about when, after they are
  51  * enabled, they will commence. Tasks scheduled for exactly the same
  52  * execution time are enabled in first-in-first-out (FIFO) order of
  53  * submission.
  54  *
  55  * <p>When a submitted task is cancelled before it is run, execution
  56  * is suppressed. By default, such a cancelled task is not
  57  * automatically removed from the work queue until its delay
  58  * elapses. While this enables further inspection and monitoring, it
  59  * may also cause unbounded retention of cancelled tasks. To avoid
  60  * this, set {@link #setRemoveOnCancelPolicy} to {@code true}, which
  61  * causes tasks to be immediately removed from the work queue at
  62  * time of cancellation.
  63  *
  64  * <p>Successive executions of a task scheduled via
  65  * {@code scheduleAtFixedRate} or
  66  * {@code scheduleWithFixedDelay} do not overlap. While different
  67  * executions may be performed by different threads, the effects of
  68  * prior executions <a
  69  * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
  70  * those of subsequent ones.
  71  *
  72  * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
  73  * of the inherited tuning methods are not useful for it. In
  74  * particular, because it acts as a fixed-sized pool using
  75  * {@code corePoolSize} threads and an unbounded queue, adjustments
  76  * to {@code maximumPoolSize} have no useful effect. Additionally, it
  77  * is almost never a good idea to set {@code corePoolSize} to zero or
  78  * use {@code allowCoreThreadTimeOut} because this may leave the pool
  79  * without threads to handle tasks once they become eligible to run.
  80  *
  81  * <p><b>Extension notes:</b> This class overrides the
  82  * {@link ThreadPoolExecutor#execute execute} and
  83  * {@link AbstractExecutorService#submit(Runnable) submit}
  84  * methods to generate internal {@link ScheduledFuture} objects to
  85  * control per-task delays and scheduling.  To preserve
  86  * functionality, any further overrides of these methods in
  87  * subclasses must invoke superclass versions, which effectively
 * disables additional task customization.  However, this class
 * provides an alternative protected extension method
  90  * {@code decorateTask} (one version each for {@code Runnable} and
  91  * {@code Callable}) that can be used to customize the concrete task
  92  * types used to execute commands entered via {@code execute},
  93  * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
  94  * and {@code scheduleWithFixedDelay}.  By default, a
  95  * {@code ScheduledThreadPoolExecutor} uses a task type extending
  96  * {@link FutureTask}. However, this may be modified or replaced using
  97  * subclasses of the form:
  98  *
  99  *  <pre> {@code
 100  * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
 101  *
 102  *   static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
 103  *
 104  *   protected <V> RunnableScheduledFuture<V> decorateTask(
 105  *                Runnable r, RunnableScheduledFuture<V> task) {
 106  *       return new CustomTask<V>(r, task);
 107  *   }
 108  *
 109  *   protected <V> RunnableScheduledFuture<V> decorateTask(
 110  *                Callable<V> c, RunnableScheduledFuture<V> task) {
 111  *       return new CustomTask<V>(c, task);
 112  *   }
 113  *   // ... add constructors, etc.
 114  * }}</pre>
 115  *
 116  * @since 1.5
 117  * @author Doug Lea
 118  */
 119 public class ScheduledThreadPoolExecutor
 120         extends ThreadPoolExecutor
 121         implements ScheduledExecutorService {
 122 
 123     /*
 124      * This class specializes ThreadPoolExecutor implementation by
 125      *
 126      * 1. Using a custom task type, ScheduledFutureTask for
 127      *    tasks, even those that don't require scheduling (i.e.,
 128      *    those submitted using ExecutorService execute, not
 129      *    ScheduledExecutorService methods) which are treated as
 130      *    delayed tasks with a delay of zero.
 131      *
 132      * 2. Using a custom queue (DelayedWorkQueue), a variant of
 133      *    unbounded DelayQueue. The lack of capacity constraint and
 134      *    the fact that corePoolSize and maximumPoolSize are
 135      *    effectively identical simplifies some execution mechanics
 136      *    (see delayedExecute) compared to ThreadPoolExecutor.
 137      *
 138      * 3. Supporting optional run-after-shutdown parameters, which
 139      *    leads to overrides of shutdown methods to remove and cancel
 140      *    tasks that should NOT be run after shutdown, as well as
 141      *    different recheck logic when task (re)submission overlaps
 142      *    with a shutdown.
 143      *
 144      * 4. Task decoration methods to allow interception and
 145      *    instrumentation, which are needed because subclasses cannot
 146      *    otherwise override submit methods to get this effect. These
 147      *    don't have any impact on pool control logic though.
 148      */
 149 
 150     /**
 151      * False if should cancel/suppress periodic tasks on shutdown.
 152      */
 153     private volatile boolean continueExistingPeriodicTasksAfterShutdown;
 154 
 155     /**
 156      * False if should cancel non-periodic tasks on shutdown.
 157      */
 158     private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
 159 
 160     /**
 161      * True if ScheduledFutureTask.cancel should remove from queue
 162      */
 163     private volatile boolean removeOnCancel = false;
 164 
 165     /**
 166      * Sequence number to break scheduling ties, and in turn to
 167      * guarantee FIFO order among tied entries.
 168      */
 169     private static final AtomicLong sequencer = new AtomicLong(0);
 170 
 171     /**
 172      * Returns current nanosecond time.
 173      */
 174     final long now() {
 175         return System.nanoTime();
 176     }
 177 
 178     private class ScheduledFutureTask<V>
 179             extends FutureTask<V> implements RunnableScheduledFuture<V> {
 180 
 181         /** Sequence number to break ties FIFO */
 182         private final long sequenceNumber;
 183 
 184         /** The time the task is enabled to execute in nanoTime units */
 185         private long time;
 186 
 187         /**
 188          * Period in nanoseconds for repeating tasks.  A positive
 189          * value indicates fixed-rate execution.  A negative value
 190          * indicates fixed-delay execution.  A value of 0 indicates a
 191          * non-repeating task.
 192          */
 193         private final long period;
 194 
 195         /** The actual task to be re-enqueued by reExecutePeriodic */
 196         RunnableScheduledFuture<V> outerTask = this;
 197 
 198         /**
 199          * Index into delay queue, to support faster cancellation.
 200          */
 201         int heapIndex;
 202 
 203         /**
 204          * Creates a one-shot action with given nanoTime-based trigger time.
 205          */
 206         ScheduledFutureTask(Runnable r, V result, long ns) {
 207             super(r, result);
 208             this.time = ns;
 209             this.period = 0;
 210             this.sequenceNumber = sequencer.getAndIncrement();
 211         }
 212 
 213         /**
 214          * Creates a periodic action with given nano time and period.
 215          */
 216         ScheduledFutureTask(Runnable r, V result, long ns, long period) {
 217             super(r, result);
 218             this.time = ns;
 219             this.period = period;
 220             this.sequenceNumber = sequencer.getAndIncrement();
 221         }
 222 
 223         /**
 224          * Creates a one-shot action with given nanoTime-based trigger.
 225          */
 226         ScheduledFutureTask(Callable<V> callable, long ns) {
 227             super(callable);
 228             this.time = ns;
 229             this.period = 0;
 230             this.sequenceNumber = sequencer.getAndIncrement();
 231         }
 232 
 233         public long getDelay(TimeUnit unit) {
 234             return unit.convert(time - now(), TimeUnit.NANOSECONDS);
 235         }
 236 
 237         public int compareTo(Delayed other) {
 238             if (other == this) // compare zero ONLY if same object
 239                 return 0;
 240             if (other instanceof ScheduledFutureTask) {
 241                 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
 242                 long diff = time - x.time;
 243                 if (diff < 0)
 244                     return -1;
 245                 else if (diff > 0)
 246                     return 1;
 247                 else if (sequenceNumber < x.sequenceNumber)
 248                     return -1;
 249                 else
 250                     return 1;
 251             }
 252             long d = (getDelay(TimeUnit.NANOSECONDS) -
 253                       other.getDelay(TimeUnit.NANOSECONDS));
 254             return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
 255         }
 256 
 257         /**
 258          * Returns true if this is a periodic (not a one-shot) action.
 259          *
 260          * @return true if periodic
 261          */
 262         public boolean isPeriodic() {
 263             return period != 0;
 264         }
 265 
 266         /**
 267          * Sets the next time to run for a periodic task.
 268          */
 269         private void setNextRunTime() {
 270             long p = period;
 271             if (p > 0)
 272                 time += p;
 273             else
 274                 time = triggerTime(-p);
 275         }
 276 
 277         public boolean cancel(boolean mayInterruptIfRunning) {
 278             boolean cancelled = super.cancel(mayInterruptIfRunning);
 279             if (cancelled && removeOnCancel && heapIndex >= 0)
 280                 remove(this);
 281             return cancelled;
 282         }
 283 
 284         /**
 285          * Overrides FutureTask version so as to reset/requeue if periodic.
 286          */
 287         public void run() {
 288             boolean periodic = isPeriodic();
 289             if (!canRunInCurrentRunState(periodic))
 290                 cancel(false);
 291             else if (!periodic)
 292                 ScheduledFutureTask.super.run();
 293             else if (ScheduledFutureTask.super.runAndReset()) {
 294                 setNextRunTime();
 295                 reExecutePeriodic(outerTask);
 296             }
 297         }
 298     }
 299 
 300     /**
 301      * Returns true if can run a task given current run state
 302      * and run-after-shutdown parameters.
 303      *
 304      * @param periodic true if this task periodic, false if delayed
 305      */
 306     boolean canRunInCurrentRunState(boolean periodic) {
 307         return isRunningOrShutdown(periodic ?
 308                                    continueExistingPeriodicTasksAfterShutdown :
 309                                    executeExistingDelayedTasksAfterShutdown);
 310     }
 311 
 312     /**
 313      * Main execution method for delayed or periodic tasks.  If pool
 314      * is shut down, rejects the task. Otherwise adds task to queue
 315      * and starts a thread, if necessary, to run it.  (We cannot
 316      * prestart the thread to run the task because the task (probably)
 317      * shouldn't be run yet,) If the pool is shut down while the task
 318      * is being added, cancel and remove it if required by state and
 319      * run-after-shutdown parameters.
 320      *
 321      * @param task the task
 322      */
 323     private void delayedExecute(RunnableScheduledFuture<?> task) {
 324         if (isShutdown())
 325             reject(task);
 326         else {
 327             super.getQueue().add(task);
 328             if (isShutdown() &&
 329                 !canRunInCurrentRunState(task.isPeriodic()) &&
 330                 remove(task))
 331                 task.cancel(false);
 332             else
 333                 ensurePrestart();
 334         }
 335     }
 336 
 337     /**
 338      * Requeues a periodic task unless current run state precludes it.
 339      * Same idea as delayedExecute except drops task rather than rejecting.
 340      *
 341      * @param task the task
 342      */
 343     void reExecutePeriodic(RunnableScheduledFuture<?> task) {
 344         if (canRunInCurrentRunState(true)) {
 345             super.getQueue().add(task);
 346             if (!canRunInCurrentRunState(true) && remove(task))
 347                 task.cancel(false);
 348             else
 349                 ensurePrestart();
 350         }
 351     }
 352 
 353     /**
 354      * Cancels and clears the queue of all tasks that should not be run
 355      * due to shutdown policy.  Invoked within super.shutdown.
 356      */
 357     @Override void onShutdown() {
 358         BlockingQueue<Runnable> q = super.getQueue();
 359         boolean keepDelayed =
 360             getExecuteExistingDelayedTasksAfterShutdownPolicy();
 361         boolean keepPeriodic =
 362             getContinueExistingPeriodicTasksAfterShutdownPolicy();
 363         if (!keepDelayed && !keepPeriodic) {
 364             for (Object e : q.toArray())
 365                 if (e instanceof RunnableScheduledFuture<?>)
 366                     ((RunnableScheduledFuture<?>) e).cancel(false);
 367             q.clear();
 368         }
 369         else {
 370             // Traverse snapshot to avoid iterator exceptions
 371             for (Object e : q.toArray()) {
 372                 if (e instanceof RunnableScheduledFuture) {
 373                     RunnableScheduledFuture<?> t =
 374                         (RunnableScheduledFuture<?>)e;
 375                     if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
 376                         t.isCancelled()) { // also remove if already cancelled
 377                         if (q.remove(t))
 378                             t.cancel(false);
 379                     }
 380                 }
 381             }
 382         }
 383         tryTerminate();
 384     }
 385 
 386     /**
 387      * Modifies or replaces the task used to execute a runnable.
 388      * This method can be used to override the concrete
 389      * class used for managing internal tasks.
 390      * The default implementation simply returns the given task.
 391      *
 392      * @param runnable the submitted Runnable
 393      * @param task the task created to execute the runnable
 394      * @return a task that can execute the runnable
 395      * @since 1.6
 396      */
 397     protected <V> RunnableScheduledFuture<V> decorateTask(
 398         Runnable runnable, RunnableScheduledFuture<V> task) {
 399         return task;
 400     }
 401 
 402     /**
 403      * Modifies or replaces the task used to execute a callable.
 404      * This method can be used to override the concrete
 405      * class used for managing internal tasks.
 406      * The default implementation simply returns the given task.
 407      *
 408      * @param callable the submitted Callable
 409      * @param task the task created to execute the callable
 410      * @return a task that can execute the callable
 411      * @since 1.6
 412      */
 413     protected <V> RunnableScheduledFuture<V> decorateTask(
 414         Callable<V> callable, RunnableScheduledFuture<V> task) {
 415         return task;
 416     }
 417 
 418     /**
 419      * Creates a new {@code ScheduledThreadPoolExecutor} with the
 420      * given core pool size.
 421      *
 422      * @param corePoolSize the number of threads to keep in the pool, even
 423      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 424      * @throws IllegalArgumentException if {@code corePoolSize < 0}
 425      */
 426     public ScheduledThreadPoolExecutor(int corePoolSize) {
 427         super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
 428               new DelayedWorkQueue());
 429     }
 430 
 431     /**
 432      * Creates a new {@code ScheduledThreadPoolExecutor} with the
 433      * given initial parameters.
 434      *
 435      * @param corePoolSize the number of threads to keep in the pool, even
 436      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 437      * @param threadFactory the factory to use when the executor
 438      *        creates a new thread
 439      * @throws IllegalArgumentException if {@code corePoolSize < 0}
 440      * @throws NullPointerException if {@code threadFactory} is null
 441      */
 442     public ScheduledThreadPoolExecutor(int corePoolSize,
 443                                        ThreadFactory threadFactory) {
 444         super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
 445               new DelayedWorkQueue(), threadFactory);
 446     }
 447 
 448     /**
 449      * Creates a new ScheduledThreadPoolExecutor with the given
 450      * initial parameters.
 451      *
 452      * @param corePoolSize the number of threads to keep in the pool, even
 453      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 454      * @param handler the handler to use when execution is blocked
 455      *        because the thread bounds and queue capacities are reached
 456      * @throws IllegalArgumentException if {@code corePoolSize < 0}
 457      * @throws NullPointerException if {@code handler} is null
 458      */
 459     public ScheduledThreadPoolExecutor(int corePoolSize,
 460                                        RejectedExecutionHandler handler) {
 461         super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
 462               new DelayedWorkQueue(), handler);
 463     }
 464 
 465     /**
 466      * Creates a new ScheduledThreadPoolExecutor with the given
 467      * initial parameters.
 468      *
 469      * @param corePoolSize the number of threads to keep in the pool, even
 470      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 471      * @param threadFactory the factory to use when the executor
 472      *        creates a new thread
 473      * @param handler the handler to use when execution is blocked
 474      *        because the thread bounds and queue capacities are reached
 475      * @throws IllegalArgumentException if {@code corePoolSize < 0}
 476      * @throws NullPointerException if {@code threadFactory} or
 477      *         {@code handler} is null
 478      */
 479     public ScheduledThreadPoolExecutor(int corePoolSize,
 480                                        ThreadFactory threadFactory,
 481                                        RejectedExecutionHandler handler) {
 482         super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
 483               new DelayedWorkQueue(), threadFactory, handler);
 484     }
 485 
 486     /**
 487      * Returns the trigger time of a delayed action.
 488      */
 489     private long triggerTime(long delay, TimeUnit unit) {
 490         return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
 491     }
 492 
 493     /**
 494      * Returns the trigger time of a delayed action.
 495      */
 496     long triggerTime(long delay) {
 497         return now() +
 498             ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
 499     }
 500 
 501     /**
 502      * Constrains the values of all delays in the queue to be within
 503      * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
 504      * This may occur if a task is eligible to be dequeued, but has
 505      * not yet been, while some other task is added with a delay of
 506      * Long.MAX_VALUE.
 507      */
 508     private long overflowFree(long delay) {
 509         Delayed head = (Delayed) super.getQueue().peek();
 510         if (head != null) {
 511             long headDelay = head.getDelay(TimeUnit.NANOSECONDS);
 512             if (headDelay < 0 && (delay - headDelay < 0))
 513                 delay = Long.MAX_VALUE + headDelay;
 514         }
 515         return delay;
 516     }
 517 
 518     /**
 519      * @throws RejectedExecutionException {@inheritDoc}
 520      * @throws NullPointerException       {@inheritDoc}
 521      */
 522     public ScheduledFuture<?> schedule(Runnable command,
 523                                        long delay,
 524                                        TimeUnit unit) {
 525         if (command == null || unit == null)
 526             throw new NullPointerException();
 527         RunnableScheduledFuture<?> t = decorateTask(command,
 528             new ScheduledFutureTask<Void>(command, null,
 529                                           triggerTime(delay, unit)));
 530         delayedExecute(t);
 531         return t;
 532     }
 533 
 534     /**
 535      * @throws RejectedExecutionException {@inheritDoc}
 536      * @throws NullPointerException       {@inheritDoc}
 537      */
 538     public <V> ScheduledFuture<V> schedule(Callable<V> callable,
 539                                            long delay,
 540                                            TimeUnit unit) {
 541         if (callable == null || unit == null)
 542             throw new NullPointerException();
 543         RunnableScheduledFuture<V> t = decorateTask(callable,
 544             new ScheduledFutureTask<V>(callable,
 545                                        triggerTime(delay, unit)));
 546         delayedExecute(t);
 547         return t;
 548     }
 549 
 550     /**
 551      * @throws RejectedExecutionException {@inheritDoc}
 552      * @throws NullPointerException       {@inheritDoc}
 553      * @throws IllegalArgumentException   {@inheritDoc}
 554      */
 555     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
 556                                                   long initialDelay,
 557                                                   long period,
 558                                                   TimeUnit unit) {
 559         if (command == null || unit == null)
 560             throw new NullPointerException();
 561         if (period <= 0)
 562             throw new IllegalArgumentException();
 563         ScheduledFutureTask<Void> sft =
 564             new ScheduledFutureTask<Void>(command,
 565                                           null,
 566                                           triggerTime(initialDelay, unit),
 567                                           unit.toNanos(period));
 568         RunnableScheduledFuture<Void> t = decorateTask(command, sft);
 569         sft.outerTask = t;
 570         delayedExecute(t);
 571         return t;
 572     }
 573 
 574     /**
 575      * @throws RejectedExecutionException {@inheritDoc}
 576      * @throws NullPointerException       {@inheritDoc}
 577      * @throws IllegalArgumentException   {@inheritDoc}
 578      */
 579     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
 580                                                      long initialDelay,
 581                                                      long delay,
 582                                                      TimeUnit unit) {
 583         if (command == null || unit == null)
 584             throw new NullPointerException();
 585         if (delay <= 0)
 586             throw new IllegalArgumentException();
 587         ScheduledFutureTask<Void> sft =
 588             new ScheduledFutureTask<Void>(command,
 589                                           null,
 590                                           triggerTime(initialDelay, unit),
 591                                           unit.toNanos(-delay));
 592         RunnableScheduledFuture<Void> t = decorateTask(command, sft);
 593         sft.outerTask = t;
 594         delayedExecute(t);
 595         return t;
 596     }
 597 
 598     /**
 599      * Executes {@code command} with zero required delay.
 600      * This has effect equivalent to
 601      * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
 602      * Note that inspections of the queue and of the list returned by
 603      * {@code shutdownNow} will access the zero-delayed
 604      * {@link ScheduledFuture}, not the {@code command} itself.
 605      *
 606      * <p>A consequence of the use of {@code ScheduledFuture} objects is
 607      * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
 608      * called with a null second {@code Throwable} argument, even if the
 609      * {@code command} terminated abruptly.  Instead, the {@code Throwable}
 610      * thrown by such a task can be obtained via {@link Future#get}.
 611      *
 612      * @throws RejectedExecutionException at discretion of
 613      *         {@code RejectedExecutionHandler}, if the task
 614      *         cannot be accepted for execution because the
 615      *         executor has been shut down
 616      * @throws NullPointerException {@inheritDoc}
 617      */
 618     public void execute(Runnable command) {
 619         schedule(command, 0, TimeUnit.NANOSECONDS);
 620     }
 621 
 622     // Override AbstractExecutorService methods
 623 
 624     /**
 625      * @throws RejectedExecutionException {@inheritDoc}
 626      * @throws NullPointerException       {@inheritDoc}
 627      */
 628     public Future<?> submit(Runnable task) {
 629         return schedule(task, 0, TimeUnit.NANOSECONDS);
 630     }
 631 
 632     /**
 633      * @throws RejectedExecutionException {@inheritDoc}
 634      * @throws NullPointerException       {@inheritDoc}
 635      */
 636     public <T> Future<T> submit(Runnable task, T result) {
 637         return schedule(Executors.callable(task, result),
 638                         0, TimeUnit.NANOSECONDS);
 639     }
 640 
 641     /**
 642      * @throws RejectedExecutionException {@inheritDoc}
 643      * @throws NullPointerException       {@inheritDoc}
 644      */
 645     public <T> Future<T> submit(Callable<T> task) {
 646         return schedule(task, 0, TimeUnit.NANOSECONDS);
 647     }
 648 
 649     /**
 650      * Sets the policy on whether to continue executing existing
 651      * periodic tasks even when this executor has been {@code shutdown}.
 652      * In this case, these tasks will only terminate upon
 653      * {@code shutdownNow} or after setting the policy to
 654      * {@code false} when already shutdown.
 655      * This value is by default {@code false}.
 656      *
 657      * @param value if {@code true}, continue after shutdown, else don't.
 658      * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
 659      */
 660     public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
 661         continueExistingPeriodicTasksAfterShutdown = value;
 662         if (!value && isShutdown())
 663             onShutdown();
 664     }
 665 
 666     /**
 667      * Gets the policy on whether to continue executing existing
 668      * periodic tasks even when this executor has been {@code shutdown}.
 669      * In this case, these tasks will only terminate upon
 670      * {@code shutdownNow} or after setting the policy to
 671      * {@code false} when already shutdown.
 672      * This value is by default {@code false}.
 673      *
 674      * @return {@code true} if will continue after shutdown
 675      * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
 676      */
 677     public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
 678         return continueExistingPeriodicTasksAfterShutdown;
 679     }
 680 
 681     /**
 682      * Sets the policy on whether to execute existing delayed
 683      * tasks even when this executor has been {@code shutdown}.
 684      * In this case, these tasks will only terminate upon
 685      * {@code shutdownNow}, or after setting the policy to
 686      * {@code false} when already shutdown.
 687      * This value is by default {@code true}.
 688      *
 689      * @param value if {@code true}, execute after shutdown, else don't.
 690      * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
 691      */
 692     public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
 693         executeExistingDelayedTasksAfterShutdown = value;
 694         if (!value && isShutdown())
 695             onShutdown();
 696     }
 697 
 698     /**
 699      * Gets the policy on whether to execute existing delayed
 700      * tasks even when this executor has been {@code shutdown}.
 701      * In this case, these tasks will only terminate upon
 702      * {@code shutdownNow}, or after setting the policy to
 703      * {@code false} when already shutdown.
 704      * This value is by default {@code true}.
 705      *
 706      * @return {@code true} if will execute after shutdown
 707      * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
 708      */
 709     public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
 710         return executeExistingDelayedTasksAfterShutdown;
 711     }
 712 
 713     /**
 714      * Sets the policy on whether cancelled tasks should be immediately
 715      * removed from the work queue at time of cancellation.  This value is
 716      * by default {@code false}.
 717      *
 718      * @param value if {@code true}, remove on cancellation, else don't
 719      * @see #getRemoveOnCancelPolicy
 720      * @since 1.7
 721      */
    public void setRemoveOnCancelPolicy(boolean value) {
        // Only records the policy; this method itself does not touch
        // any task that is already queued.
        removeOnCancel = value;
    }
 725 
 726     /**
 727      * Gets the policy on whether cancelled tasks should be immediately
 728      * removed from the work queue at time of cancellation.  This value is
 729      * by default {@code false}.
 730      *
 731      * @return {@code true} if cancelled tasks are immediately removed
 732      *         from the queue
 733      * @see #setRemoveOnCancelPolicy
 734      * @since 1.7
 735      */
    public boolean getRemoveOnCancelPolicy() {
        // Returns the current value of the flag written by
        // setRemoveOnCancelPolicy.
        return removeOnCancel;
    }
 739 
 740     /**
 741      * Initiates an orderly shutdown in which previously submitted
 742      * tasks are executed, but no new tasks will be accepted.
 743      * Invocation has no additional effect if already shut down.
 744      *
 745      * <p>This method does not wait for previously submitted tasks to
 746      * complete execution.  Use {@link #awaitTermination awaitTermination}
 747      * to do that.
 748      *
 749      * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
 750      * has been set {@code false}, existing delayed tasks whose delays
 751      * have not yet elapsed are cancelled.  And unless the {@code
 752      * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
 753      * {@code true}, future executions of existing periodic tasks will
 754      * be cancelled.
 755      *
 756      * @throws SecurityException {@inheritDoc}
 757      */
    public void shutdown() {
        // No additional logic at this level: delegates directly to the
        // superclass implementation.
        super.shutdown();
    }
 761 
 762     /**
 763      * Attempts to stop all actively executing tasks, halts the
 764      * processing of waiting tasks, and returns a list of the tasks
 765      * that were awaiting execution.
 766      *
 767      * <p>This method does not wait for actively executing tasks to
 768      * terminate.  Use {@link #awaitTermination awaitTermination} to
 769      * do that.
 770      *
 771      * <p>There are no guarantees beyond best-effort attempts to stop
 772      * processing actively executing tasks.  This implementation
 773      * cancels tasks via {@link Thread#interrupt}, so any task that
 774      * fails to respond to interrupts may never terminate.
 775      *
 776      * @return list of tasks that never commenced execution.
 777      *         Each element of this list is a {@link ScheduledFuture},
 778      *         including those tasks submitted using {@code execute},
 779      *         which are for scheduling purposes used as the basis of a
 780      *         zero-delay {@code ScheduledFuture}.
 781      * @throws SecurityException {@inheritDoc}
 782      */
    public List<Runnable> shutdownNow() {
        // No additional logic at this level: the superclass result is
        // returned unchanged.
        return super.shutdownNow();
    }
 786 
 787     /**
 788      * Returns the task queue used by this executor.  Each element of
 789      * this queue is a {@link ScheduledFuture}, including those
 790      * tasks submitted using {@code execute} which are for scheduling
 791      * purposes used as the basis of a zero-delay
 792      * {@code ScheduledFuture}.  Iteration over this queue is
 793      * <em>not</em> guaranteed to traverse tasks in the order in
 794      * which they will execute.
 795      *
 796      * @return the task queue
 797      */
    public BlockingQueue<Runnable> getQueue() {
        // Returns the superclass's queue reference as-is (no copy is made here).
        return super.getQueue();
    }
 801 
    /**
     * Specialized delay queue. To mesh with TPE declarations, this
     * class must be declared as a {@code BlockingQueue<Runnable>} even
     * though it can only hold RunnableScheduledFutures.
     */
 807     static class DelayedWorkQueue extends AbstractQueue<Runnable>
 808         implements BlockingQueue<Runnable> {
 809 
 810         /*
 811          * A DelayedWorkQueue is based on a heap-based data structure
 812          * like those in DelayQueue and PriorityQueue, except that
 813          * every ScheduledFutureTask also records its index into the
 814          * heap array. This eliminates the need to find a task upon
 815          * cancellation, greatly speeding up removal (down from O(n)
 816          * to O(log n)), and reducing garbage retention that would
 817          * otherwise occur by waiting for the element to rise to top
 818          * before clearing. But because the queue may also hold
 819          * RunnableScheduledFutures that are not ScheduledFutureTasks,
 820          * we are not guaranteed to have such indices available, in
 821          * which case we fall back to linear search. (We expect that
 822          * most tasks will not be decorated, and that the faster cases
 823          * will be much more common.)
 824          *
 825          * All heap operations must record index changes -- mainly
 826          * within siftUp and siftDown. Upon removal, a task's
 827          * heapIndex is set to -1. Note that ScheduledFutureTasks can
 828          * appear at most once in the queue (this need not be true for
 829          * other kinds of tasks or work queues), so are uniquely
 830          * identified by heapIndex.
 831          */
 832 
        // Starting length of the heap array; grow() enlarges it on demand.
        private static final int INITIAL_CAPACITY = 16;
        // Heap storage. Slots [0, size) hold tasks, with the head at index 0;
        // slots vacated by removal are reset to null.
        private RunnableScheduledFuture[] queue =
            new RunnableScheduledFuture[INITIAL_CAPACITY];
        // Lock under which the heap state is read and modified; the private
        // heap helpers must only be called while it is held.
        private final ReentrantLock lock = new ReentrantLock();
        // Number of tasks currently stored in the heap.
        private int size = 0;
 838 
 839         /**
 840          * Thread designated to wait for the task at the head of the
 841          * queue.  This variant of the Leader-Follower pattern
 842          * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
 843          * minimize unnecessary timed waiting.  When a thread becomes
 844          * the leader, it waits only for the next delay to elapse, but
 845          * other threads await indefinitely.  The leader thread must
 846          * signal some other thread before returning from take() or
 847          * poll(...), unless some other thread becomes leader in the
 848          * interim.  Whenever the head of the queue is replaced with a
 849          * task with an earlier expiration time, the leader field is
 850          * invalidated by being reset to null, and some waiting
 851          * thread, but not necessarily the current leader, is
 852          * signalled.  So waiting threads must be prepared to acquire
 853          * and lose leadership while waiting.
 854          */
 855         private Thread leader = null;
 856 
 857         /**
 858          * Condition signalled when a newer task becomes available at the
 859          * head of the queue or a new thread may need to become leader.
 860          */
 861         private final Condition available = lock.newCondition();
 862 
 863         /**
 864          * Set f's heapIndex if it is a ScheduledFutureTask.
 865          */
 866         private void setIndex(RunnableScheduledFuture f, int idx) {
 867             if (f instanceof ScheduledFutureTask)
 868                 ((ScheduledFutureTask)f).heapIndex = idx;
 869         }
 870 
 871         /**
 872          * Sift element added at bottom up to its heap-ordered spot.
 873          * Call only when holding lock.
 874          */
 875         private void siftUp(int k, RunnableScheduledFuture key) {
 876             while (k > 0) {
 877                 int parent = (k - 1) >>> 1;
 878                 RunnableScheduledFuture e = queue[parent];
 879                 if (key.compareTo(e) >= 0)
 880                     break;
 881                 queue[k] = e;
 882                 setIndex(e, k);
 883                 k = parent;
 884             }
 885             queue[k] = key;
 886             setIndex(key, k);
 887         }
 888 
 889         /**
 890          * Sift element added at top down to its heap-ordered spot.
 891          * Call only when holding lock.
 892          */
 893         private void siftDown(int k, RunnableScheduledFuture key) {
 894             int half = size >>> 1;
 895             while (k < half) {
 896                 int child = (k << 1) + 1;
 897                 RunnableScheduledFuture c = queue[child];
 898                 int right = child + 1;
 899                 if (right < size && c.compareTo(queue[right]) > 0)
 900                     c = queue[child = right];
 901                 if (key.compareTo(c) <= 0)
 902                     break;
 903                 queue[k] = c;
 904                 setIndex(c, k);
 905                 k = child;
 906             }
 907             queue[k] = key;
 908             setIndex(key, k);
 909         }
 910 
 911         /**
 912          * Resize the heap array.  Call only when holding lock.
 913          */
 914         private void grow() {
 915             int oldCapacity = queue.length;
 916             int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
 917             if (newCapacity < 0) // overflow
 918                 newCapacity = Integer.MAX_VALUE;
 919             queue = Arrays.copyOf(queue, newCapacity);
 920         }
 921 
 922         /**
 923          * Find index of given object, or -1 if absent
 924          */
 925         private int indexOf(Object x) {
 926             if (x != null) {
 927                 if (x instanceof ScheduledFutureTask) {
 928                     int i = ((ScheduledFutureTask) x).heapIndex;
 929                     // Sanity check; x could conceivably be a
 930                     // ScheduledFutureTask from some other pool.
 931                     if (i >= 0 && i < size && queue[i] == x)
 932                         return i;
 933                 } else {
 934                     for (int i = 0; i < size; i++)
 935                         if (x.equals(queue[i]))
 936                             return i;
 937                 }
 938             }
 939             return -1;
 940         }
 941 
 942         public boolean contains(Object x) {
 943             final ReentrantLock lock = this.lock;
 944             lock.lock();
 945             try {
 946                 return indexOf(x) != -1;
 947             } finally {
 948                 lock.unlock();
 949             }
 950         }
 951 
 952         public boolean remove(Object x) {
 953             final ReentrantLock lock = this.lock;
 954             lock.lock();
 955             try {
 956                 int i = indexOf(x);
 957                 if (i < 0)
 958                     return false;
 959 
 960                 setIndex(queue[i], -1);
 961                 int s = --size;
 962                 RunnableScheduledFuture replacement = queue[s];
 963                 queue[s] = null;
 964                 if (s != i) {
 965                     siftDown(i, replacement);
 966                     if (queue[i] == replacement)
 967                         siftUp(i, replacement);
 968                 }
 969                 return true;
 970             } finally {
 971                 lock.unlock();
 972             }
 973         }
 974 
 975         public int size() {
 976             final ReentrantLock lock = this.lock;
 977             lock.lock();
 978             try {
 979                 return size;
 980             } finally {
 981                 lock.unlock();
 982             }
 983         }
 984 
        /**
         * Reports whether the queue holds no tasks, based on
         * {@link #size()}.
         *
         * @return {@code true} if {@code size()} is zero
         */
        public boolean isEmpty() {
            return size() == 0;
        }
 988 
        /**
         * Always returns {@code Integer.MAX_VALUE}; this queue does not
         * report a capacity limit.
         *
         * @return {@code Integer.MAX_VALUE}
         */
        public int remainingCapacity() {
            return Integer.MAX_VALUE;
        }
 992 
 993         public RunnableScheduledFuture peek() {
 994             final ReentrantLock lock = this.lock;
 995             lock.lock();
 996             try {
 997                 return queue[0];
 998             } finally {
 999                 lock.unlock();
1000             }
1001         }
1002 
1003         public boolean offer(Runnable x) {
1004             if (x == null)
1005                 throw new NullPointerException();
1006             RunnableScheduledFuture e = (RunnableScheduledFuture)x;
1007             final ReentrantLock lock = this.lock;
1008             lock.lock();
1009             try {
1010                 int i = size;
1011                 if (i >= queue.length)
1012                     grow();
1013                 size = i + 1;
1014                 if (i == 0) {
1015                     queue[0] = e;
1016                     setIndex(e, 0);
1017                 } else {
1018                     siftUp(i, e);
1019                 }
1020                 if (queue[0] == e) {
1021                     leader = null;
1022                     available.signal();
1023                 }
1024             } finally {
1025                 lock.unlock();
1026             }
1027             return true;
1028         }
1029 
        /**
         * Inserts the task by delegating to {@link #offer(Runnable)};
         * the boolean result of that call is discarded.
         *
         * @param e the task to add
         */
        public void put(Runnable e) {
            offer(e);
        }
1033 
        /**
         * Inserts the task by delegating to {@link #offer(Runnable)}.
         *
         * @param e the task to add
         * @return the result of {@code offer(e)}
         */
        public boolean add(Runnable e) {
            return offer(e);
        }
1037 
        /**
         * Inserts the task by delegating to {@link #offer(Runnable)}.
         * The {@code timeout} and {@code unit} arguments are ignored.
         *
         * @param e the task to add
         * @param timeout ignored
         * @param unit ignored
         * @return the result of {@code offer(e)}
         */
        public boolean offer(Runnable e, long timeout, TimeUnit unit) {
            return offer(e);
        }
1041 
1042         /**
1043          * Performs common bookkeeping for poll and take: Replaces
1044          * first element with last and sifts it down.  Call only when
1045          * holding lock.
1046          * @param f the task to remove and return
1047          */
1048         private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
1049             int s = --size;
1050             RunnableScheduledFuture x = queue[s];
1051             queue[s] = null;
1052             if (s != 0)
1053                 siftDown(0, x);
1054             setIndex(f, -1);
1055             return f;
1056         }
1057 
1058         public RunnableScheduledFuture poll() {
1059             final ReentrantLock lock = this.lock;
1060             lock.lock();
1061             try {
1062                 RunnableScheduledFuture first = queue[0];
1063                 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
1064                     return null;
1065                 else
1066                     return finishPoll(first);
1067             } finally {
1068                 lock.unlock();
1069             }
1070         }
1071 
1072         public RunnableScheduledFuture take() throws InterruptedException {
1073             final ReentrantLock lock = this.lock;
1074             lock.lockInterruptibly();
1075             try {
1076                 for (;;) {
1077                     RunnableScheduledFuture first = queue[0];
1078                     if (first == null)
1079                         available.await();
1080                     else {
1081                         long delay = first.getDelay(TimeUnit.NANOSECONDS);
1082                         if (delay <= 0)
1083                             return finishPoll(first);
1084                         else if (leader != null)
1085                             available.await();
1086                         else {
1087                             Thread thisThread = Thread.currentThread();
1088                             leader = thisThread;
1089                             try {
1090                                 available.awaitNanos(delay);
1091                             } finally {
1092                                 if (leader == thisThread)
1093                                     leader = null;
1094                             }
1095                         }
1096                     }
1097                 }
1098             } finally {
1099                 if (leader == null && queue[0] != null)
1100                     available.signal();
1101                 lock.unlock();
1102             }
1103         }
1104 
1105         public RunnableScheduledFuture poll(long timeout, TimeUnit unit)
1106             throws InterruptedException {
1107             long nanos = unit.toNanos(timeout);
1108             final ReentrantLock lock = this.lock;
1109             lock.lockInterruptibly();
1110             try {
1111                 for (;;) {
1112                     RunnableScheduledFuture first = queue[0];
1113                     if (first == null) {
1114                         if (nanos <= 0)
1115                             return null;
1116                         else
1117                             nanos = available.awaitNanos(nanos);
1118                     } else {
1119                         long delay = first.getDelay(TimeUnit.NANOSECONDS);
1120                         if (delay <= 0)
1121                             return finishPoll(first);
1122                         if (nanos <= 0)
1123                             return null;
1124                         if (nanos < delay || leader != null)
1125                             nanos = available.awaitNanos(nanos);
1126                         else {
1127                             Thread thisThread = Thread.currentThread();
1128                             leader = thisThread;
1129                             try {
1130                                 long timeLeft = available.awaitNanos(delay);
1131                                 nanos -= delay - timeLeft;
1132                             } finally {
1133                                 if (leader == thisThread)
1134                                     leader = null;
1135                             }
1136                         }
1137                     }
1138                 }
1139             } finally {
1140                 if (leader == null && queue[0] != null)
1141                     available.signal();
1142                 lock.unlock();
1143             }
1144         }
1145 
1146         public void clear() {
1147             final ReentrantLock lock = this.lock;
1148             lock.lock();
1149             try {
1150                 for (int i = 0; i < size; i++) {
1151                     RunnableScheduledFuture t = queue[i];
1152                     if (t != null) {
1153                         queue[i] = null;
1154                         setIndex(t, -1);
1155                     }
1156                 }
1157                 size = 0;
1158             } finally {
1159                 lock.unlock();
1160             }
1161         }
1162 
1163         /**
1164          * Return and remove first element only if it is expired.
1165          * Used only by drainTo.  Call only when holding lock.
1166          */
1167         private RunnableScheduledFuture pollExpired() {
1168             RunnableScheduledFuture first = queue[0];
1169             if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
1170                 return null;
1171             return finishPoll(first);
1172         }
1173 
1174         public int drainTo(Collection<? super Runnable> c) {
1175             if (c == null)
1176                 throw new NullPointerException();
1177             if (c == this)
1178                 throw new IllegalArgumentException();
1179             final ReentrantLock lock = this.lock;
1180             lock.lock();
1181             try {
1182                 RunnableScheduledFuture first;
1183                 int n = 0;
1184                 while ((first = pollExpired()) != null) {
1185                     c.add(first);
1186                     ++n;
1187                 }
1188                 return n;
1189             } finally {
1190                 lock.unlock();
1191             }
1192         }
1193 
1194         public int drainTo(Collection<? super Runnable> c, int maxElements) {
1195             if (c == null)
1196                 throw new NullPointerException();
1197             if (c == this)
1198                 throw new IllegalArgumentException();
1199             if (maxElements <= 0)
1200                 return 0;
1201             final ReentrantLock lock = this.lock;
1202             lock.lock();
1203             try {
1204                 RunnableScheduledFuture first;
1205                 int n = 0;
1206                 while (n < maxElements && (first = pollExpired()) != null) {
1207                     c.add(first);
1208                     ++n;
1209                 }
1210                 return n;
1211             } finally {
1212                 lock.unlock();
1213             }
1214         }
1215 
1216         public Object[] toArray() {
1217             final ReentrantLock lock = this.lock;
1218             lock.lock();
1219             try {
1220                 return Arrays.copyOf(queue, size, Object[].class);
1221             } finally {
1222                 lock.unlock();
1223             }
1224         }
1225 
1226         @SuppressWarnings("unchecked")
1227         public <T> T[] toArray(T[] a) {
1228             final ReentrantLock lock = this.lock;
1229             lock.lock();
1230             try {
1231                 if (a.length < size)
1232                     return (T[]) Arrays.copyOf(queue, size, a.getClass());
1233                 System.arraycopy(queue, 0, a, 0, size);
1234                 if (a.length > size)
1235                     a[size] = null;
1236                 return a;
1237             } finally {
1238                 lock.unlock();
1239             }
1240         }
1241 
1242         public Iterator<Runnable> iterator() {
1243             return new Itr(Arrays.copyOf(queue, size));
1244         }
1245 
1246         /**
1247          * Snapshot iterator that works off copy of underlying q array.
1248          */
1249         private class Itr implements Iterator<Runnable> {
1250             final RunnableScheduledFuture[] array;
1251             int cursor = 0;     // index of next element to return
1252             int lastRet = -1;   // index of last element, or -1 if no such
1253 
1254             Itr(RunnableScheduledFuture[] array) {
1255                 this.array = array;
1256             }
1257 
1258             public boolean hasNext() {
1259                 return cursor < array.length;
1260             }
1261 
1262             public Runnable next() {
1263                 if (cursor >= array.length)
1264                     throw new NoSuchElementException();
1265                 lastRet = cursor;
1266                 return array[cursor++];
1267             }
1268 
1269             public void remove() {
1270                 if (lastRet < 0)
1271                     throw new IllegalStateException();
1272                 DelayedWorkQueue.this.remove(array[lastRet]);
1273                 lastRet = -1;
1274             }
1275         }
1276     }
1277 }