src/share/classes/java/util/concurrent/ScheduledThreadPoolExecutor.java

Print this page




  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 import java.util.concurrent.atomic.*;
  38 import java.util.concurrent.locks.*;


  39 import java.util.*;
  40 
  41 /**
  42  * A {@link ThreadPoolExecutor} that can additionally schedule
  43  * commands to run after a given delay, or to execute
  44  * periodically. This class is preferable to {@link java.util.Timer}
  45  * when multiple worker threads are needed, or when the additional
  46  * flexibility or capabilities of {@link ThreadPoolExecutor} (which
  47  * this class extends) are required.
  48  *
  49  * <p>Delayed tasks execute no sooner than they are enabled, but
  50  * without any real-time guarantees about when, after they are
  51  * enabled, they will commence. Tasks scheduled for exactly the same
  52  * execution time are enabled in first-in-first-out (FIFO) order of
  53  * submission.
  54  *
  55  * <p>When a submitted task is cancelled before it is run, execution
  56  * is suppressed. By default, such a cancelled task is not
  57  * automatically removed from the work queue until its delay
  58  * elapses. While this enables further inspection and monitoring, it


 149 
 150     /**
 151      * False if should cancel/suppress periodic tasks on shutdown.
 152      */
 153     private volatile boolean continueExistingPeriodicTasksAfterShutdown;
 154 
 155     /**
 156      * False if should cancel non-periodic tasks on shutdown.
 157      */
 158     private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
 159 
 160     /**
 161      * True if ScheduledFutureTask.cancel should remove from queue
 162      */
 163     private volatile boolean removeOnCancel = false;
 164 
 165     /**
 166      * Sequence number to break scheduling ties, and in turn to
 167      * guarantee FIFO order among tied entries.
 168      */
 169     private static final AtomicLong sequencer = new AtomicLong(0);
 170 
 171     /**
 172      * Returns current nanosecond time.
 173      */
 174     final long now() {
 175         return System.nanoTime();
 176     }
 177 
 178     private class ScheduledFutureTask<V>
 179             extends FutureTask<V> implements RunnableScheduledFuture<V> {
 180 
 181         /** Sequence number to break ties FIFO */
 182         private final long sequenceNumber;
 183 
 184         /** The time the task is enabled to execute in nanoTime units */
 185         private long time;
 186 
 187         /**
 188          * Period in nanoseconds for repeating tasks.  A positive
 189          * value indicates fixed-rate execution.  A negative value


 214          * Creates a periodic action with given nano time and period.
 215          */
 216         ScheduledFutureTask(Runnable r, V result, long ns, long period) {
 217             super(r, result);
 218             this.time = ns;
 219             this.period = period;
 220             this.sequenceNumber = sequencer.getAndIncrement();
 221         }
 222 
 223         /**
 224          * Creates a one-shot action with given nanoTime-based trigger.
 225          */
 226         ScheduledFutureTask(Callable<V> callable, long ns) {
 227             super(callable);
 228             this.time = ns;
 229             this.period = 0;
 230             this.sequenceNumber = sequencer.getAndIncrement();
 231         }
 232 
 233         public long getDelay(TimeUnit unit) {
 234             return unit.convert(time - now(), TimeUnit.NANOSECONDS);
 235         }
 236 
 237         public int compareTo(Delayed other) {
 238             if (other == this) // compare zero ONLY if same object
 239                 return 0;
 240             if (other instanceof ScheduledFutureTask) {
 241                 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
 242                 long diff = time - x.time;
 243                 if (diff < 0)
 244                     return -1;
 245                 else if (diff > 0)
 246                     return 1;
 247                 else if (sequenceNumber < x.sequenceNumber)
 248                     return -1;
 249                 else
 250                     return 1;
 251             }
 252             long d = (getDelay(TimeUnit.NANOSECONDS) -
 253                       other.getDelay(TimeUnit.NANOSECONDS));
 254             return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
 255         }
 256 
 257         /**
 258          * Returns true if this is a periodic (not a one-shot) action.
 259          *
 260          * @return true if periodic
 261          */
 262         public boolean isPeriodic() {
 263             return period != 0;
 264         }
 265 
 266         /**
 267          * Sets the next time to run for a periodic task.
 268          */
 269         private void setNextRunTime() {
 270             long p = period;
 271             if (p > 0)
 272                 time += p;
 273             else


 407      *
 408      * @param callable the submitted Callable
 409      * @param task the task created to execute the callable
 410      * @return a task that can execute the callable
 411      * @since 1.6
 412      */
 413     protected <V> RunnableScheduledFuture<V> decorateTask(
 414         Callable<V> callable, RunnableScheduledFuture<V> task) {
 415         return task;
 416     }
 417 
 418     /**
 419      * Creates a new {@code ScheduledThreadPoolExecutor} with the
 420      * given core pool size.
 421      *
 422      * @param corePoolSize the number of threads to keep in the pool, even
 423      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 424      * @throws IllegalArgumentException if {@code corePoolSize < 0}
 425      */
 426     public ScheduledThreadPoolExecutor(int corePoolSize) {
 427         super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
 428               new DelayedWorkQueue());
 429     }
 430 
 431     /**
 432      * Creates a new {@code ScheduledThreadPoolExecutor} with the
 433      * given initial parameters.
 434      *
 435      * @param corePoolSize the number of threads to keep in the pool, even
 436      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 437      * @param threadFactory the factory to use when the executor
 438      *        creates a new thread
 439      * @throws IllegalArgumentException if {@code corePoolSize < 0}
 440      * @throws NullPointerException if {@code threadFactory} is null
 441      */
 442     public ScheduledThreadPoolExecutor(int corePoolSize,
 443                                        ThreadFactory threadFactory) {
 444         super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
 445               new DelayedWorkQueue(), threadFactory);
 446     }
 447 
 448     /**
 449      * Creates a new ScheduledThreadPoolExecutor with the given
 450      * initial parameters.
 451      *
 452      * @param corePoolSize the number of threads to keep in the pool, even
 453      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 454      * @param handler the handler to use when execution is blocked
 455      *        because the thread bounds and queue capacities are reached
 456      * @throws IllegalArgumentException if {@code corePoolSize < 0}
 457      * @throws NullPointerException if {@code handler} is null
 458      */
 459     public ScheduledThreadPoolExecutor(int corePoolSize,
 460                                        RejectedExecutionHandler handler) {
 461         super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
 462               new DelayedWorkQueue(), handler);
 463     }
 464 
 465     /**
 466      * Creates a new ScheduledThreadPoolExecutor with the given
 467      * initial parameters.
 468      *
 469      * @param corePoolSize the number of threads to keep in the pool, even
 470      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 471      * @param threadFactory the factory to use when the executor
 472      *        creates a new thread
 473      * @param handler the handler to use when execution is blocked
 474      *        because the thread bounds and queue capacities are reached
 475      * @throws IllegalArgumentException if {@code corePoolSize < 0}
 476      * @throws NullPointerException if {@code threadFactory} or
 477      *         {@code handler} is null
 478      */
 479     public ScheduledThreadPoolExecutor(int corePoolSize,
 480                                        ThreadFactory threadFactory,
 481                                        RejectedExecutionHandler handler) {
 482         super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
 483               new DelayedWorkQueue(), threadFactory, handler);
 484     }
 485 
 486     /**
 487      * Returns the trigger time of a delayed action.
 488      */
 489     private long triggerTime(long delay, TimeUnit unit) {
 490         return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
 491     }
 492 
 493     /**
 494      * Returns the trigger time of a delayed action.
 495      */
 496     long triggerTime(long delay) {
 497         return now() +
 498             ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
 499     }
 500 
 501     /**
 502      * Constrains the values of all delays in the queue to be within
 503      * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
 504      * This may occur if a task is eligible to be dequeued, but has
 505      * not yet been, while some other task is added with a delay of
 506      * Long.MAX_VALUE.
 507      */
 508     private long overflowFree(long delay) {
 509         Delayed head = (Delayed) super.getQueue().peek();
 510         if (head != null) {
 511             long headDelay = head.getDelay(TimeUnit.NANOSECONDS);
 512             if (headDelay < 0 && (delay - headDelay < 0))
 513                 delay = Long.MAX_VALUE + headDelay;
 514         }
 515         return delay;
 516     }
 517 
 518     /**
 519      * @throws RejectedExecutionException {@inheritDoc}
 520      * @throws NullPointerException       {@inheritDoc}
 521      */
 522     public ScheduledFuture<?> schedule(Runnable command,
 523                                        long delay,
 524                                        TimeUnit unit) {
 525         if (command == null || unit == null)
 526             throw new NullPointerException();
 527         RunnableScheduledFuture<?> t = decorateTask(command,
 528             new ScheduledFutureTask<Void>(command, null,
 529                                           triggerTime(delay, unit)));
 530         delayedExecute(t);
 531         return t;


 599      * Executes {@code command} with zero required delay.
 600      * This has effect equivalent to
 601      * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
 602      * Note that inspections of the queue and of the list returned by
 603      * {@code shutdownNow} will access the zero-delayed
 604      * {@link ScheduledFuture}, not the {@code command} itself.
 605      *
 606      * <p>A consequence of the use of {@code ScheduledFuture} objects is
 607      * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
 608      * called with a null second {@code Throwable} argument, even if the
 609      * {@code command} terminated abruptly.  Instead, the {@code Throwable}
 610      * thrown by such a task can be obtained via {@link Future#get}.
 611      *
 612      * @throws RejectedExecutionException at discretion of
 613      *         {@code RejectedExecutionHandler}, if the task
 614      *         cannot be accepted for execution because the
 615      *         executor has been shut down
 616      * @throws NullPointerException {@inheritDoc}
 617      */
 618     public void execute(Runnable command) {
 619         schedule(command, 0, TimeUnit.NANOSECONDS);
 620     }
 621 
 622     // Override AbstractExecutorService methods
 623 
 624     /**
 625      * @throws RejectedExecutionException {@inheritDoc}
 626      * @throws NullPointerException       {@inheritDoc}
 627      */
 628     public Future<?> submit(Runnable task) {
 629         return schedule(task, 0, TimeUnit.NANOSECONDS);
 630     }
 631 
 632     /**
 633      * @throws RejectedExecutionException {@inheritDoc}
 634      * @throws NullPointerException       {@inheritDoc}
 635      */
 636     public <T> Future<T> submit(Runnable task, T result) {
 637         return schedule(Executors.callable(task, result),
 638                         0, TimeUnit.NANOSECONDS);
 639     }
 640 
 641     /**
 642      * @throws RejectedExecutionException {@inheritDoc}
 643      * @throws NullPointerException       {@inheritDoc}
 644      */
 645     public <T> Future<T> submit(Callable<T> task) {
 646         return schedule(task, 0, TimeUnit.NANOSECONDS);
 647     }
 648 
 649     /**
 650      * Sets the policy on whether to continue executing existing
 651      * periodic tasks even when this executor has been {@code shutdown}.
 652      * In this case, these tasks will only terminate upon
 653      * {@code shutdownNow} or after setting the policy to
 654      * {@code false} when already shutdown.
 655      * This value is by default {@code false}.
 656      *
 657      * @param value if {@code true}, continue after shutdown, else don't.
 658      * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
 659      */
 660     public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
 661         continueExistingPeriodicTasksAfterShutdown = value;
 662         if (!value && isShutdown())
 663             onShutdown();
 664     }
 665 
 666     /**


 814          * heap array. This eliminates the need to find a task upon
 815          * cancellation, greatly speeding up removal (down from O(n)
 816          * to O(log n)), and reducing garbage retention that would
 817          * otherwise occur by waiting for the element to rise to top
 818          * before clearing. But because the queue may also hold
 819          * RunnableScheduledFutures that are not ScheduledFutureTasks,
 820          * we are not guaranteed to have such indices available, in
 821          * which case we fall back to linear search. (We expect that
 822          * most tasks will not be decorated, and that the faster cases
 823          * will be much more common.)
 824          *
 825          * All heap operations must record index changes -- mainly
 826          * within siftUp and siftDown. Upon removal, a task's
 827          * heapIndex is set to -1. Note that ScheduledFutureTasks can
 828          * appear at most once in the queue (this need not be true for
 829          * other kinds of tasks or work queues), so are uniquely
 830          * identified by heapIndex.
 831          */
 832 
 833         private static final int INITIAL_CAPACITY = 16;
 834         private RunnableScheduledFuture[] queue =
 835             new RunnableScheduledFuture[INITIAL_CAPACITY];
 836         private final ReentrantLock lock = new ReentrantLock();
 837         private int size = 0;
 838 
 839         /**
 840          * Thread designated to wait for the task at the head of the
 841          * queue.  This variant of the Leader-Follower pattern
 842          * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
 843          * minimize unnecessary timed waiting.  When a thread becomes
 844          * the leader, it waits only for the next delay to elapse, but
 845          * other threads await indefinitely.  The leader thread must
 846          * signal some other thread before returning from take() or
 847          * poll(...), unless some other thread becomes leader in the
 848          * interim.  Whenever the head of the queue is replaced with a
 849          * task with an earlier expiration time, the leader field is
 850          * invalidated by being reset to null, and some waiting
 851          * thread, but not necessarily the current leader, is
 852          * signalled.  So waiting threads must be prepared to acquire
 853          * and lose leadership while waiting.
 854          */
 855         private Thread leader = null;
 856 
 857         /**
 858          * Condition signalled when a newer task becomes available at the
 859          * head of the queue or a new thread may need to become leader.
 860          */
 861         private final Condition available = lock.newCondition();
 862 
 863         /**
 864          * Set f's heapIndex if it is a ScheduledFutureTask.
 865          */
 866         private void setIndex(RunnableScheduledFuture f, int idx) {
 867             if (f instanceof ScheduledFutureTask)
 868                 ((ScheduledFutureTask)f).heapIndex = idx;
 869         }
 870 
 871         /**
 872          * Sift element added at bottom up to its heap-ordered spot.
 873          * Call only when holding lock.
 874          */
 875         private void siftUp(int k, RunnableScheduledFuture key) {
 876             while (k > 0) {
 877                 int parent = (k - 1) >>> 1;
 878                 RunnableScheduledFuture e = queue[parent];
 879                 if (key.compareTo(e) >= 0)
 880                     break;
 881                 queue[k] = e;
 882                 setIndex(e, k);
 883                 k = parent;
 884             }
 885             queue[k] = key;
 886             setIndex(key, k);
 887         }
 888 
 889         /**
 890          * Sift element added at top down to its heap-ordered spot.
 891          * Call only when holding lock.
 892          */
 893         private void siftDown(int k, RunnableScheduledFuture key) {
 894             int half = size >>> 1;
 895             while (k < half) {
 896                 int child = (k << 1) + 1;
 897                 RunnableScheduledFuture c = queue[child];
 898                 int right = child + 1;
 899                 if (right < size && c.compareTo(queue[right]) > 0)
 900                     c = queue[child = right];
 901                 if (key.compareTo(c) <= 0)
 902                     break;
 903                 queue[k] = c;
 904                 setIndex(c, k);
 905                 k = child;
 906             }
 907             queue[k] = key;
 908             setIndex(key, k);
 909         }
 910 
 911         /**
 912          * Resize the heap array.  Call only when holding lock.
 913          */
 914         private void grow() {
 915             int oldCapacity = queue.length;
 916             int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
 917             if (newCapacity < 0) // overflow


 942         public boolean contains(Object x) {
 943             final ReentrantLock lock = this.lock;
 944             lock.lock();
 945             try {
 946                 return indexOf(x) != -1;
 947             } finally {
 948                 lock.unlock();
 949             }
 950         }
 951 
 952         public boolean remove(Object x) {
 953             final ReentrantLock lock = this.lock;
 954             lock.lock();
 955             try {
 956                 int i = indexOf(x);
 957                 if (i < 0)
 958                     return false;
 959 
 960                 setIndex(queue[i], -1);
 961                 int s = --size;
 962                 RunnableScheduledFuture replacement = queue[s];
 963                 queue[s] = null;
 964                 if (s != i) {
 965                     siftDown(i, replacement);
 966                     if (queue[i] == replacement)
 967                         siftUp(i, replacement);
 968                 }
 969                 return true;
 970             } finally {
 971                 lock.unlock();
 972             }
 973         }
 974 
 975         public int size() {
 976             final ReentrantLock lock = this.lock;
 977             lock.lock();
 978             try {
 979                 return size;
 980             } finally {
 981                 lock.unlock();
 982             }
 983         }
 984 
 985         public boolean isEmpty() {
 986             return size() == 0;
 987         }
 988 
 989         public int remainingCapacity() {
 990             return Integer.MAX_VALUE;
 991         }
 992 
 993         public RunnableScheduledFuture peek() {
 994             final ReentrantLock lock = this.lock;
 995             lock.lock();
 996             try {
 997                 return queue[0];
 998             } finally {
 999                 lock.unlock();
1000             }
1001         }
1002 
1003         public boolean offer(Runnable x) {
1004             if (x == null)
1005                 throw new NullPointerException();
1006             RunnableScheduledFuture e = (RunnableScheduledFuture)x;
1007             final ReentrantLock lock = this.lock;
1008             lock.lock();
1009             try {
1010                 int i = size;
1011                 if (i >= queue.length)
1012                     grow();
1013                 size = i + 1;
1014                 if (i == 0) {
1015                     queue[0] = e;
1016                     setIndex(e, 0);
1017                 } else {
1018                     siftUp(i, e);
1019                 }
1020                 if (queue[0] == e) {
1021                     leader = null;
1022                     available.signal();
1023                 }
1024             } finally {
1025                 lock.unlock();
1026             }


1028         }
1029 
1030         public void put(Runnable e) {
1031             offer(e);
1032         }
1033 
1034         public boolean add(Runnable e) {
1035             return offer(e);
1036         }
1037 
1038         public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1039             return offer(e);
1040         }
1041 
1042         /**
1043          * Performs common bookkeeping for poll and take: Replaces
1044          * first element with last and sifts it down.  Call only when
1045          * holding lock.
1046          * @param f the task to remove and return
1047          */
1048         private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
1049             int s = --size;
1050             RunnableScheduledFuture x = queue[s];
1051             queue[s] = null;
1052             if (s != 0)
1053                 siftDown(0, x);
1054             setIndex(f, -1);
1055             return f;
1056         }
1057 
1058         public RunnableScheduledFuture poll() {
1059             final ReentrantLock lock = this.lock;
1060             lock.lock();
1061             try {
1062                 RunnableScheduledFuture first = queue[0];
1063                 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
1064                     return null;
1065                 else
1066                     return finishPoll(first);
1067             } finally {
1068                 lock.unlock();
1069             }
1070         }
1071 
1072         public RunnableScheduledFuture take() throws InterruptedException {
1073             final ReentrantLock lock = this.lock;
1074             lock.lockInterruptibly();
1075             try {
1076                 for (;;) {
1077                     RunnableScheduledFuture first = queue[0];
1078                     if (first == null)
1079                         available.await();
1080                     else {
1081                         long delay = first.getDelay(TimeUnit.NANOSECONDS);
1082                         if (delay <= 0)
1083                             return finishPoll(first);
1084                         else if (leader != null)
1085                             available.await();
1086                         else {
1087                             Thread thisThread = Thread.currentThread();
1088                             leader = thisThread;
1089                             try {
1090                                 available.awaitNanos(delay);
1091                             } finally {
1092                                 if (leader == thisThread)
1093                                     leader = null;
1094                             }
1095                         }
1096                     }
1097                 }
1098             } finally {
1099                 if (leader == null && queue[0] != null)
1100                     available.signal();
1101                 lock.unlock();
1102             }
1103         }
1104 
1105         public RunnableScheduledFuture poll(long timeout, TimeUnit unit)
1106             throws InterruptedException {
1107             long nanos = unit.toNanos(timeout);
1108             final ReentrantLock lock = this.lock;
1109             lock.lockInterruptibly();
1110             try {
1111                 for (;;) {
1112                     RunnableScheduledFuture first = queue[0];
1113                     if (first == null) {
1114                         if (nanos <= 0)
1115                             return null;
1116                         else
1117                             nanos = available.awaitNanos(nanos);
1118                     } else {
1119                         long delay = first.getDelay(TimeUnit.NANOSECONDS);
1120                         if (delay <= 0)
1121                             return finishPoll(first);
1122                         if (nanos <= 0)
1123                             return null;
1124                         if (nanos < delay || leader != null)
1125                             nanos = available.awaitNanos(nanos);
1126                         else {
1127                             Thread thisThread = Thread.currentThread();
1128                             leader = thisThread;
1129                             try {
1130                                 long timeLeft = available.awaitNanos(delay);
1131                                 nanos -= delay - timeLeft;
1132                             } finally {
1133                                 if (leader == thisThread)
1134                                     leader = null;
1135                             }
1136                         }
1137                     }
1138                 }
1139             } finally {
1140                 if (leader == null && queue[0] != null)
1141                     available.signal();
1142                 lock.unlock();
1143             }
1144         }
1145 
1146         public void clear() {
1147             final ReentrantLock lock = this.lock;
1148             lock.lock();
1149             try {
1150                 for (int i = 0; i < size; i++) {
1151                     RunnableScheduledFuture t = queue[i];
1152                     if (t != null) {
1153                         queue[i] = null;
1154                         setIndex(t, -1);
1155                     }
1156                 }
1157                 size = 0;
1158             } finally {
1159                 lock.unlock();
1160             }
1161         }
1162 
1163         /**
1164          * Return and remove first element only if it is expired.
1165          * Used only by drainTo.  Call only when holding lock.
1166          */
1167         private RunnableScheduledFuture pollExpired() {
1168             RunnableScheduledFuture first = queue[0];
1169             if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)

