1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/licenses/publicdomain
  34  */
  35 
  36 package java.util.concurrent;
  37 import java.util.concurrent.atomic.*;
  38 import java.util.concurrent.locks.*;
  39 import java.util.*;
  40 
  41 /**
  42  * A {@link ThreadPoolExecutor} that can additionally schedule
  43  * commands to run after a given delay, or to execute
  44  * periodically. This class is preferable to {@link java.util.Timer}
  45  * when multiple worker threads are needed, or when the additional
  46  * flexibility or capabilities of {@link ThreadPoolExecutor} (which
  47  * this class extends) are required.
  48  *
  49  * <p>Delayed tasks execute no sooner than they are enabled, but
  50  * without any real-time guarantees about when, after they are
  51  * enabled, they will commence. Tasks scheduled for exactly the same
  52  * execution time are enabled in first-in-first-out (FIFO) order of
  53  * submission.
  54  *
  55  * <p>When a submitted task is cancelled before it is run, execution
  56  * is suppressed. By default, such a cancelled task is not
  57  * automatically removed from the work queue until its delay
  58  * elapses. While this enables further inspection and monitoring, it
  59  * may also cause unbounded retention of cancelled tasks. To avoid
  60  * this, set {@link #setRemoveOnCancelPolicy} to {@code true}, which
  61  * causes tasks to be immediately removed from the work queue at
  62  * time of cancellation.
  63  *
  64  * <p>Successive executions of a task scheduled via
  65  * <code>scheduleAtFixedRate</code> or
  66  * <code>scheduleWithFixedDelay</code> do not overlap. While different
  67  * executions may be performed by different threads, the effects of
  68  * prior executions <a
  69  * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
  70  * those of subsequent ones.
  71  *
  72  * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
  73  * of the inherited tuning methods are not useful for it. In
  74  * particular, because it acts as a fixed-sized pool using
  75  * {@code corePoolSize} threads and an unbounded queue, adjustments
  76  * to {@code maximumPoolSize} have no useful effect. Additionally, it
  77  * is almost never a good idea to set {@code corePoolSize} to zero or
  78  * use {@code allowCoreThreadTimeOut} because this may leave the pool
  79  * without threads to handle tasks once they become eligible to run.
  80  *
  81  * <p><b>Extension notes:</b> This class overrides the
  82  * {@link ThreadPoolExecutor#execute execute} and
  83  * {@link AbstractExecutorService#submit(Runnable) submit}
  84  * methods to generate internal {@link ScheduledFuture} objects to
  85  * control per-task delays and scheduling.  To preserve
  86  * functionality, any further overrides of these methods in
  87  * subclasses must invoke superclass versions, which effectively
  88  * disables additional task customization.  However, this class
 * provides an alternative protected extension method
  90  * {@code decorateTask} (one version each for {@code Runnable} and
  91  * {@code Callable}) that can be used to customize the concrete task
  92  * types used to execute commands entered via {@code execute},
  93  * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
  94  * and {@code scheduleWithFixedDelay}.  By default, a
  95  * {@code ScheduledThreadPoolExecutor} uses a task type extending
  96  * {@link FutureTask}. However, this may be modified or replaced using
  97  * subclasses of the form:
  98  *
  99  *  <pre> {@code
 100  * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
 101  *
 102  *   static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
 103  *
 104  *   protected <V> RunnableScheduledFuture<V> decorateTask(
 105  *                Runnable r, RunnableScheduledFuture<V> task) {
 106  *       return new CustomTask<V>(r, task);
 107  *   }
 108  *
 109  *   protected <V> RunnableScheduledFuture<V> decorateTask(
 110  *                Callable<V> c, RunnableScheduledFuture<V> task) {
 111  *       return new CustomTask<V>(c, task);
 112  *   }
 113  *   // ... add constructors, etc.
 114  * }}</pre>
 115  *
 116  * @since 1.5
 117  * @author Doug Lea
 118  */
 119 public class ScheduledThreadPoolExecutor
 120         extends ThreadPoolExecutor
 121         implements ScheduledExecutorService {
 122 
 123     /*
 124      * This class specializes ThreadPoolExecutor implementation by
 125      *
 126      * 1. Using a custom task type, ScheduledFutureTask for
 127      *    tasks, even those that don't require scheduling (i.e.,
 128      *    those submitted using ExecutorService execute, not
 129      *    ScheduledExecutorService methods) which are treated as
 130      *    delayed tasks with a delay of zero.
 131      *
 132      * 2. Using a custom queue (DelayedWorkQueue), a variant of
 133      *    unbounded DelayQueue. The lack of capacity constraint and
 134      *    the fact that corePoolSize and maximumPoolSize are
 135      *    effectively identical simplifies some execution mechanics
 136      *    (see delayedExecute) compared to ThreadPoolExecutor.
 137      *
 138      * 3. Supporting optional run-after-shutdown parameters, which
 139      *    leads to overrides of shutdown methods to remove and cancel
 140      *    tasks that should NOT be run after shutdown, as well as
 141      *    different recheck logic when task (re)submission overlaps
 142      *    with a shutdown.
 143      *
 144      * 4. Task decoration methods to allow interception and
 145      *    instrumentation, which are needed because subclasses cannot
 146      *    otherwise override submit methods to get this effect. These
 147      *    don't have any impact on pool control logic though.
 148      */
 149 
 150     /**
 151      * False if should cancel/suppress periodic tasks on shutdown.
 152      */
 153     private volatile boolean continueExistingPeriodicTasksAfterShutdown;
 154 
 155     /**
 156      * False if should cancel non-periodic tasks on shutdown.
 157      */
 158     private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
 159 
 160     /**
 161      * True if ScheduledFutureTask.cancel should remove from queue
 162      */
 163     private volatile boolean removeOnCancel = false;
 164 
 165     /**
 166      * Sequence number to break scheduling ties, and in turn to
 167      * guarantee FIFO order among tied entries.
 168      */
 169     private static final AtomicLong sequencer = new AtomicLong(0);
 170 
 171     /**
 172      * Returns current nanosecond time.
 173      */
 174     final long now() {
 175         return System.nanoTime();
 176     }
 177 
 178     private class ScheduledFutureTask<V>
 179             extends FutureTask<V> implements RunnableScheduledFuture<V> {
 180 
 181         /** Sequence number to break ties FIFO */
 182         private final long sequenceNumber;
 183 
 184         /** The time the task is enabled to execute in nanoTime units */
 185         private long time;
 186 
 187         /**
 188          * Period in nanoseconds for repeating tasks.  A positive
 189          * value indicates fixed-rate execution.  A negative value
 190          * indicates fixed-delay execution.  A value of 0 indicates a
 191          * non-repeating task.
 192          */
 193         private final long period;
 194 
 195         /** The actual task to be re-enqueued by reExecutePeriodic */
 196         RunnableScheduledFuture<V> outerTask = this;
 197 
 198         /**
 199          * Index into delay queue, to support faster cancellation.
 200          */
 201         int heapIndex;
 202 
 203         /**
 204          * Creates a one-shot action with given nanoTime-based trigger time.
 205          */
 206         ScheduledFutureTask(Runnable r, V result, long ns) {
 207             super(r, result);
 208             this.time = ns;
 209             this.period = 0;
 210             this.sequenceNumber = sequencer.getAndIncrement();
 211         }
 212 
 213         /**
 214          * Creates a periodic action with given nano time and period.
 215          */
 216         ScheduledFutureTask(Runnable r, V result, long ns, long period) {
 217             super(r, result);
 218             this.time = ns;
 219             this.period = period;
 220             this.sequenceNumber = sequencer.getAndIncrement();
 221         }
 222 
 223         /**
 224          * Creates a one-shot action with given nanoTime-based trigger.
 225          */
 226         ScheduledFutureTask(Callable<V> callable, long ns) {
 227             super(callable);
 228             this.time = ns;
 229             this.period = 0;
 230             this.sequenceNumber = sequencer.getAndIncrement();
 231         }
 232 
 233         public long getDelay(TimeUnit unit) {
 234             return unit.convert(time - now(), TimeUnit.NANOSECONDS);
 235         }
 236 
 237         public int compareTo(Delayed other) {
 238             if (other == this) // compare zero ONLY if same object
 239                 return 0;
 240             if (other instanceof ScheduledFutureTask) {
 241                 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
 242                 long diff = time - x.time;
 243                 if (diff < 0)
 244                     return -1;
 245                 else if (diff > 0)
 246                     return 1;
 247                 else if (sequenceNumber < x.sequenceNumber)
 248                     return -1;
 249                 else
 250                     return 1;
 251             }
 252             long d = (getDelay(TimeUnit.NANOSECONDS) -
 253                       other.getDelay(TimeUnit.NANOSECONDS));
 254             return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
 255         }
 256 
 257         /**
 258          * Returns true if this is a periodic (not a one-shot) action.
 259          *
 260          * @return true if periodic
 261          */
 262         public boolean isPeriodic() {
 263             return period != 0;
 264         }
 265 
 266         /**
 267          * Sets the next time to run for a periodic task.
 268          */
 269         private void setNextRunTime() {
 270             long p = period;
 271             if (p > 0)
 272                 time += p;
 273             else
 274                 time = triggerTime(-p);
 275         }
 276 
 277         public boolean cancel(boolean mayInterruptIfRunning) {
 278             boolean cancelled = super.cancel(mayInterruptIfRunning);
 279             if (cancelled && removeOnCancel && heapIndex >= 0)
 280                 remove(this);
 281             return cancelled;
 282         }
 283 
 284         /**
 285          * Overrides FutureTask version so as to reset/requeue if periodic.
 286          */
 287         public void run() {
 288             boolean periodic = isPeriodic();
 289             if (!canRunInCurrentRunState(periodic))
 290                 cancel(false);
 291             else if (!periodic)
 292                 ScheduledFutureTask.super.run();
 293             else if (ScheduledFutureTask.super.runAndReset()) {
 294                 setNextRunTime();
 295                 reExecutePeriodic(outerTask);
 296             }
 297         }
 298     }
 299 
 300     /**
 301      * Returns true if can run a task given current run state
 302      * and run-after-shutdown parameters.
 303      *
     * @param periodic true if this task is periodic, false if delayed
 305      */
 306     boolean canRunInCurrentRunState(boolean periodic) {
 307         return isRunningOrShutdown(periodic ?
 308                                    continueExistingPeriodicTasksAfterShutdown :
 309                                    executeExistingDelayedTasksAfterShutdown);
 310     }
 311 
 312     /**
 313      * Main execution method for delayed or periodic tasks.  If pool
 314      * is shut down, rejects the task. Otherwise adds task to queue
 315      * and starts a thread, if necessary, to run it.  (We cannot
 316      * prestart the thread to run the task because the task (probably)
     * shouldn't be run yet.) If the pool is shut down while the task
 318      * is being added, cancel and remove it if required by state and
 319      * run-after-shutdown parameters.
 320      *
 321      * @param task the task
 322      */
 323     private void delayedExecute(RunnableScheduledFuture<?> task) {
 324         if (isShutdown())
 325             reject(task);
 326         else {
 327             super.getQueue().add(task);
 328             if (isShutdown() &&
 329                 !canRunInCurrentRunState(task.isPeriodic()) &&
 330                 remove(task))
 331                 task.cancel(false);
 332             else
 333                 prestartCoreThread();
 334         }
 335     }
 336 
 337     /**
 338      * Requeues a periodic task unless current run state precludes it.
 339      * Same idea as delayedExecute except drops task rather than rejecting.
 340      *
 341      * @param task the task
 342      */
 343     void reExecutePeriodic(RunnableScheduledFuture<?> task) {
 344         if (canRunInCurrentRunState(true)) {
 345             super.getQueue().add(task);
 346             if (!canRunInCurrentRunState(true) && remove(task))
 347                 task.cancel(false);
 348             else
 349                 prestartCoreThread();
 350         }
 351     }
 352 
 353     /**
 354      * Cancels and clears the queue of all tasks that should not be run
 355      * due to shutdown policy.  Invoked within super.shutdown.
 356      */
 357     @Override void onShutdown() {
 358         BlockingQueue<Runnable> q = super.getQueue();
 359         boolean keepDelayed =
 360             getExecuteExistingDelayedTasksAfterShutdownPolicy();
 361         boolean keepPeriodic =
 362             getContinueExistingPeriodicTasksAfterShutdownPolicy();
 363         if (!keepDelayed && !keepPeriodic)



 364             q.clear();

 365         else {
 366             // Traverse snapshot to avoid iterator exceptions
 367             for (Object e : q.toArray()) {
 368                 if (e instanceof RunnableScheduledFuture) {
 369                     RunnableScheduledFuture<?> t =
 370                         (RunnableScheduledFuture<?>)e;
 371                     if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
 372                         t.isCancelled()) { // also remove if already cancelled
 373                         if (q.remove(t))
 374                             t.cancel(false);
 375                     }
 376                 }
 377             }
 378         }
 379         tryTerminate();
 380     }
 381 
 382     /**
 383      * Modifies or replaces the task used to execute a runnable.
 384      * This method can be used to override the concrete
 385      * class used for managing internal tasks.
 386      * The default implementation simply returns the given task.
 387      *
 388      * @param runnable the submitted Runnable
 389      * @param task the task created to execute the runnable
 390      * @return a task that can execute the runnable
 391      * @since 1.6
 392      */
 393     protected <V> RunnableScheduledFuture<V> decorateTask(
 394         Runnable runnable, RunnableScheduledFuture<V> task) {
 395         return task;
 396     }
 397 
 398     /**
 399      * Modifies or replaces the task used to execute a callable.
 400      * This method can be used to override the concrete
 401      * class used for managing internal tasks.
 402      * The default implementation simply returns the given task.
 403      *
 404      * @param callable the submitted Callable
 405      * @param task the task created to execute the callable
 406      * @return a task that can execute the callable
 407      * @since 1.6
 408      */
 409     protected <V> RunnableScheduledFuture<V> decorateTask(
 410         Callable<V> callable, RunnableScheduledFuture<V> task) {
 411         return task;
 412     }
 413 
 414     /**
 415      * Creates a new {@code ScheduledThreadPoolExecutor} with the
 416      * given core pool size.
 417      *
 418      * @param corePoolSize the number of threads to keep in the pool, even
 419      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 420      * @throws IllegalArgumentException if {@code corePoolSize < 0}
 421      */
 422     public ScheduledThreadPoolExecutor(int corePoolSize) {
 423         super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
 424               new DelayedWorkQueue());
 425     }
 426 
 427     /**
 428      * Creates a new {@code ScheduledThreadPoolExecutor} with the
 429      * given initial parameters.
 430      *
 431      * @param corePoolSize the number of threads to keep in the pool, even
 432      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 433      * @param threadFactory the factory to use when the executor
 434      *        creates a new thread
 435      * @throws IllegalArgumentException if {@code corePoolSize < 0}
 436      * @throws NullPointerException if {@code threadFactory} is null
 437      */
 438     public ScheduledThreadPoolExecutor(int corePoolSize,
 439                              ThreadFactory threadFactory) {
 440         super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
 441               new DelayedWorkQueue(), threadFactory);
 442     }
 443 
 444     /**
 445      * Creates a new ScheduledThreadPoolExecutor with the given
 446      * initial parameters.
 447      *
 448      * @param corePoolSize the number of threads to keep in the pool, even
 449      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 450      * @param handler the handler to use when execution is blocked
 451      *        because the thread bounds and queue capacities are reached
 452      * @throws IllegalArgumentException if {@code corePoolSize < 0}
 453      * @throws NullPointerException if {@code handler} is null
 454      */
 455     public ScheduledThreadPoolExecutor(int corePoolSize,
 456                               RejectedExecutionHandler handler) {
 457         super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
 458               new DelayedWorkQueue(), handler);
 459     }
 460 
 461     /**
 462      * Creates a new ScheduledThreadPoolExecutor with the given
 463      * initial parameters.
 464      *
 465      * @param corePoolSize the number of threads to keep in the pool, even
 466      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 467      * @param threadFactory the factory to use when the executor
 468      *        creates a new thread
 469      * @param handler the handler to use when execution is blocked
 470      *        because the thread bounds and queue capacities are reached
 471      * @throws IllegalArgumentException if {@code corePoolSize < 0}
 472      * @throws NullPointerException if {@code threadFactory} or
 473      *         {@code handler} is null
 474      */
 475     public ScheduledThreadPoolExecutor(int corePoolSize,
 476                               ThreadFactory threadFactory,
 477                               RejectedExecutionHandler handler) {
 478         super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
 479               new DelayedWorkQueue(), threadFactory, handler);
 480     }
 481 
 482     /**
 483      * Returns the trigger time of a delayed action.
 484      */
 485     private long triggerTime(long delay, TimeUnit unit) {
 486         return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
 487     }
 488 
 489     /**
 490      * Returns the trigger time of a delayed action.
 491      */
 492     long triggerTime(long delay) {
 493         return now() +
 494             ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
 495     }
 496 
 497     /**
 498      * Constrains the values of all delays in the queue to be within
 499      * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
 500      * This may occur if a task is eligible to be dequeued, but has
 501      * not yet been, while some other task is added with a delay of
 502      * Long.MAX_VALUE.
 503      */
 504     private long overflowFree(long delay) {
 505         Delayed head = (Delayed) super.getQueue().peek();
 506         if (head != null) {
 507             long headDelay = head.getDelay(TimeUnit.NANOSECONDS);
 508             if (headDelay < 0 && (delay - headDelay < 0))
 509                 delay = Long.MAX_VALUE + headDelay;
 510         }
 511         return delay;
 512     }
 513 
 514     /**
 515      * @throws RejectedExecutionException {@inheritDoc}
 516      * @throws NullPointerException       {@inheritDoc}
 517      */
 518     public ScheduledFuture<?> schedule(Runnable command,
 519                                        long delay,
 520                                        TimeUnit unit) {
 521         if (command == null || unit == null)
 522             throw new NullPointerException();
 523         RunnableScheduledFuture<?> t = decorateTask(command,
 524             new ScheduledFutureTask<Void>(command, null,
 525                                           triggerTime(delay, unit)));
 526         delayedExecute(t);
 527         return t;
 528     }
 529 
 530     /**
 531      * @throws RejectedExecutionException {@inheritDoc}
 532      * @throws NullPointerException       {@inheritDoc}
 533      */
 534     public <V> ScheduledFuture<V> schedule(Callable<V> callable,
 535                                            long delay,
 536                                            TimeUnit unit) {
 537         if (callable == null || unit == null)
 538             throw new NullPointerException();
 539         RunnableScheduledFuture<V> t = decorateTask(callable,
 540             new ScheduledFutureTask<V>(callable,
 541                                        triggerTime(delay, unit)));
 542         delayedExecute(t);
 543         return t;
 544     }
 545 
 546     /**
 547      * @throws RejectedExecutionException {@inheritDoc}
 548      * @throws NullPointerException       {@inheritDoc}
 549      * @throws IllegalArgumentException   {@inheritDoc}
 550      */
 551     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
 552                                                   long initialDelay,
 553                                                   long period,
 554                                                   TimeUnit unit) {
 555         if (command == null || unit == null)
 556             throw new NullPointerException();
 557         if (period <= 0)
 558             throw new IllegalArgumentException();
 559         ScheduledFutureTask<Void> sft =
 560             new ScheduledFutureTask<Void>(command,
 561                                           null,
 562                                           triggerTime(initialDelay, unit),
 563                                           unit.toNanos(period));
 564         RunnableScheduledFuture<Void> t = decorateTask(command, sft);
 565         sft.outerTask = t;
 566         delayedExecute(t);
 567         return t;
 568     }
 569 
 570     /**
 571      * @throws RejectedExecutionException {@inheritDoc}
 572      * @throws NullPointerException       {@inheritDoc}
 573      * @throws IllegalArgumentException   {@inheritDoc}
 574      */
 575     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
 576                                                      long initialDelay,
 577                                                      long delay,
 578                                                      TimeUnit unit) {
 579         if (command == null || unit == null)
 580             throw new NullPointerException();
 581         if (delay <= 0)
 582             throw new IllegalArgumentException();
 583         ScheduledFutureTask<Void> sft =
 584             new ScheduledFutureTask<Void>(command,
 585                                           null,
 586                                           triggerTime(initialDelay, unit),
 587                                           unit.toNanos(-delay));
 588         RunnableScheduledFuture<Void> t = decorateTask(command, sft);
 589         sft.outerTask = t;
 590         delayedExecute(t);
 591         return t;
 592     }
 593 
 594     /**
 595      * Executes {@code command} with zero required delay.
 596      * This has effect equivalent to
 597      * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
 598      * Note that inspections of the queue and of the list returned by
 599      * {@code shutdownNow} will access the zero-delayed
 600      * {@link ScheduledFuture}, not the {@code command} itself.
 601      *
 602      * <p>A consequence of the use of {@code ScheduledFuture} objects is
 603      * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
 604      * called with a null second {@code Throwable} argument, even if the
 605      * {@code command} terminated abruptly.  Instead, the {@code Throwable}
 606      * thrown by such a task can be obtained via {@link Future#get}.
 607      *
 608      * @throws RejectedExecutionException at discretion of
 609      *         {@code RejectedExecutionHandler}, if the task
 610      *         cannot be accepted for execution because the
 611      *         executor has been shut down
 612      * @throws NullPointerException {@inheritDoc}
 613      */
 614     public void execute(Runnable command) {
 615         schedule(command, 0, TimeUnit.NANOSECONDS);
 616     }
 617 
 618     // Override AbstractExecutorService methods
 619 
 620     /**
 621      * @throws RejectedExecutionException {@inheritDoc}
 622      * @throws NullPointerException       {@inheritDoc}
 623      */
 624     public Future<?> submit(Runnable task) {
 625         return schedule(task, 0, TimeUnit.NANOSECONDS);
 626     }
 627 
 628     /**
 629      * @throws RejectedExecutionException {@inheritDoc}
 630      * @throws NullPointerException       {@inheritDoc}
 631      */
 632     public <T> Future<T> submit(Runnable task, T result) {
 633         return schedule(Executors.callable(task, result),
 634                         0, TimeUnit.NANOSECONDS);
 635     }
 636 
 637     /**
 638      * @throws RejectedExecutionException {@inheritDoc}
 639      * @throws NullPointerException       {@inheritDoc}
 640      */
 641     public <T> Future<T> submit(Callable<T> task) {
 642         return schedule(task, 0, TimeUnit.NANOSECONDS);
 643     }
 644 
 645     /**
 646      * Sets the policy on whether to continue executing existing
 647      * periodic tasks even when this executor has been {@code shutdown}.
 648      * In this case, these tasks will only terminate upon
 649      * {@code shutdownNow} or after setting the policy to
 650      * {@code false} when already shutdown.
 651      * This value is by default {@code false}.
 652      *
 653      * @param value if {@code true}, continue after shutdown, else don't.
 654      * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
 655      */
 656     public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
 657         continueExistingPeriodicTasksAfterShutdown = value;
 658         if (!value && isShutdown())
 659             onShutdown();
 660     }
 661 
 662     /**
 663      * Gets the policy on whether to continue executing existing
 664      * periodic tasks even when this executor has been {@code shutdown}.
 665      * In this case, these tasks will only terminate upon
 666      * {@code shutdownNow} or after setting the policy to
 667      * {@code false} when already shutdown.
 668      * This value is by default {@code false}.
 669      *
 670      * @return {@code true} if will continue after shutdown
 671      * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
 672      */
 673     public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
 674         return continueExistingPeriodicTasksAfterShutdown;
 675     }
 676 
 677     /**
 678      * Sets the policy on whether to execute existing delayed
 679      * tasks even when this executor has been {@code shutdown}.
 680      * In this case, these tasks will only terminate upon
 681      * {@code shutdownNow}, or after setting the policy to
 682      * {@code false} when already shutdown.
 683      * This value is by default {@code true}.
 684      *
 685      * @param value if {@code true}, execute after shutdown, else don't.
 686      * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
 687      */
 688     public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
 689         executeExistingDelayedTasksAfterShutdown = value;
 690         if (!value && isShutdown())
 691             onShutdown();
 692     }
 693 
 694     /**
 695      * Gets the policy on whether to execute existing delayed
 696      * tasks even when this executor has been {@code shutdown}.
 697      * In this case, these tasks will only terminate upon
 698      * {@code shutdownNow}, or after setting the policy to
 699      * {@code false} when already shutdown.
 700      * This value is by default {@code true}.
 701      *
 702      * @return {@code true} if will execute after shutdown
 703      * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
 704      */
 705     public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
 706         return executeExistingDelayedTasksAfterShutdown;
 707     }
 708 
 709     /**
 710      * Sets the policy on whether cancelled tasks should be immediately
 711      * removed from the work queue at time of cancellation.  This value is
 712      * by default {@code false}.
 713      *
 714      * @param value if {@code true}, remove on cancellation, else don't
 715      * @see #getRemoveOnCancelPolicy
 716      * @since 1.7
 717      */
 718     public void setRemoveOnCancelPolicy(boolean value) {
 719         removeOnCancel = value;
 720     }
 721 
 722     /**
 723      * Gets the policy on whether cancelled tasks should be immediately
 724      * removed from the work queue at time of cancellation.  This value is
 725      * by default {@code false}.
 726      *
 727      * @return {@code true} if cancelled tasks are immediately removed
 728      *         from the queue
 729      * @see #setRemoveOnCancelPolicy
 730      * @since 1.7
 731      */
 732     public boolean getRemoveOnCancelPolicy() {
 733         return removeOnCancel;
 734     }
 735 
 736     /**
 737      * Initiates an orderly shutdown in which previously submitted
 738      * tasks are executed, but no new tasks will be accepted.
 739      * Invocation has no additional effect if already shut down.
 740      *
 741      * <p>This method does not wait for previously submitted tasks to
 742      * complete execution.  Use {@link #awaitTermination awaitTermination}
 743      * to do that.
 744      *
 745      * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
 746      * has been set {@code false}, existing delayed tasks whose delays
 747      * have not yet elapsed are cancelled.  And unless the {@code
 748      * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
 749      * {@code true}, future executions of existing periodic tasks will
 750      * be cancelled.
 751      *
 752      * @throws SecurityException {@inheritDoc}
 753      */
 754     public void shutdown() {
 755         super.shutdown();
 756     }
 757 
 758     /**
 759      * Attempts to stop all actively executing tasks, halts the
 760      * processing of waiting tasks, and returns a list of the tasks
 761      * that were awaiting execution.
 762      *
 763      * <p>This method does not wait for actively executing tasks to
 764      * terminate.  Use {@link #awaitTermination awaitTermination} to
 765      * do that.
 766      *
 767      * <p>There are no guarantees beyond best-effort attempts to stop
 768      * processing actively executing tasks.  This implementation
 769      * cancels tasks via {@link Thread#interrupt}, so any task that
 770      * fails to respond to interrupts may never terminate.
 771      *
 772      * @return list of tasks that never commenced execution.
 773      *         Each element of this list is a {@link ScheduledFuture},
 774      *         including those tasks submitted using {@code execute},
 775      *         which are for scheduling purposes used as the basis of a
 776      *         zero-delay {@code ScheduledFuture}.
 777      * @throws SecurityException {@inheritDoc}
 778      */
 779     public List<Runnable> shutdownNow() {
 780         return super.shutdownNow();
 781     }
 782 
 783     /**
 784      * Returns the task queue used by this executor.  Each element of
 785      * this queue is a {@link ScheduledFuture}, including those
 786      * tasks submitted using {@code execute} which are for scheduling
 787      * purposes used as the basis of a zero-delay
 788      * {@code ScheduledFuture}.  Iteration over this queue is
 789      * <em>not</em> guaranteed to traverse tasks in the order in
 790      * which they will execute.
 791      *
 792      * @return the task queue
 793      */
 794     public BlockingQueue<Runnable> getQueue() {
 795         return super.getQueue();
 796     }
 797 
 798     /**
 799      * Specialized delay queue. To mesh with TPE declarations, this
 800      * class must be declared as a BlockingQueue<Runnable> even though
 801      * it can only hold RunnableScheduledFutures.
 802      */
 803     static class DelayedWorkQueue extends AbstractQueue<Runnable>
 804         implements BlockingQueue<Runnable> {
 805 
 806         /*
 807          * A DelayedWorkQueue is based on a heap-based data structure
 808          * like those in DelayQueue and PriorityQueue, except that
 809          * every ScheduledFutureTask also records its index into the
 810          * heap array. This eliminates the need to find a task upon
 811          * cancellation, greatly speeding up removal (down from O(n)
 812          * to O(log n)), and reducing garbage retention that would
 813          * otherwise occur by waiting for the element to rise to top
 814          * before clearing. But because the queue may also hold
 815          * RunnableScheduledFutures that are not ScheduledFutureTasks,
 816          * we are not guaranteed to have such indices available, in
 817          * which case we fall back to linear search. (We expect that
 818          * most tasks will not be decorated, and that the faster cases
 819          * will be much more common.)
 820          *
 821          * All heap operations must record index changes -- mainly
 822          * within siftUp and siftDown. Upon removal, a task's
 823          * heapIndex is set to -1. Note that ScheduledFutureTasks can
 824          * appear at most once in the queue (this need not be true for
 825          * other kinds of tasks or work queues), so are uniquely
 826          * identified by heapIndex.
 827          */
 828 
 829         private static final int INITIAL_CAPACITY = 16;
 830         private RunnableScheduledFuture[] queue =
 831             new RunnableScheduledFuture[INITIAL_CAPACITY];
 832         private final ReentrantLock lock = new ReentrantLock();
 833         private int size = 0;
 834 
 835         /**
 836          * Thread designated to wait for the task at the head of the
 837          * queue.  This variant of the Leader-Follower pattern
 838          * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
 839          * minimize unnecessary timed waiting.  When a thread becomes
 840          * the leader, it waits only for the next delay to elapse, but
 841          * other threads await indefinitely.  The leader thread must
 842          * signal some other thread before returning from take() or
 843          * poll(...), unless some other thread becomes leader in the
 844          * interim.  Whenever the head of the queue is replaced with a
 845          * task with an earlier expiration time, the leader field is
 846          * invalidated by being reset to null, and some waiting
 847          * thread, but not necessarily the current leader, is
 848          * signalled.  So waiting threads must be prepared to acquire
 849          * and lose leadership while waiting.
 850          */
 851         private Thread leader = null;
 852 
 853         /**
 854          * Condition signalled when a newer task becomes available at the
 855          * head of the queue or a new thread may need to become leader.
 856          */
 857         private final Condition available = lock.newCondition();
 858 
 859         /**
 860          * Set f's heapIndex if it is a ScheduledFutureTask.
 861          */
 862         private void setIndex(RunnableScheduledFuture f, int idx) {
 863             if (f instanceof ScheduledFutureTask)
 864                 ((ScheduledFutureTask)f).heapIndex = idx;
 865         }
 866 
 867         /**
 868          * Sift element added at bottom up to its heap-ordered spot.
 869          * Call only when holding lock.
 870          */
 871         private void siftUp(int k, RunnableScheduledFuture key) {
 872             while (k > 0) {
 873                 int parent = (k - 1) >>> 1;
 874                 RunnableScheduledFuture e = queue[parent];
 875                 if (key.compareTo(e) >= 0)
 876                     break;
 877                 queue[k] = e;
 878                 setIndex(e, k);
 879                 k = parent;
 880             }
 881             queue[k] = key;
 882             setIndex(key, k);
 883         }
 884 
 885         /**
 886          * Sift element added at top down to its heap-ordered spot.
 887          * Call only when holding lock.
 888          */
 889         private void siftDown(int k, RunnableScheduledFuture key) {
 890             int half = size >>> 1;
 891             while (k < half) {
 892                 int child = (k << 1) + 1;
 893                 RunnableScheduledFuture c = queue[child];
 894                 int right = child + 1;
 895                 if (right < size && c.compareTo(queue[right]) > 0)
 896                     c = queue[child = right];
 897                 if (key.compareTo(c) <= 0)
 898                     break;
 899                 queue[k] = c;
 900                 setIndex(c, k);
 901                 k = child;
 902             }
 903             queue[k] = key;
 904             setIndex(key, k);
 905         }
 906 
 907         /**
 908          * Resize the heap array.  Call only when holding lock.
 909          */
 910         private void grow() {
 911             int oldCapacity = queue.length;
 912             int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
 913             if (newCapacity < 0) // overflow
 914                 newCapacity = Integer.MAX_VALUE;
 915             queue = Arrays.copyOf(queue, newCapacity);
 916         }
 917 
 918         /**
 919          * Find index of given object, or -1 if absent
 920          */
 921         private int indexOf(Object x) {
 922             if (x != null) {
 923                 if (x instanceof ScheduledFutureTask) {
 924                     int i = ((ScheduledFutureTask) x).heapIndex;
 925                     // Sanity check; x could conceivably be a
 926                     // ScheduledFutureTask from some other pool.
 927                     if (i >= 0 && i < size && queue[i] == x)
 928                         return i;
 929                 } else {
 930                     for (int i = 0; i < size; i++)
 931                         if (x.equals(queue[i]))
 932                             return i;
 933                 }
 934             }
 935             return -1;
 936         }
 937 
 938         public boolean contains(Object x) {
 939             final ReentrantLock lock = this.lock;
 940             lock.lock();
 941             try {
 942                 return indexOf(x) != -1;
 943             } finally {
 944                 lock.unlock();
 945             }
 946         }
 947 
 948         public boolean remove(Object x) {
 949             final ReentrantLock lock = this.lock;
 950             lock.lock();
 951             try {
 952                 int i = indexOf(x);
 953                 if (i < 0)
 954                     return false;
 955 
 956                 setIndex(queue[i], -1);
 957                 int s = --size;
 958                 RunnableScheduledFuture replacement = queue[s];
 959                 queue[s] = null;
 960                 if (s != i) {
 961                     siftDown(i, replacement);
 962                     if (queue[i] == replacement)
 963                         siftUp(i, replacement);
 964                 }
 965                 return true;
 966             } finally {
 967                 lock.unlock();
 968             }
 969         }
 970 
 971         public int size() {
 972             final ReentrantLock lock = this.lock;
 973             lock.lock();
 974             try {
 975                 return size;
 976             } finally {
 977                 lock.unlock();
 978             }
 979         }
 980 
 981         public boolean isEmpty() {
 982             return size() == 0;
 983         }
 984 
 985         public int remainingCapacity() {
 986             return Integer.MAX_VALUE;
 987         }
 988 
 989         public RunnableScheduledFuture peek() {
 990             final ReentrantLock lock = this.lock;
 991             lock.lock();
 992             try {
 993                 return queue[0];
 994             } finally {
 995                 lock.unlock();
 996             }
 997         }
 998 
 999         public boolean offer(Runnable x) {
1000             if (x == null)
1001                 throw new NullPointerException();
1002             RunnableScheduledFuture e = (RunnableScheduledFuture)x;
1003             final ReentrantLock lock = this.lock;
1004             lock.lock();
1005             try {
1006                 int i = size;
1007                 if (i >= queue.length)
1008                     grow();
1009                 size = i + 1;
1010                 if (i == 0) {
1011                     queue[0] = e;
1012                     setIndex(e, 0);
1013                 } else {
1014                     siftUp(i, e);
1015                 }
1016                 if (queue[0] == e) {
1017                     leader = null;
1018                     available.signal();
1019                 }
1020             } finally {
1021                 lock.unlock();
1022             }
1023             return true;
1024         }
1025 
1026         public void put(Runnable e) {
1027             offer(e);
1028         }
1029 
1030         public boolean add(Runnable e) {
1031             return offer(e);
1032         }
1033 
1034         public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1035             return offer(e);
1036         }
1037 
1038         /**
1039          * Performs common bookkeeping for poll and take: Replaces
1040          * first element with last and sifts it down.  Call only when
1041          * holding lock.
1042          * @param f the task to remove and return
1043          */
1044         private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
1045             int s = --size;
1046             RunnableScheduledFuture x = queue[s];
1047             queue[s] = null;
1048             if (s != 0)
1049                 siftDown(0, x);
1050             setIndex(f, -1);
1051             return f;
1052         }
1053 
1054         public RunnableScheduledFuture poll() {
1055             final ReentrantLock lock = this.lock;
1056             lock.lock();
1057             try {
1058                 RunnableScheduledFuture first = queue[0];
1059                 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
1060                     return null;
1061                 else
1062                     return finishPoll(first);
1063             } finally {
1064                 lock.unlock();
1065             }
1066         }
1067 
1068         public RunnableScheduledFuture take() throws InterruptedException {
1069             final ReentrantLock lock = this.lock;
1070             lock.lockInterruptibly();
1071             try {
1072                 for (;;) {
1073                     RunnableScheduledFuture first = queue[0];
1074                     if (first == null)
1075                         available.await();
1076                     else {
1077                         long delay = first.getDelay(TimeUnit.NANOSECONDS);
1078                         if (delay <= 0)
1079                             return finishPoll(first);
1080                         else if (leader != null)
1081                             available.await();
1082                         else {
1083                             Thread thisThread = Thread.currentThread();
1084                             leader = thisThread;
1085                             try {
1086                                 available.awaitNanos(delay);
1087                             } finally {
1088                                 if (leader == thisThread)
1089                                     leader = null;
1090                             }
1091                         }
1092                     }
1093                 }
1094             } finally {
1095                 if (leader == null && queue[0] != null)
1096                     available.signal();
1097                 lock.unlock();
1098             }
1099         }
1100 
1101         public RunnableScheduledFuture poll(long timeout, TimeUnit unit)
1102             throws InterruptedException {
1103             long nanos = unit.toNanos(timeout);
1104             final ReentrantLock lock = this.lock;
1105             lock.lockInterruptibly();
1106             try {
1107                 for (;;) {
1108                     RunnableScheduledFuture first = queue[0];
1109                     if (first == null) {
1110                         if (nanos <= 0)
1111                             return null;
1112                         else
1113                             nanos = available.awaitNanos(nanos);
1114                     } else {
1115                         long delay = first.getDelay(TimeUnit.NANOSECONDS);
1116                         if (delay <= 0)
1117                             return finishPoll(first);
1118                         if (nanos <= 0)
1119                             return null;
1120                         if (nanos < delay || leader != null)
1121                             nanos = available.awaitNanos(nanos);
1122                         else {
1123                             Thread thisThread = Thread.currentThread();
1124                             leader = thisThread;
1125                             try {
1126                                 long timeLeft = available.awaitNanos(delay);
1127                                 nanos -= delay - timeLeft;
1128                             } finally {
1129                                 if (leader == thisThread)
1130                                     leader = null;
1131                             }
1132                         }
1133                     }
1134                 }
1135             } finally {
1136                 if (leader == null && queue[0] != null)
1137                     available.signal();
1138                 lock.unlock();
1139             }
1140         }
1141 
1142         public void clear() {
1143             final ReentrantLock lock = this.lock;
1144             lock.lock();
1145             try {
1146                 for (int i = 0; i < size; i++) {
1147                     RunnableScheduledFuture t = queue[i];
1148                     if (t != null) {
1149                         queue[i] = null;
1150                         setIndex(t, -1);
1151                     }
1152                 }
1153                 size = 0;
1154             } finally {
1155                 lock.unlock();
1156             }
1157         }
1158 
1159         /**
1160          * Return and remove first element only if it is expired.
1161          * Used only by drainTo.  Call only when holding lock.
1162          */
1163         private RunnableScheduledFuture pollExpired() {
1164             RunnableScheduledFuture first = queue[0];
1165             if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
1166                 return null;
1167             return finishPoll(first);
1168         }
1169 
1170         public int drainTo(Collection<? super Runnable> c) {
1171             if (c == null)
1172                 throw new NullPointerException();
1173             if (c == this)
1174                 throw new IllegalArgumentException();
1175             final ReentrantLock lock = this.lock;
1176             lock.lock();
1177             try {
1178                 RunnableScheduledFuture first;
1179                 int n = 0;
1180                 while ((first = pollExpired()) != null) {
1181                     c.add(first);
1182                     ++n;
1183                 }
1184                 return n;
1185             } finally {
1186                 lock.unlock();
1187             }
1188         }
1189 
1190         public int drainTo(Collection<? super Runnable> c, int maxElements) {
1191             if (c == null)
1192                 throw new NullPointerException();
1193             if (c == this)
1194                 throw new IllegalArgumentException();
1195             if (maxElements <= 0)
1196                 return 0;
1197             final ReentrantLock lock = this.lock;
1198             lock.lock();
1199             try {
1200                 RunnableScheduledFuture first;
1201                 int n = 0;
1202                 while (n < maxElements && (first = pollExpired()) != null) {
1203                     c.add(first);
1204                     ++n;
1205                 }
1206                 return n;
1207             } finally {
1208                 lock.unlock();
1209             }
1210         }
1211 
1212         public Object[] toArray() {
1213             final ReentrantLock lock = this.lock;
1214             lock.lock();
1215             try {
1216                 return Arrays.copyOf(queue, size, Object[].class);
1217             } finally {
1218                 lock.unlock();
1219             }
1220         }
1221 
1222         @SuppressWarnings("unchecked")
1223         public <T> T[] toArray(T[] a) {
1224             final ReentrantLock lock = this.lock;
1225             lock.lock();
1226             try {
1227                 if (a.length < size)
1228                     return (T[]) Arrays.copyOf(queue, size, a.getClass());
1229                 System.arraycopy(queue, 0, a, 0, size);
1230                 if (a.length > size)
1231                     a[size] = null;
1232                 return a;
1233             } finally {
1234                 lock.unlock();
1235             }
1236         }
1237 
1238         public Iterator<Runnable> iterator() {
1239             return new Itr(Arrays.copyOf(queue, size));
1240         }
1241 
1242         /**
1243          * Snapshot iterator that works off copy of underlying q array.
1244          */
1245         private class Itr implements Iterator<Runnable> {
1246             final RunnableScheduledFuture[] array;
1247             int cursor = 0;     // index of next element to return
1248             int lastRet = -1;   // index of last element, or -1 if no such
1249 
1250             Itr(RunnableScheduledFuture[] array) {
1251                 this.array = array;
1252             }
1253 
1254             public boolean hasNext() {
1255                 return cursor < array.length;
1256             }
1257 
1258             public Runnable next() {
1259                 if (cursor >= array.length)
1260                     throw new NoSuchElementException();
1261                 lastRet = cursor;
1262                 return array[cursor++];
1263             }
1264 
1265             public void remove() {
1266                 if (lastRet < 0)
1267                     throw new IllegalStateException();
1268                 DelayedWorkQueue.this.remove(array[lastRet]);
1269                 lastRet = -1;
1270             }
1271         }
1272     }
1273 }
--- EOF ---