1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 import static java.util.concurrent.TimeUnit.NANOSECONDS;
  38 import java.util.concurrent.atomic.AtomicLong;
  39 import java.util.concurrent.locks.Condition;
  40 import java.util.concurrent.locks.ReentrantLock;
  41 import java.util.*;
  42 
  43 /**
  44  * A {@link ThreadPoolExecutor} that can additionally schedule
  45  * commands to run after a given delay, or to execute
  46  * periodically. This class is preferable to {@link java.util.Timer}
  47  * when multiple worker threads are needed, or when the additional
  48  * flexibility or capabilities of {@link ThreadPoolExecutor} (which
  49  * this class extends) are required.
  50  *
  51  * <p>Delayed tasks execute no sooner than they are enabled, but
  52  * without any real-time guarantees about when, after they are
  53  * enabled, they will commence. Tasks scheduled for exactly the same
  54  * execution time are enabled in first-in-first-out (FIFO) order of
  55  * submission.
  56  *
  57  * <p>When a submitted task is cancelled before it is run, execution
  58  * is suppressed. By default, such a cancelled task is not
  59  * automatically removed from the work queue until its delay
  60  * elapses. While this enables further inspection and monitoring, it
  61  * may also cause unbounded retention of cancelled tasks. To avoid
  62  * this, set {@link #setRemoveOnCancelPolicy} to {@code true}, which
  63  * causes tasks to be immediately removed from the work queue at
  64  * time of cancellation.
  65  *
  66  * <p>Successive executions of a task scheduled via
  67  * {@code scheduleAtFixedRate} or
  68  * {@code scheduleWithFixedDelay} do not overlap. While different
  69  * executions may be performed by different threads, the effects of
  70  * prior executions <a
  71  * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
  72  * those of subsequent ones.
  73  *
  74  * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
  75  * of the inherited tuning methods are not useful for it. In
  76  * particular, because it acts as a fixed-sized pool using
  77  * {@code corePoolSize} threads and an unbounded queue, adjustments
  78  * to {@code maximumPoolSize} have no useful effect. Additionally, it
  79  * is almost never a good idea to set {@code corePoolSize} to zero or
  80  * use {@code allowCoreThreadTimeOut} because this may leave the pool
  81  * without threads to handle tasks once they become eligible to run.
  82  *
  83  * <p><b>Extension notes:</b> This class overrides the
  84  * {@link ThreadPoolExecutor#execute execute} and
  85  * {@link AbstractExecutorService#submit(Runnable) submit}
  86  * methods to generate internal {@link ScheduledFuture} objects to
  87  * control per-task delays and scheduling.  To preserve
  88  * functionality, any further overrides of these methods in
  89  * subclasses must invoke superclass versions, which effectively
  90  * disables additional task customization.  However, this class
   91  * provides an alternative protected extension method
  92  * {@code decorateTask} (one version each for {@code Runnable} and
  93  * {@code Callable}) that can be used to customize the concrete task
  94  * types used to execute commands entered via {@code execute},
  95  * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
  96  * and {@code scheduleWithFixedDelay}.  By default, a
  97  * {@code ScheduledThreadPoolExecutor} uses a task type extending
  98  * {@link FutureTask}. However, this may be modified or replaced using
  99  * subclasses of the form:
 100  *
 101  *  <pre> {@code
 102  * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
 103  *
 104  *   static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
 105  *
 106  *   protected <V> RunnableScheduledFuture<V> decorateTask(
 107  *                Runnable r, RunnableScheduledFuture<V> task) {
 108  *       return new CustomTask<V>(r, task);
 109  *   }
 110  *
 111  *   protected <V> RunnableScheduledFuture<V> decorateTask(
 112  *                Callable<V> c, RunnableScheduledFuture<V> task) {
 113  *       return new CustomTask<V>(c, task);
 114  *   }
 115  *   // ... add constructors, etc.
 116  * }}</pre>
 117  *
 118  * @since 1.5
 119  * @author Doug Lea
 120  */
 121 public class ScheduledThreadPoolExecutor
 122         extends ThreadPoolExecutor
 123         implements ScheduledExecutorService {
 124 
 125     /*
 126      * This class specializes ThreadPoolExecutor implementation by
 127      *
 128      * 1. Using a custom task type, ScheduledFutureTask for
 129      *    tasks, even those that don't require scheduling (i.e.,
 130      *    those submitted using ExecutorService execute, not
 131      *    ScheduledExecutorService methods) which are treated as
 132      *    delayed tasks with a delay of zero.
 133      *
 134      * 2. Using a custom queue (DelayedWorkQueue), a variant of
 135      *    unbounded DelayQueue. The lack of capacity constraint and
 136      *    the fact that corePoolSize and maximumPoolSize are
 137      *    effectively identical simplifies some execution mechanics
 138      *    (see delayedExecute) compared to ThreadPoolExecutor.
 139      *
 140      * 3. Supporting optional run-after-shutdown parameters, which
 141      *    leads to overrides of shutdown methods to remove and cancel
 142      *    tasks that should NOT be run after shutdown, as well as
 143      *    different recheck logic when task (re)submission overlaps
 144      *    with a shutdown.
 145      *
 146      * 4. Task decoration methods to allow interception and
 147      *    instrumentation, which are needed because subclasses cannot
 148      *    otherwise override submit methods to get this effect. These
 149      *    don't have any impact on pool control logic though.
 150      */
 151 
 152     /**
 153      * False if should cancel/suppress periodic tasks on shutdown.
 154      */
 155     private volatile boolean continueExistingPeriodicTasksAfterShutdown;
 156 
 157     /**
 158      * False if should cancel non-periodic tasks on shutdown.
 159      */
 160     private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
 161 
 162     /**
 163      * True if ScheduledFutureTask.cancel should remove from queue
 164      */
 165     private volatile boolean removeOnCancel = false;
 166 
 167     /**
 168      * Sequence number to break scheduling ties, and in turn to
 169      * guarantee FIFO order among tied entries.
 170      */
 171     private static final AtomicLong sequencer = new AtomicLong();
 172 
 173     /**
 174      * Returns current nanosecond time.
 175      */
 176     final long now() {
 177         return System.nanoTime();
 178     }
 179 
 180     private class ScheduledFutureTask<V>
 181             extends FutureTask<V> implements RunnableScheduledFuture<V> {
 182 
 183         /** Sequence number to break ties FIFO */
 184         private final long sequenceNumber;
 185 
 186         /** The time the task is enabled to execute in nanoTime units */
 187         private long time;
 188 
 189         /**
 190          * Period in nanoseconds for repeating tasks.  A positive
 191          * value indicates fixed-rate execution.  A negative value
 192          * indicates fixed-delay execution.  A value of 0 indicates a
 193          * non-repeating task.
 194          */
 195         private final long period;
 196 
 197         /** The actual task to be re-enqueued by reExecutePeriodic */
 198         RunnableScheduledFuture<V> outerTask = this;
 199 
 200         /**
 201          * Index into delay queue, to support faster cancellation.
 202          */
 203         int heapIndex;
 204 
 205         /**
 206          * Creates a one-shot action with given nanoTime-based trigger time.
 207          */
 208         ScheduledFutureTask(Runnable r, V result, long ns) {
 209             super(r, result);
 210             this.time = ns;
 211             this.period = 0;
 212             this.sequenceNumber = sequencer.getAndIncrement();
 213         }
 214 
 215         /**
 216          * Creates a periodic action with given nano time and period.
 217          */
 218         ScheduledFutureTask(Runnable r, V result, long ns, long period) {
 219             super(r, result);
 220             this.time = ns;
 221             this.period = period;
 222             this.sequenceNumber = sequencer.getAndIncrement();
 223         }
 224 
 225         /**
 226          * Creates a one-shot action with given nanoTime-based trigger.
 227          */
 228         ScheduledFutureTask(Callable<V> callable, long ns) {
 229             super(callable);
 230             this.time = ns;
 231             this.period = 0;
 232             this.sequenceNumber = sequencer.getAndIncrement();
 233         }
 234 
 235         public long getDelay(TimeUnit unit) {
 236             return unit.convert(time - now(), NANOSECONDS);
 237         }
 238 
 239         public int compareTo(Delayed other) {
 240             if (other == this) // compare zero ONLY if same object
 241                 return 0;
 242             if (other instanceof ScheduledFutureTask) {
 243                 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
 244                 long diff = time - x.time;
 245                 if (diff < 0)
 246                     return -1;
 247                 else if (diff > 0)
 248                     return 1;
 249                 else if (sequenceNumber < x.sequenceNumber)
 250                     return -1;
 251                 else
 252                     return 1;
 253             }
 254             long d = (getDelay(NANOSECONDS) -
 255                       other.getDelay(NANOSECONDS));
 256             return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
 257         }
 258 
 259         /**
 260          * Returns true if this is a periodic (not a one-shot) action.
 261          *
 262          * @return true if periodic
 263          */
 264         public boolean isPeriodic() {
 265             return period != 0;
 266         }
 267 
 268         /**
 269          * Sets the next time to run for a periodic task.
 270          */
 271         private void setNextRunTime() {
 272             long p = period;
 273             if (p > 0)
 274                 time += p;
 275             else
 276                 time = triggerTime(-p);
 277         }
 278 
 279         public boolean cancel(boolean mayInterruptIfRunning) {
 280             boolean cancelled = super.cancel(mayInterruptIfRunning);
 281             if (cancelled && removeOnCancel && heapIndex >= 0)
 282                 remove(this);
 283             return cancelled;
 284         }
 285 
 286         /**
 287          * Overrides FutureTask version so as to reset/requeue if periodic.
 288          */
 289         public void run() {
 290             boolean periodic = isPeriodic();
 291             if (!canRunInCurrentRunState(periodic))
 292                 cancel(false);
 293             else if (!periodic)
 294                 ScheduledFutureTask.super.run();
 295             else if (ScheduledFutureTask.super.runAndReset()) {
 296                 setNextRunTime();
 297                 reExecutePeriodic(outerTask);
 298             }
 299         }
 300     }
 301 
 302     /**
 303      * Returns true if can run a task given current run state
 304      * and run-after-shutdown parameters.
 305      *
 306      * @param periodic true if this task periodic, false if delayed
 307      */
 308     boolean canRunInCurrentRunState(boolean periodic) {
 309         return isRunningOrShutdown(periodic ?
 310                                    continueExistingPeriodicTasksAfterShutdown :
 311                                    executeExistingDelayedTasksAfterShutdown);
 312     }
 313 
 314     /**
 315      * Main execution method for delayed or periodic tasks.  If pool
 316      * is shut down, rejects the task. Otherwise adds task to queue
 317      * and starts a thread, if necessary, to run it.  (We cannot
 318      * prestart the thread to run the task because the task (probably)
 319      * shouldn't be run yet,) If the pool is shut down while the task
 320      * is being added, cancel and remove it if required by state and
 321      * run-after-shutdown parameters.
 322      *
 323      * @param task the task
 324      */
 325     private void delayedExecute(RunnableScheduledFuture<?> task) {
 326         if (isShutdown())
 327             reject(task);
 328         else {
 329             super.getQueue().add(task);
 330             if (isShutdown() &&
 331                 !canRunInCurrentRunState(task.isPeriodic()) &&
 332                 remove(task))
 333                 task.cancel(false);
 334             else
 335                 ensurePrestart();
 336         }
 337     }
 338 
 339     /**
 340      * Requeues a periodic task unless current run state precludes it.
 341      * Same idea as delayedExecute except drops task rather than rejecting.
 342      *
 343      * @param task the task
 344      */
 345     void reExecutePeriodic(RunnableScheduledFuture<?> task) {
 346         if (canRunInCurrentRunState(true)) {
 347             super.getQueue().add(task);
 348             if (!canRunInCurrentRunState(true) && remove(task))
 349                 task.cancel(false);
 350             else
 351                 ensurePrestart();
 352         }
 353     }
 354 
 355     /**
 356      * Cancels and clears the queue of all tasks that should not be run
 357      * due to shutdown policy.  Invoked within super.shutdown.
 358      */
 359     @Override void onShutdown() {
 360         BlockingQueue<Runnable> q = super.getQueue();
 361         boolean keepDelayed =
 362             getExecuteExistingDelayedTasksAfterShutdownPolicy();
 363         boolean keepPeriodic =
 364             getContinueExistingPeriodicTasksAfterShutdownPolicy();
 365         if (!keepDelayed && !keepPeriodic) {
 366             for (Object e : q.toArray())
 367                 if (e instanceof RunnableScheduledFuture<?>)
 368                     ((RunnableScheduledFuture<?>) e).cancel(false);
 369             q.clear();
 370         }
 371         else {
 372             // Traverse snapshot to avoid iterator exceptions
 373             for (Object e : q.toArray()) {
 374                 if (e instanceof RunnableScheduledFuture) {
 375                     RunnableScheduledFuture<?> t =
 376                         (RunnableScheduledFuture<?>)e;
 377                     if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
 378                         t.isCancelled()) { // also remove if already cancelled
 379                         if (q.remove(t))
 380                             t.cancel(false);
 381                     }
 382                 }
 383             }
 384         }
 385         tryTerminate();
 386     }
 387 
 388     /**
 389      * Modifies or replaces the task used to execute a runnable.
 390      * This method can be used to override the concrete
 391      * class used for managing internal tasks.
 392      * The default implementation simply returns the given task.
 393      *
 394      * @param runnable the submitted Runnable
 395      * @param task the task created to execute the runnable
 396      * @return a task that can execute the runnable
 397      * @since 1.6
 398      */
 399     protected <V> RunnableScheduledFuture<V> decorateTask(
 400         Runnable runnable, RunnableScheduledFuture<V> task) {
 401         return task;
 402     }
 403 
 404     /**
 405      * Modifies or replaces the task used to execute a callable.
 406      * This method can be used to override the concrete
 407      * class used for managing internal tasks.
 408      * The default implementation simply returns the given task.
 409      *
 410      * @param callable the submitted Callable
 411      * @param task the task created to execute the callable
 412      * @return a task that can execute the callable
 413      * @since 1.6
 414      */
 415     protected <V> RunnableScheduledFuture<V> decorateTask(
 416         Callable<V> callable, RunnableScheduledFuture<V> task) {
 417         return task;
 418     }
 419 
 420     /**
 421      * Creates a new {@code ScheduledThreadPoolExecutor} with the
 422      * given core pool size.
 423      *
 424      * @param corePoolSize the number of threads to keep in the pool, even
 425      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 426      * @throws IllegalArgumentException if {@code corePoolSize < 0}
 427      */
 428     public ScheduledThreadPoolExecutor(int corePoolSize) {
 429         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
 430               new DelayedWorkQueue());
 431     }
 432 
 433     /**
 434      * Creates a new {@code ScheduledThreadPoolExecutor} with the
 435      * given initial parameters.
 436      *
 437      * @param corePoolSize the number of threads to keep in the pool, even
 438      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 439      * @param threadFactory the factory to use when the executor
 440      *        creates a new thread
 441      * @throws IllegalArgumentException if {@code corePoolSize < 0}
 442      * @throws NullPointerException if {@code threadFactory} is null
 443      */
 444     public ScheduledThreadPoolExecutor(int corePoolSize,
 445                                        ThreadFactory threadFactory) {
 446         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
 447               new DelayedWorkQueue(), threadFactory);
 448     }
 449 
 450     /**
 451      * Creates a new ScheduledThreadPoolExecutor with the given
 452      * initial parameters.
 453      *
 454      * @param corePoolSize the number of threads to keep in the pool, even
 455      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 456      * @param handler the handler to use when execution is blocked
 457      *        because the thread bounds and queue capacities are reached
 458      * @throws IllegalArgumentException if {@code corePoolSize < 0}
 459      * @throws NullPointerException if {@code handler} is null
 460      */
 461     public ScheduledThreadPoolExecutor(int corePoolSize,
 462                                        RejectedExecutionHandler handler) {
 463         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
 464               new DelayedWorkQueue(), handler);
 465     }
 466 
 467     /**
 468      * Creates a new ScheduledThreadPoolExecutor with the given
 469      * initial parameters.
 470      *
 471      * @param corePoolSize the number of threads to keep in the pool, even
 472      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 473      * @param threadFactory the factory to use when the executor
 474      *        creates a new thread
 475      * @param handler the handler to use when execution is blocked
 476      *        because the thread bounds and queue capacities are reached
 477      * @throws IllegalArgumentException if {@code corePoolSize < 0}
 478      * @throws NullPointerException if {@code threadFactory} or
 479      *         {@code handler} is null
 480      */
 481     public ScheduledThreadPoolExecutor(int corePoolSize,
 482                                        ThreadFactory threadFactory,
 483                                        RejectedExecutionHandler handler) {
 484         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
 485               new DelayedWorkQueue(), threadFactory, handler);
 486     }
 487 
 488     /**
 489      * Returns the trigger time of a delayed action.
 490      */
 491     private long triggerTime(long delay, TimeUnit unit) {
 492         return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
 493     }
 494 
 495     /**
 496      * Returns the trigger time of a delayed action.
 497      */
 498     long triggerTime(long delay) {
 499         return now() +
 500             ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
 501     }
 502 
 503     /**
 504      * Constrains the values of all delays in the queue to be within
 505      * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
 506      * This may occur if a task is eligible to be dequeued, but has
 507      * not yet been, while some other task is added with a delay of
 508      * Long.MAX_VALUE.
 509      */
 510     private long overflowFree(long delay) {
 511         Delayed head = (Delayed) super.getQueue().peek();
 512         if (head != null) {
 513             long headDelay = head.getDelay(NANOSECONDS);
 514             if (headDelay < 0 && (delay - headDelay < 0))
 515                 delay = Long.MAX_VALUE + headDelay;
 516         }
 517         return delay;
 518     }
 519 
 520     /**
 521      * @throws RejectedExecutionException {@inheritDoc}
 522      * @throws NullPointerException       {@inheritDoc}
 523      */
 524     public ScheduledFuture<?> schedule(Runnable command,
 525                                        long delay,
 526                                        TimeUnit unit) {
 527         if (command == null || unit == null)
 528             throw new NullPointerException();
 529         RunnableScheduledFuture<?> t = decorateTask(command,
 530             new ScheduledFutureTask<Void>(command, null,
 531                                           triggerTime(delay, unit)));
 532         delayedExecute(t);
 533         return t;
 534     }
 535 
 536     /**
 537      * @throws RejectedExecutionException {@inheritDoc}
 538      * @throws NullPointerException       {@inheritDoc}
 539      */
 540     public <V> ScheduledFuture<V> schedule(Callable<V> callable,
 541                                            long delay,
 542                                            TimeUnit unit) {
 543         if (callable == null || unit == null)
 544             throw new NullPointerException();
 545         RunnableScheduledFuture<V> t = decorateTask(callable,
 546             new ScheduledFutureTask<V>(callable,
 547                                        triggerTime(delay, unit)));
 548         delayedExecute(t);
 549         return t;
 550     }
 551 
 552     /**
 553      * @throws RejectedExecutionException {@inheritDoc}
 554      * @throws NullPointerException       {@inheritDoc}
 555      * @throws IllegalArgumentException   {@inheritDoc}
 556      */
 557     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
 558                                                   long initialDelay,
 559                                                   long period,
 560                                                   TimeUnit unit) {
 561         if (command == null || unit == null)
 562             throw new NullPointerException();
 563         if (period <= 0)
 564             throw new IllegalArgumentException();
 565         ScheduledFutureTask<Void> sft =
 566             new ScheduledFutureTask<Void>(command,
 567                                           null,
 568                                           triggerTime(initialDelay, unit),
 569                                           unit.toNanos(period));
 570         RunnableScheduledFuture<Void> t = decorateTask(command, sft);
 571         sft.outerTask = t;
 572         delayedExecute(t);
 573         return t;
 574     }
 575 
 576     /**
 577      * @throws RejectedExecutionException {@inheritDoc}
 578      * @throws NullPointerException       {@inheritDoc}
 579      * @throws IllegalArgumentException   {@inheritDoc}
 580      */
 581     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
 582                                                      long initialDelay,
 583                                                      long delay,
 584                                                      TimeUnit unit) {
 585         if (command == null || unit == null)
 586             throw new NullPointerException();
 587         if (delay <= 0)
 588             throw new IllegalArgumentException();
 589         ScheduledFutureTask<Void> sft =
 590             new ScheduledFutureTask<Void>(command,
 591                                           null,
 592                                           triggerTime(initialDelay, unit),
 593                                           unit.toNanos(-delay));
 594         RunnableScheduledFuture<Void> t = decorateTask(command, sft);
 595         sft.outerTask = t;
 596         delayedExecute(t);
 597         return t;
 598     }
 599 
 600     /**
 601      * Executes {@code command} with zero required delay.
 602      * This has effect equivalent to
 603      * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
 604      * Note that inspections of the queue and of the list returned by
 605      * {@code shutdownNow} will access the zero-delayed
 606      * {@link ScheduledFuture}, not the {@code command} itself.
 607      *
 608      * <p>A consequence of the use of {@code ScheduledFuture} objects is
 609      * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
 610      * called with a null second {@code Throwable} argument, even if the
 611      * {@code command} terminated abruptly.  Instead, the {@code Throwable}
 612      * thrown by such a task can be obtained via {@link Future#get}.
 613      *
 614      * @throws RejectedExecutionException at discretion of
 615      *         {@code RejectedExecutionHandler}, if the task
 616      *         cannot be accepted for execution because the
 617      *         executor has been shut down
 618      * @throws NullPointerException {@inheritDoc}
 619      */
 620     public void execute(Runnable command) {
 621         schedule(command, 0, NANOSECONDS);
 622     }
 623 
 624     // Override AbstractExecutorService methods
 625 
 626     /**
 627      * @throws RejectedExecutionException {@inheritDoc}
 628      * @throws NullPointerException       {@inheritDoc}
 629      */
 630     public Future<?> submit(Runnable task) {
 631         return schedule(task, 0, NANOSECONDS);
 632     }
 633 
 634     /**
 635      * @throws RejectedExecutionException {@inheritDoc}
 636      * @throws NullPointerException       {@inheritDoc}
 637      */
 638     public <T> Future<T> submit(Runnable task, T result) {
 639         return schedule(Executors.callable(task, result), 0, NANOSECONDS);
 640     }
 641 
 642     /**
 643      * @throws RejectedExecutionException {@inheritDoc}
 644      * @throws NullPointerException       {@inheritDoc}
 645      */
 646     public <T> Future<T> submit(Callable<T> task) {
 647         return schedule(task, 0, NANOSECONDS);
 648     }
 649 
 650     /**
 651      * Sets the policy on whether to continue executing existing
 652      * periodic tasks even when this executor has been {@code shutdown}.
 653      * In this case, these tasks will only terminate upon
 654      * {@code shutdownNow} or after setting the policy to
 655      * {@code false} when already shutdown.
 656      * This value is by default {@code false}.
 657      *
 658      * @param value if {@code true}, continue after shutdown, else don't.
 659      * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
 660      */
 661     public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
 662         continueExistingPeriodicTasksAfterShutdown = value;
 663         if (!value && isShutdown())
 664             onShutdown();
 665     }
 666 
 667     /**
 668      * Gets the policy on whether to continue executing existing
 669      * periodic tasks even when this executor has been {@code shutdown}.
 670      * In this case, these tasks will only terminate upon
 671      * {@code shutdownNow} or after setting the policy to
 672      * {@code false} when already shutdown.
 673      * This value is by default {@code false}.
 674      *
 675      * @return {@code true} if will continue after shutdown
 676      * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
 677      */
 678     public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
 679         return continueExistingPeriodicTasksAfterShutdown;
 680     }
 681 
 682     /**
 683      * Sets the policy on whether to execute existing delayed
 684      * tasks even when this executor has been {@code shutdown}.
 685      * In this case, these tasks will only terminate upon
 686      * {@code shutdownNow}, or after setting the policy to
 687      * {@code false} when already shutdown.
 688      * This value is by default {@code true}.
 689      *
 690      * @param value if {@code true}, execute after shutdown, else don't.
 691      * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
 692      */
 693     public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
 694         executeExistingDelayedTasksAfterShutdown = value;
 695         if (!value && isShutdown())
 696             onShutdown();
 697     }
 698 
 699     /**
 700      * Gets the policy on whether to execute existing delayed
 701      * tasks even when this executor has been {@code shutdown}.
 702      * In this case, these tasks will only terminate upon
 703      * {@code shutdownNow}, or after setting the policy to
 704      * {@code false} when already shutdown.
 705      * This value is by default {@code true}.
 706      *
 707      * @return {@code true} if will execute after shutdown
 708      * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
 709      */
 710     public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
 711         return executeExistingDelayedTasksAfterShutdown;
 712     }
 713 
 714     /**
 715      * Sets the policy on whether cancelled tasks should be immediately
 716      * removed from the work queue at time of cancellation.  This value is
 717      * by default {@code false}.
 718      *
 719      * @param value if {@code true}, remove on cancellation, else don't
 720      * @see #getRemoveOnCancelPolicy
 721      * @since 1.7
 722      */
 723     public void setRemoveOnCancelPolicy(boolean value) {
 724         removeOnCancel = value;
 725     }
 726 
 727     /**
 728      * Gets the policy on whether cancelled tasks should be immediately
 729      * removed from the work queue at time of cancellation.  This value is
 730      * by default {@code false}.
 731      *
 732      * @return {@code true} if cancelled tasks are immediately removed
 733      *         from the queue
 734      * @see #setRemoveOnCancelPolicy
 735      * @since 1.7
 736      */
 737     public boolean getRemoveOnCancelPolicy() {
 738         return removeOnCancel;
 739     }
 740 
 741     /**
 742      * Initiates an orderly shutdown in which previously submitted
 743      * tasks are executed, but no new tasks will be accepted.
 744      * Invocation has no additional effect if already shut down.
 745      *
 746      * <p>This method does not wait for previously submitted tasks to
 747      * complete execution.  Use {@link #awaitTermination awaitTermination}
 748      * to do that.
 749      *
 750      * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
 751      * has been set {@code false}, existing delayed tasks whose delays
 752      * have not yet elapsed are cancelled.  And unless the {@code
 753      * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
 754      * {@code true}, future executions of existing periodic tasks will
 755      * be cancelled.
 756      *
 757      * @throws SecurityException {@inheritDoc}
 758      */
 759     public void shutdown() {
 760         super.shutdown();
 761     }
 762 
 763     /**
 764      * Attempts to stop all actively executing tasks, halts the
 765      * processing of waiting tasks, and returns a list of the tasks
 766      * that were awaiting execution.
 767      *
 768      * <p>This method does not wait for actively executing tasks to
 769      * terminate.  Use {@link #awaitTermination awaitTermination} to
 770      * do that.
 771      *
 772      * <p>There are no guarantees beyond best-effort attempts to stop
 773      * processing actively executing tasks.  This implementation
 774      * cancels tasks via {@link Thread#interrupt}, so any task that
 775      * fails to respond to interrupts may never terminate.
 776      *
 777      * @return list of tasks that never commenced execution.
 778      *         Each element of this list is a {@link ScheduledFuture},
 779      *         including those tasks submitted using {@code execute},
 780      *         which are for scheduling purposes used as the basis of a
 781      *         zero-delay {@code ScheduledFuture}.
 782      * @throws SecurityException {@inheritDoc}
 783      */
 784     public List<Runnable> shutdownNow() {
 785         return super.shutdownNow();
 786     }
 787 
 788     /**
 789      * Returns the task queue used by this executor.  Each element of
 790      * this queue is a {@link ScheduledFuture}, including those
 791      * tasks submitted using {@code execute} which are for scheduling
 792      * purposes used as the basis of a zero-delay
 793      * {@code ScheduledFuture}.  Iteration over this queue is
 794      * <em>not</em> guaranteed to traverse tasks in the order in
 795      * which they will execute.
 796      *
 797      * @return the task queue
 798      */
 799     public BlockingQueue<Runnable> getQueue() {
 800         return super.getQueue();
 801     }
 802 
 803     /**
 804      * Specialized delay queue. To mesh with TPE declarations, this
 805      * class must be declared as a BlockingQueue<Runnable> even though
 806      * it can only hold RunnableScheduledFutures.
 807      */
 808     static class DelayedWorkQueue extends AbstractQueue<Runnable>
 809         implements BlockingQueue<Runnable> {
 810 
 811         /*
 812          * A DelayedWorkQueue is based on a heap-based data structure
 813          * like those in DelayQueue and PriorityQueue, except that
 814          * every ScheduledFutureTask also records its index into the
 815          * heap array. This eliminates the need to find a task upon
 816          * cancellation, greatly speeding up removal (down from O(n)
 817          * to O(log n)), and reducing garbage retention that would
 818          * otherwise occur by waiting for the element to rise to top
 819          * before clearing. But because the queue may also hold
 820          * RunnableScheduledFutures that are not ScheduledFutureTasks,
 821          * we are not guaranteed to have such indices available, in
 822          * which case we fall back to linear search. (We expect that
 823          * most tasks will not be decorated, and that the faster cases
 824          * will be much more common.)
 825          *
 826          * All heap operations must record index changes -- mainly
 827          * within siftUp and siftDown. Upon removal, a task's
 828          * heapIndex is set to -1. Note that ScheduledFutureTasks can
 829          * appear at most once in the queue (this need not be true for
 830          * other kinds of tasks or work queues), so are uniquely
 831          * identified by heapIndex.
 832          */
 833 
 834         private static final int INITIAL_CAPACITY = 16;
 835         private RunnableScheduledFuture<?>[] queue =
 836             new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
 837         private final ReentrantLock lock = new ReentrantLock();
 838         private int size = 0;
 839 
 840         /**
 841          * Thread designated to wait for the task at the head of the
 842          * queue.  This variant of the Leader-Follower pattern
 843          * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
 844          * minimize unnecessary timed waiting.  When a thread becomes
 845          * the leader, it waits only for the next delay to elapse, but
 846          * other threads await indefinitely.  The leader thread must
 847          * signal some other thread before returning from take() or
 848          * poll(...), unless some other thread becomes leader in the
 849          * interim.  Whenever the head of the queue is replaced with a
 850          * task with an earlier expiration time, the leader field is
 851          * invalidated by being reset to null, and some waiting
 852          * thread, but not necessarily the current leader, is
 853          * signalled.  So waiting threads must be prepared to acquire
 854          * and lose leadership while waiting.
 855          */
 856         private Thread leader = null;
 857 
 858         /**
 859          * Condition signalled when a newer task becomes available at the
 860          * head of the queue or a new thread may need to become leader.
 861          */
 862         private final Condition available = lock.newCondition();
 863 
 864         /**
 865          * Set f's heapIndex if it is a ScheduledFutureTask.
 866          */
 867         private void setIndex(RunnableScheduledFuture<?> f, int idx) {
 868             if (f instanceof ScheduledFutureTask)
 869                 ((ScheduledFutureTask)f).heapIndex = idx;
 870         }
 871 
 872         /**
 873          * Sift element added at bottom up to its heap-ordered spot.
 874          * Call only when holding lock.
 875          */
 876         private void siftUp(int k, RunnableScheduledFuture<?> key) {
 877             while (k > 0) {
 878                 int parent = (k - 1) >>> 1;
 879                 RunnableScheduledFuture<?> e = queue[parent];
 880                 if (key.compareTo(e) >= 0)
 881                     break;
 882                 queue[k] = e;
 883                 setIndex(e, k);
 884                 k = parent;
 885             }
 886             queue[k] = key;
 887             setIndex(key, k);
 888         }
 889 
 890         /**
 891          * Sift element added at top down to its heap-ordered spot.
 892          * Call only when holding lock.
 893          */
 894         private void siftDown(int k, RunnableScheduledFuture<?> key) {
 895             int half = size >>> 1;
 896             while (k < half) {
 897                 int child = (k << 1) + 1;
 898                 RunnableScheduledFuture<?> c = queue[child];
 899                 int right = child + 1;
 900                 if (right < size && c.compareTo(queue[right]) > 0)
 901                     c = queue[child = right];
 902                 if (key.compareTo(c) <= 0)
 903                     break;
 904                 queue[k] = c;
 905                 setIndex(c, k);
 906                 k = child;
 907             }
 908             queue[k] = key;
 909             setIndex(key, k);
 910         }
 911 
 912         /**
 913          * Resize the heap array.  Call only when holding lock.
 914          */
 915         private void grow() {
 916             int oldCapacity = queue.length;
 917             int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
 918             if (newCapacity < 0) // overflow
 919                 newCapacity = Integer.MAX_VALUE;
 920             queue = Arrays.copyOf(queue, newCapacity);
 921         }
 922 
 923         /**
 924          * Find index of given object, or -1 if absent
 925          */
 926         private int indexOf(Object x) {
 927             if (x != null) {
 928                 if (x instanceof ScheduledFutureTask) {
 929                     int i = ((ScheduledFutureTask) x).heapIndex;
 930                     // Sanity check; x could conceivably be a
 931                     // ScheduledFutureTask from some other pool.
 932                     if (i >= 0 && i < size && queue[i] == x)
 933                         return i;
 934                 } else {
 935                     for (int i = 0; i < size; i++)
 936                         if (x.equals(queue[i]))
 937                             return i;
 938                 }
 939             }
 940             return -1;
 941         }
 942 
 943         public boolean contains(Object x) {
 944             final ReentrantLock lock = this.lock;
 945             lock.lock();
 946             try {
 947                 return indexOf(x) != -1;
 948             } finally {
 949                 lock.unlock();
 950             }
 951         }
 952 
 953         public boolean remove(Object x) {
 954             final ReentrantLock lock = this.lock;
 955             lock.lock();
 956             try {
 957                 int i = indexOf(x);
 958                 if (i < 0)
 959                     return false;
 960 
 961                 setIndex(queue[i], -1);
 962                 int s = --size;
 963                 RunnableScheduledFuture<?> replacement = queue[s];
 964                 queue[s] = null;
 965                 if (s != i) {
 966                     siftDown(i, replacement);
 967                     if (queue[i] == replacement)
 968                         siftUp(i, replacement);
 969                 }
 970                 return true;
 971             } finally {
 972                 lock.unlock();
 973             }
 974         }
 975 
 976         public int size() {
 977             final ReentrantLock lock = this.lock;
 978             lock.lock();
 979             try {
 980                 return size;
 981             } finally {
 982                 lock.unlock();
 983             }
 984         }
 985 
 986         public boolean isEmpty() {
 987             return size() == 0;
 988         }
 989 
 990         public int remainingCapacity() {
 991             return Integer.MAX_VALUE;
 992         }
 993 
 994         public RunnableScheduledFuture<?> peek() {
 995             final ReentrantLock lock = this.lock;
 996             lock.lock();
 997             try {
 998                 return queue[0];
 999             } finally {
1000                 lock.unlock();
1001             }
1002         }
1003 
1004         public boolean offer(Runnable x) {
1005             if (x == null)
1006                 throw new NullPointerException();
1007             RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
1008             final ReentrantLock lock = this.lock;
1009             lock.lock();
1010             try {
1011                 int i = size;
1012                 if (i >= queue.length)
1013                     grow();
1014                 size = i + 1;
1015                 if (i == 0) {
1016                     queue[0] = e;
1017                     setIndex(e, 0);
1018                 } else {
1019                     siftUp(i, e);
1020                 }
1021                 if (queue[0] == e) {
1022                     leader = null;
1023                     available.signal();
1024                 }
1025             } finally {
1026                 lock.unlock();
1027             }
1028             return true;
1029         }
1030 
1031         public void put(Runnable e) {
1032             offer(e);
1033         }
1034 
1035         public boolean add(Runnable e) {
1036             return offer(e);
1037         }
1038 
1039         public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1040             return offer(e);
1041         }
1042 
1043         /**
1044          * Performs common bookkeeping for poll and take: Replaces
1045          * first element with last and sifts it down.  Call only when
1046          * holding lock.
1047          * @param f the task to remove and return
1048          */
1049         private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1050             int s = --size;
1051             RunnableScheduledFuture<?> x = queue[s];
1052             queue[s] = null;
1053             if (s != 0)
1054                 siftDown(0, x);
1055             setIndex(f, -1);
1056             return f;
1057         }
1058 
1059         public RunnableScheduledFuture<?> poll() {
1060             final ReentrantLock lock = this.lock;
1061             lock.lock();
1062             try {
1063                 RunnableScheduledFuture<?> first = queue[0];
1064                 if (first == null || first.getDelay(NANOSECONDS) > 0)
1065                     return null;
1066                 else
1067                     return finishPoll(first);
1068             } finally {
1069                 lock.unlock();
1070             }
1071         }
1072 
1073         public RunnableScheduledFuture<?> take() throws InterruptedException {
1074             final ReentrantLock lock = this.lock;
1075             lock.lockInterruptibly();
1076             try {
1077                 for (;;) {
1078                     RunnableScheduledFuture<?> first = queue[0];
1079                     if (first == null)
1080                         available.await();
1081                     else {
1082                         long delay = first.getDelay(NANOSECONDS);
1083                         if (delay <= 0)
1084                             return finishPoll(first);
1085                         else if (leader != null)
1086                             available.await();
1087                         else {
1088                             Thread thisThread = Thread.currentThread();
1089                             leader = thisThread;
1090                             try {
1091                                 available.awaitNanos(delay);
1092                             } finally {
1093                                 if (leader == thisThread)
1094                                     leader = null;
1095                             }
1096                         }
1097                     }
1098                 }
1099             } finally {
1100                 if (leader == null && queue[0] != null)
1101                     available.signal();
1102                 lock.unlock();
1103             }
1104         }
1105 
1106         public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1107             throws InterruptedException {
1108             long nanos = unit.toNanos(timeout);
1109             final ReentrantLock lock = this.lock;
1110             lock.lockInterruptibly();
1111             try {
1112                 for (;;) {
1113                     RunnableScheduledFuture<?> first = queue[0];
1114                     if (first == null) {
1115                         if (nanos <= 0)
1116                             return null;
1117                         else
1118                             nanos = available.awaitNanos(nanos);
1119                     } else {
1120                         long delay = first.getDelay(NANOSECONDS);
1121                         if (delay <= 0)
1122                             return finishPoll(first);
1123                         if (nanos <= 0)
1124                             return null;
1125                         if (nanos < delay || leader != null)
1126                             nanos = available.awaitNanos(nanos);
1127                         else {
1128                             Thread thisThread = Thread.currentThread();
1129                             leader = thisThread;
1130                             try {
1131                                 long timeLeft = available.awaitNanos(delay);
1132                                 nanos -= delay - timeLeft;
1133                             } finally {
1134                                 if (leader == thisThread)
1135                                     leader = null;
1136                             }
1137                         }
1138                     }
1139                 }
1140             } finally {
1141                 if (leader == null && queue[0] != null)
1142                     available.signal();
1143                 lock.unlock();
1144             }
1145         }
1146 
1147         public void clear() {
1148             final ReentrantLock lock = this.lock;
1149             lock.lock();
1150             try {
1151                 for (int i = 0; i < size; i++) {
1152                     RunnableScheduledFuture<?> t = queue[i];
1153                     if (t != null) {
1154                         queue[i] = null;
1155                         setIndex(t, -1);
1156                     }
1157                 }
1158                 size = 0;
1159             } finally {
1160                 lock.unlock();
1161             }
1162         }
1163 
1164         /**
1165          * Return and remove first element only if it is expired.
1166          * Used only by drainTo.  Call only when holding lock.
1167          */
1168         private RunnableScheduledFuture<?> pollExpired() {
1169             // assert lock.isHeldByCurrentThread();
1170             RunnableScheduledFuture<?> first = queue[0];
1171             if (first == null || first.getDelay(NANOSECONDS) > 0)
1172                 return null;
1173             return finishPoll(first);
1174         }
1175 
1176         public int drainTo(Collection<? super Runnable> c) {
1177             if (c == null)
1178                 throw new NullPointerException();
1179             if (c == this)
1180                 throw new IllegalArgumentException();
1181             final ReentrantLock lock = this.lock;
1182             lock.lock();
1183             try {
1184                 RunnableScheduledFuture<?> first;
1185                 int n = 0;
1186                 while ((first = pollExpired()) != null) {
1187                     c.add(first);
1188                     ++n;
1189                 }
1190                 return n;
1191             } finally {
1192                 lock.unlock();
1193             }
1194         }
1195 
1196         public int drainTo(Collection<? super Runnable> c, int maxElements) {
1197             if (c == null)
1198                 throw new NullPointerException();
1199             if (c == this)
1200                 throw new IllegalArgumentException();
1201             if (maxElements <= 0)
1202                 return 0;
1203             final ReentrantLock lock = this.lock;
1204             lock.lock();
1205             try {
1206                 RunnableScheduledFuture<?> first;
1207                 int n = 0;
1208                 while (n < maxElements && (first = pollExpired()) != null) {
1209                     c.add(first);
1210                     ++n;
1211                 }
1212                 return n;
1213             } finally {
1214                 lock.unlock();
1215             }
1216         }
1217 
1218         public Object[] toArray() {
1219             final ReentrantLock lock = this.lock;
1220             lock.lock();
1221             try {
1222                 return Arrays.copyOf(queue, size, Object[].class);
1223             } finally {
1224                 lock.unlock();
1225             }
1226         }
1227 
1228         @SuppressWarnings("unchecked")
1229         public <T> T[] toArray(T[] a) {
1230             final ReentrantLock lock = this.lock;
1231             lock.lock();
1232             try {
1233                 if (a.length < size)
1234                     return (T[]) Arrays.copyOf(queue, size, a.getClass());
1235                 System.arraycopy(queue, 0, a, 0, size);
1236                 if (a.length > size)
1237                     a[size] = null;
1238                 return a;
1239             } finally {
1240                 lock.unlock();
1241             }
1242         }
1243 
1244         public Iterator<Runnable> iterator() {
1245             return new Itr(Arrays.copyOf(queue, size));
1246         }
1247 
1248         /**
1249          * Snapshot iterator that works off copy of underlying q array.
1250          */
1251         private class Itr implements Iterator<Runnable> {
1252             final RunnableScheduledFuture[] array;
1253             int cursor = 0;     // index of next element to return
1254             int lastRet = -1;   // index of last element, or -1 if no such
1255 
1256             Itr(RunnableScheduledFuture[] array) {
1257                 this.array = array;
1258             }
1259 
1260             public boolean hasNext() {
1261                 return cursor < array.length;
1262             }
1263 
1264             public Runnable next() {
1265                 if (cursor >= array.length)
1266                     throw new NoSuchElementException();
1267                 lastRet = cursor;
1268                 return array[cursor++];
1269             }
1270 
1271             public void remove() {
1272                 if (lastRet < 0)
1273                     throw new IllegalStateException();
1274                 DelayedWorkQueue.this.remove(array[lastRet]);
1275                 lastRet = -1;
1276             }
1277         }
1278     }
1279 }