1170                 return null;
1171             return finishPoll(first);
1172         }
1173 
1174         public int drainTo(Collection<? super Runnable> c) {
1175             if (c == null)
1176                 throw new NullPointerException();
1177             if (c == this)
1178                 throw new IllegalArgumentException();
1179             final ReentrantLock lock = this.lock;
1180             lock.lock();
1181             try {
1182                 RunnableScheduledFuture first;
1183                 int n = 0;
1184                 while ((first = pollExpired()) != null) {
1185                     c.add(first);
1186                     ++n;
1187                 }
1188                 return n;
1189             } finally {
1190                 lock.unlock();
1191             }
1192         }
1193 
1194         public int drainTo(Collection<? super Runnable> c, int maxElements) {
1195             if (c == null)
1196                 throw new NullPointerException();
1197             if (c == this)
1198                 throw new IllegalArgumentException();
1199             if (maxElements <= 0)
1200                 return 0;
1201             final ReentrantLock lock = this.lock;
1202             lock.lock();
1203             try {
1204                 RunnableScheduledFuture first;
1205                 int n = 0;
1206                 while (n < maxElements && (first = pollExpired()) != null) {
1207                     c.add(first);
1208                     ++n;
1209                 }
1210                 return n;
1211             } finally {
1212                 lock.unlock();
1213             }
1214         }
1215 
1216         public Object[] toArray() {
1217             final ReentrantLock lock = this.lock;
1218             lock.lock();
1219             try {
1220                 return Arrays.copyOf(queue, size, Object[].class);
1221             } finally {
1222                 lock.unlock();
1223             }
1224         }




  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 import static java.util.concurrent.TimeUnit.NANOSECONDS;
  38 import java.util.concurrent.atomic.AtomicLong;
  39 import java.util.concurrent.locks.Condition;
  40 import java.util.concurrent.locks.ReentrantLock;
  41 import java.util.*;
  42 
  43 /**
  44  * A {@link ThreadPoolExecutor} that can additionally schedule
  45  * commands to run after a given delay, or to execute
  46  * periodically. This class is preferable to {@link java.util.Timer}
  47  * when multiple worker threads are needed, or when the additional
  48  * flexibility or capabilities of {@link ThreadPoolExecutor} (which
  49  * this class extends) are required.
  50  *
  51  * <p>Delayed tasks execute no sooner than they are enabled, but
  52  * without any real-time guarantees about when, after they are
  53  * enabled, they will commence. Tasks scheduled for exactly the same
  54  * execution time are enabled in first-in-first-out (FIFO) order of
  55  * submission.
  56  *
  57  * <p>When a submitted task is cancelled before it is run, execution
  58  * is suppressed. By default, such a cancelled task is not
  59  * automatically removed from the work queue until its delay
  60  * elapses. While this enables further inspection and monitoring, it


 151 
    /**
     * False if should cancel/suppress periodic tasks on shutdown.
     * No initializer is given, so the field starts out {@code false}.
     */
    private volatile boolean continueExistingPeriodicTasksAfterShutdown;

    /**
     * False if should cancel non-periodic tasks on shutdown.
     * Starts out {@code true}.
     */
    private volatile boolean executeExistingDelayedTasksAfterShutdown = true;

    /**
     * True if ScheduledFutureTask.cancel should remove from queue.
     * Starts out {@code false}.
     */
    private volatile boolean removeOnCancel = false;

    /**
     * Sequence number to break scheduling ties, and in turn to
     * guarantee FIFO order among tied entries.  The counter is static,
     * so it is shared by all executor instances; ScheduledFutureTask
     * constructors draw their number via {@code getAndIncrement()}.
     */
    private static final AtomicLong sequencer = new AtomicLong();
 172 
 173     /**
 174      * Returns current nanosecond time.
 175      */
 176     final long now() {
 177         return System.nanoTime();
 178     }
 179 
 180     private class ScheduledFutureTask<V>
 181             extends FutureTask<V> implements RunnableScheduledFuture<V> {
 182 
 183         /** Sequence number to break ties FIFO */
 184         private final long sequenceNumber;
 185 
 186         /** The time the task is enabled to execute in nanoTime units */
 187         private long time;
 188 
 189         /**
 190          * Period in nanoseconds for repeating tasks.  A positive
 191          * value indicates fixed-rate execution.  A negative value


 216          * Creates a periodic action with given nano time and period.
 217          */
 218         ScheduledFutureTask(Runnable r, V result, long ns, long period) {
 219             super(r, result);
 220             this.time = ns;
 221             this.period = period;
 222             this.sequenceNumber = sequencer.getAndIncrement();
 223         }
 224 
 225         /**
 226          * Creates a one-shot action with given nanoTime-based trigger.
 227          */
 228         ScheduledFutureTask(Callable<V> callable, long ns) {
 229             super(callable);
 230             this.time = ns;
 231             this.period = 0;
 232             this.sequenceNumber = sequencer.getAndIncrement();
 233         }
 234 
 235         public long getDelay(TimeUnit unit) {
 236             return unit.convert(time - now(), NANOSECONDS);
 237         }
 238 
 239         public int compareTo(Delayed other) {
 240             if (other == this) // compare zero ONLY if same object
 241                 return 0;
 242             if (other instanceof ScheduledFutureTask) {
 243                 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
 244                 long diff = time - x.time;
 245                 if (diff < 0)
 246                     return -1;
 247                 else if (diff > 0)
 248                     return 1;
 249                 else if (sequenceNumber < x.sequenceNumber)
 250                     return -1;
 251                 else
 252                     return 1;
 253             }
 254             long d = (getDelay(NANOSECONDS) -
 255                       other.getDelay(NANOSECONDS));
 256             return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
 257         }
 258 
 259         /**
 260          * Returns true if this is a periodic (not a one-shot) action.
 261          *
 262          * @return true if periodic
 263          */
 264         public boolean isPeriodic() {
 265             return period != 0;
 266         }
 267 
 268         /**
 269          * Sets the next time to run for a periodic task.
 270          */
 271         private void setNextRunTime() {
 272             long p = period;
 273             if (p > 0)
 274                 time += p;
 275             else


 409      *
 410      * @param callable the submitted Callable
 411      * @param task the task created to execute the callable
 412      * @return a task that can execute the callable
 413      * @since 1.6
 414      */
    protected <V> RunnableScheduledFuture<V> decorateTask(
        Callable<V> callable, RunnableScheduledFuture<V> task) {
        // This implementation ignores the callable and hands back the
        // task it was given, unchanged.
        return task;
    }
 419 
    /**
     * Creates a new {@code ScheduledThreadPoolExecutor} with the
     * given core pool size.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        // Only corePoolSize is caller-controlled; the remaining superclass
        // arguments (Integer.MAX_VALUE, 0, NANOSECONDS) are fixed, and the
        // work queue is always a fresh DelayedWorkQueue.
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
 432 
    /**
     * Creates a new {@code ScheduledThreadPoolExecutor} with the
     * given initial parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     * @throws NullPointerException if {@code threadFactory} is null
     */
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        // Same fixed superclass arguments as the single-argument
        // constructor; only the thread factory is added.
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }
 449 
    /**
     * Creates a new {@code ScheduledThreadPoolExecutor} with the
     * given initial parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     * @throws NullPointerException if {@code handler} is null
     */
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       RejectedExecutionHandler handler) {
        // Same fixed superclass arguments as the single-argument
        // constructor; only the rejection handler is added.
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), handler);
    }
 466 
    /**
     * Creates a new {@code ScheduledThreadPoolExecutor} with the
     * given initial parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     * @throws NullPointerException if {@code threadFactory} or
     *         {@code handler} is null
     */
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
        // Same fixed superclass arguments as the single-argument
        // constructor; thread factory and rejection handler are added.
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory, handler);
    }
 487 
 488     /**
 489      * Returns the trigger time of a delayed action.
 490      */
 491     private long triggerTime(long delay, TimeUnit unit) {
 492         return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
 493     }
 494 
 495     /**
 496      * Returns the trigger time of a delayed action.
 497      */
 498     long triggerTime(long delay) {
 499         return now() +
 500             ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
 501     }
 502 
 503     /**
 504      * Constrains the values of all delays in the queue to be within
 505      * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
 506      * This may occur if a task is eligible to be dequeued, but has
 507      * not yet been, while some other task is added with a delay of
 508      * Long.MAX_VALUE.
 509      */
 510     private long overflowFree(long delay) {
 511         Delayed head = (Delayed) super.getQueue().peek();
 512         if (head != null) {
 513             long headDelay = head.getDelay(NANOSECONDS);
 514             if (headDelay < 0 && (delay - headDelay < 0))
 515                 delay = Long.MAX_VALUE + headDelay;
 516         }
 517         return delay;
 518     }
 519 
 520     /**
 521      * @throws RejectedExecutionException {@inheritDoc}
 522      * @throws NullPointerException       {@inheritDoc}
 523      */
 524     public ScheduledFuture<?> schedule(Runnable command,
 525                                        long delay,
 526                                        TimeUnit unit) {
 527         if (command == null || unit == null)
 528             throw new NullPointerException();
 529         RunnableScheduledFuture<?> t = decorateTask(command,
 530             new ScheduledFutureTask<Void>(command, null,
 531                                           triggerTime(delay, unit)));
 532         delayedExecute(t);
 533         return t;


 601      * Executes {@code command} with zero required delay.
 602      * This has effect equivalent to
 603      * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
 604      * Note that inspections of the queue and of the list returned by
 605      * {@code shutdownNow} will access the zero-delayed
 606      * {@link ScheduledFuture}, not the {@code command} itself.
 607      *
 608      * <p>A consequence of the use of {@code ScheduledFuture} objects is
 609      * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
 610      * called with a null second {@code Throwable} argument, even if the
 611      * {@code command} terminated abruptly.  Instead, the {@code Throwable}
 612      * thrown by such a task can be obtained via {@link Future#get}.
 613      *
 614      * @throws RejectedExecutionException at discretion of
 615      *         {@code RejectedExecutionHandler}, if the task
 616      *         cannot be accepted for execution because the
 617      *         executor has been shut down
 618      * @throws NullPointerException {@inheritDoc}
 619      */
 620     public void execute(Runnable command) {
 621         schedule(command, 0, NANOSECONDS);
 622     }
 623 
 624     // Override AbstractExecutorService methods
 625 
 626     /**
 627      * @throws RejectedExecutionException {@inheritDoc}
 628      * @throws NullPointerException       {@inheritDoc}
 629      */
 630     public Future<?> submit(Runnable task) {
 631         return schedule(task, 0, NANOSECONDS);
 632     }
 633 
 634     /**
 635      * @throws RejectedExecutionException {@inheritDoc}
 636      * @throws NullPointerException       {@inheritDoc}
 637      */
 638     public <T> Future<T> submit(Runnable task, T result) {
 639         return schedule(Executors.callable(task, result), 0, NANOSECONDS);

 640     }
 641 
 642     /**
 643      * @throws RejectedExecutionException {@inheritDoc}
 644      * @throws NullPointerException       {@inheritDoc}
 645      */
 646     public <T> Future<T> submit(Callable<T> task) {
 647         return schedule(task, 0, NANOSECONDS);
 648     }
 649 
 650     /**
 651      * Sets the policy on whether to continue executing existing
 652      * periodic tasks even when this executor has been {@code shutdown}.
 653      * In this case, these tasks will only terminate upon
 654      * {@code shutdownNow} or after setting the policy to
 655      * {@code false} when already shutdown.
 656      * This value is by default {@code false}.
 657      *
 658      * @param value if {@code true}, continue after shutdown, else don't.
 659      * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
 660      */
 661     public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
 662         continueExistingPeriodicTasksAfterShutdown = value;
 663         if (!value && isShutdown())
 664             onShutdown();
 665     }
 666 
 667     /**


 815          * heap array. This eliminates the need to find a task upon
 816          * cancellation, greatly speeding up removal (down from O(n)
 817          * to O(log n)), and reducing garbage retention that would
 818          * otherwise occur by waiting for the element to rise to top
 819          * before clearing. But because the queue may also hold
 820          * RunnableScheduledFutures that are not ScheduledFutureTasks,
 821          * we are not guaranteed to have such indices available, in
 822          * which case we fall back to linear search. (We expect that
 823          * most tasks will not be decorated, and that the faster cases
 824          * will be much more common.)
 825          *
 826          * All heap operations must record index changes -- mainly
 827          * within siftUp and siftDown. Upon removal, a task's
 828          * heapIndex is set to -1. Note that ScheduledFutureTasks can
 829          * appear at most once in the queue (this need not be true for
 830          * other kinds of tasks or work queues), so are uniquely
 831          * identified by heapIndex.
 832          */
 833 
        /** Length of the heap array when the queue is created. */
        private static final int INITIAL_CAPACITY = 16;
        /** Heap array of queued tasks; only the first {@code size} slots are in use. */
        private RunnableScheduledFuture<?>[] queue =
            new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
        /** Lock acquired by the queue operations before touching the heap. */
        private final ReentrantLock lock = new ReentrantLock();
        /** Number of tasks currently held in the heap array. */
        private int size = 0;

        /**
         * Thread designated to wait for the task at the head of the
         * queue.  This variant of the Leader-Follower pattern
         * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
         * minimize unnecessary timed waiting.  When a thread becomes
         * the leader, it waits only for the next delay to elapse, but
         * other threads await indefinitely.  The leader thread must
         * signal some other thread before returning from take() or
         * poll(...), unless some other thread becomes leader in the
         * interim.  Whenever the head of the queue is replaced with a
         * task with an earlier expiration time, the leader field is
         * invalidated by being reset to null, and some waiting
         * thread, but not necessarily the current leader, is
         * signalled.  So waiting threads must be prepared to acquire
         * and lose leadership while waiting.
         */
        private Thread leader = null;

        /**
         * Condition signalled when a newer task becomes available at the
         * head of the queue or a new thread may need to become leader.
         * Created from {@code lock}, so it is awaited and signalled
         * while that lock is held.
         */
        private final Condition available = lock.newCondition();
 863 
 864         /**
 865          * Set f's heapIndex if it is a ScheduledFutureTask.
 866          */
 867         private void setIndex(RunnableScheduledFuture<?> f, int idx) {
 868             if (f instanceof ScheduledFutureTask)
 869                 ((ScheduledFutureTask)f).heapIndex = idx;
 870         }
 871 
 872         /**
 873          * Sift element added at bottom up to its heap-ordered spot.
 874          * Call only when holding lock.
 875          */
 876         private void siftUp(int k, RunnableScheduledFuture<?> key) {
 877             while (k > 0) {
 878                 int parent = (k - 1) >>> 1;
 879                 RunnableScheduledFuture<?> e = queue[parent];
 880                 if (key.compareTo(e) >= 0)
 881                     break;
 882                 queue[k] = e;
 883                 setIndex(e, k);
 884                 k = parent;
 885             }
 886             queue[k] = key;
 887             setIndex(key, k);
 888         }
 889 
 890         /**
 891          * Sift element added at top down to its heap-ordered spot.
 892          * Call only when holding lock.
 893          */
 894         private void siftDown(int k, RunnableScheduledFuture<?> key) {
 895             int half = size >>> 1;
 896             while (k < half) {
 897                 int child = (k << 1) + 1;
 898                 RunnableScheduledFuture<?> c = queue[child];
 899                 int right = child + 1;
 900                 if (right < size && c.compareTo(queue[right]) > 0)
 901                     c = queue[child = right];
 902                 if (key.compareTo(c) <= 0)
 903                     break;
 904                 queue[k] = c;
 905                 setIndex(c, k);
 906                 k = child;
 907             }
 908             queue[k] = key;
 909             setIndex(key, k);
 910         }
 911 
 912         /**
 913          * Resize the heap array.  Call only when holding lock.
 914          */
 915         private void grow() {
 916             int oldCapacity = queue.length;
 917             int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
 918             if (newCapacity < 0) // overflow


 943         public boolean contains(Object x) {
 944             final ReentrantLock lock = this.lock;
 945             lock.lock();
 946             try {
 947                 return indexOf(x) != -1;
 948             } finally {
 949                 lock.unlock();
 950             }
 951         }
 952 
 953         public boolean remove(Object x) {
 954             final ReentrantLock lock = this.lock;
 955             lock.lock();
 956             try {
 957                 int i = indexOf(x);
 958                 if (i < 0)
 959                     return false;
 960 
 961                 setIndex(queue[i], -1);
 962                 int s = --size;
 963                 RunnableScheduledFuture<?> replacement = queue[s];
 964                 queue[s] = null;
 965                 if (s != i) {
 966                     siftDown(i, replacement);
 967                     if (queue[i] == replacement)
 968                         siftUp(i, replacement);
 969                 }
 970                 return true;
 971             } finally {
 972                 lock.unlock();
 973             }
 974         }
 975 
 976         public int size() {
 977             final ReentrantLock lock = this.lock;
 978             lock.lock();
 979             try {
 980                 return size;
 981             } finally {
 982                 lock.unlock();
 983             }
 984         }
 985 
 986         public boolean isEmpty() {
 987             return size() == 0;
 988         }
 989 
        /**
         * Always returns {@code Integer.MAX_VALUE}, regardless of how
         * many tasks are queued.
         */
        public int remainingCapacity() {
            return Integer.MAX_VALUE;
        }
 993 
 994         public RunnableScheduledFuture<?> peek() {
 995             final ReentrantLock lock = this.lock;
 996             lock.lock();
 997             try {
 998                 return queue[0];
 999             } finally {
1000                 lock.unlock();
1001             }
1002         }
1003 
1004         public boolean offer(Runnable x) {
1005             if (x == null)
1006                 throw new NullPointerException();
1007             RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
1008             final ReentrantLock lock = this.lock;
1009             lock.lock();
1010             try {
1011                 int i = size;
1012                 if (i >= queue.length)
1013                     grow();
1014                 size = i + 1;
1015                 if (i == 0) {
1016                     queue[0] = e;
1017                     setIndex(e, 0);
1018                 } else {
1019                     siftUp(i, e);
1020                 }
1021                 if (queue[0] == e) {
1022                     leader = null;
1023                     available.signal();
1024                 }
1025             } finally {
1026                 lock.unlock();
1027             }


1029         }
1030 
        /**
         * Delegates to {@link #offer(Runnable)}; the boolean result of
         * that call is discarded.
         */
        public void put(Runnable e) {
            offer(e);
        }
1034 
1035         public boolean add(Runnable e) {
1036             return offer(e);
1037         }
1038 
1039         public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1040             return offer(e);
1041         }
1042 
1043         /**
1044          * Performs common bookkeeping for poll and take: Replaces
1045          * first element with last and sifts it down.  Call only when
1046          * holding lock.
1047          * @param f the task to remove and return
1048          */
1049         private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1050             int s = --size;
1051             RunnableScheduledFuture<?> x = queue[s];
1052             queue[s] = null;
1053             if (s != 0)
1054                 siftDown(0, x);
1055             setIndex(f, -1);
1056             return f;
1057         }
1058 
1059         public RunnableScheduledFuture<?> poll() {
1060             final ReentrantLock lock = this.lock;
1061             lock.lock();
1062             try {
1063                 RunnableScheduledFuture<?> first = queue[0];
1064                 if (first == null || first.getDelay(NANOSECONDS) > 0)
1065                     return null;
1066                 else
1067                     return finishPoll(first);
1068             } finally {
1069                 lock.unlock();
1070             }
1071         }
1072 
1073         public RunnableScheduledFuture<?> take() throws InterruptedException {
1074             final ReentrantLock lock = this.lock;
1075             lock.lockInterruptibly();
1076             try {
1077                 for (;;) {
1078                     RunnableScheduledFuture<?> first = queue[0];
1079                     if (first == null)
1080                         available.await();
1081                     else {
1082                         long delay = first.getDelay(NANOSECONDS);
1083                         if (delay <= 0)
1084                             return finishPoll(first);
1085                         else if (leader != null)
1086                             available.await();
1087                         else {
1088                             Thread thisThread = Thread.currentThread();
1089                             leader = thisThread;
1090                             try {
1091                                 available.awaitNanos(delay);
1092                             } finally {
1093                                 if (leader == thisThread)
1094                                     leader = null;
1095                             }
1096                         }
1097                     }
1098                 }
1099             } finally {
1100                 if (leader == null && queue[0] != null)
1101                     available.signal();
1102                 lock.unlock();
1103             }
1104         }
1105 
        /**
         * Timed counterpart of take(): removes and returns the head
         * task once its delay has elapsed, giving up and returning null
         * when the timeout budget is used up first.
         *
         * @param timeout how long to wait before giving up
         * @param unit the unit of {@code timeout}
         * @return the expired head task, or null on timeout
         * @throws InterruptedException if interrupted while acquiring
         *         the lock or while waiting
         */
        public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
            // Remaining wait budget in nanoseconds; reduced after every wait.
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture<?> first = queue[0];
                    if (first == null) {
                        // Empty queue: wait for an arrival while budget remains.
                        if (nanos <= 0)
                            return null;
                        else
                            nanos = available.awaitNanos(nanos);
                    } else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            return finishPoll(first);
                        if (nanos <= 0)
                            return null;
                        // Either the budget runs out before the head is
                        // due, or another thread is already leader: wait
                        // on the budget alone, without taking leadership.
                        if (nanos < delay || leader != null)
                            nanos = available.awaitNanos(nanos);
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                // awaitNanos returns an estimate of the
                                // wait time left, so (delay - timeLeft) is
                                // the time actually spent waiting.
                                long timeLeft = available.awaitNanos(delay);
                                nanos -= delay - timeLeft;
                            } finally {
                                // Give up leadership unless it was already
                                // reassigned.
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                // On the way out, wake a waiter if no leader is in place
                // and a task remains at the head.
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }
1146 
1147         public void clear() {
1148             final ReentrantLock lock = this.lock;
1149             lock.lock();
1150             try {
1151                 for (int i = 0; i < size; i++) {
1152                     RunnableScheduledFuture<?> t = queue[i];
1153                     if (t != null) {
1154                         queue[i] = null;
1155                         setIndex(t, -1);
1156                     }
1157                 }
1158                 size = 0;
1159             } finally {
1160                 lock.unlock();
1161             }
1162         }
1163 
1164         /**
1165          * Return and remove first element only if it is expired.
1166          * Used only by drainTo.  Call only when holding lock.
1167          */
1168         private RunnableScheduledFuture<?> pollExpired() {
1169             // assert lock.isHeldByCurrentThread();
1170             RunnableScheduledFuture<?> first = queue[0];
1171             if (first == null || first.getDelay(NANOSECONDS) > 0)
1172                 return null;
1173             return finishPoll(first);
1174         }
1175 
1176         public int drainTo(Collection<? super Runnable> c) {
1177             if (c == null)
1178                 throw new NullPointerException();
1179             if (c == this)
1180                 throw new IllegalArgumentException();
1181             final ReentrantLock lock = this.lock;
1182             lock.lock();
1183             try {
1184                 RunnableScheduledFuture<?> first;
1185                 int n = 0;
1186                 while ((first = pollExpired()) != null) {
1187                     c.add(first);
1188                     ++n;
1189                 }
1190                 return n;
1191             } finally {
1192                 lock.unlock();
1193             }
1194         }
1195 
1196         public int drainTo(Collection<? super Runnable> c, int maxElements) {
1197             if (c == null)
1198                 throw new NullPointerException();
1199             if (c == this)
1200                 throw new IllegalArgumentException();
1201             if (maxElements <= 0)
1202                 return 0;
1203             final ReentrantLock lock = this.lock;
1204             lock.lock();
1205             try {
1206                 RunnableScheduledFuture<?> first;
1207                 int n = 0;
1208                 while (n < maxElements && (first = pollExpired()) != null) {
1209                     c.add(first);
1210                     ++n;
1211                 }
1212                 return n;
1213             } finally {
1214                 lock.unlock();
1215             }
1216         }
1217 
1218         public Object[] toArray() {
1219             final ReentrantLock lock = this.lock;
1220             lock.lock();
1221             try {
1222                 return Arrays.copyOf(queue, size, Object[].class);
1223             } finally {
1224                 lock.unlock();
1225             }
1226         }