17 * 2 along with this work; if not, write to the Free Software Foundation,
18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 *
20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 * or visit www.oracle.com if you need additional information or have any
22 * questions.
23 */
24
25 /*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/publicdomain/zero/1.0/
34 */
35
36 package java.util.concurrent;
37 import java.util.concurrent.atomic.*;
38 import java.util.concurrent.locks.*;
39 import java.util.*;
40
41 /**
42 * A {@link ThreadPoolExecutor} that can additionally schedule
43 * commands to run after a given delay, or to execute
44 * periodically. This class is preferable to {@link java.util.Timer}
45 * when multiple worker threads are needed, or when the additional
46 * flexibility or capabilities of {@link ThreadPoolExecutor} (which
47 * this class extends) are required.
48 *
49 * <p>Delayed tasks execute no sooner than they are enabled, but
50 * without any real-time guarantees about when, after they are
51 * enabled, they will commence. Tasks scheduled for exactly the same
52 * execution time are enabled in first-in-first-out (FIFO) order of
53 * submission.
54 *
55 * <p>When a submitted task is cancelled before it is run, execution
56 * is suppressed. By default, such a cancelled task is not
57 * automatically removed from the work queue until its delay
58 * elapses. While this enables further inspection and monitoring, it
149
150 /**
151 * False if should cancel/suppress periodic tasks on shutdown.
152 */
153 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
154
155 /**
156 * False if should cancel non-periodic tasks on shutdown.
157 */
158 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
159
160 /**
161 * True if ScheduledFutureTask.cancel should remove from queue
162 */
163 private volatile boolean removeOnCancel = false;
164
165 /**
166 * Sequence number to break scheduling ties, and in turn to
167 * guarantee FIFO order among tied entries.
168 */
169 private static final AtomicLong sequencer = new AtomicLong(0);
170
171 /**
172 * Returns current nanosecond time.
173 */
174 final long now() {
175 return System.nanoTime();
176 }
177
178 private class ScheduledFutureTask<V>
179 extends FutureTask<V> implements RunnableScheduledFuture<V> {
180
181 /** Sequence number to break ties FIFO */
182 private final long sequenceNumber;
183
184 /** The time the task is enabled to execute in nanoTime units */
185 private long time;
186
187 /**
188 * Period in nanoseconds for repeating tasks. A positive
189 * value indicates fixed-rate execution. A negative value
214 * Creates a periodic action with given nano time and period.
215 */
216 ScheduledFutureTask(Runnable r, V result, long ns, long period) {
217 super(r, result);
218 this.time = ns;
219 this.period = period;
220 this.sequenceNumber = sequencer.getAndIncrement();
221 }
222
223 /**
224 * Creates a one-shot action with given nanoTime-based trigger.
225 */
226 ScheduledFutureTask(Callable<V> callable, long ns) {
227 super(callable);
228 this.time = ns;
229 this.period = 0;
230 this.sequenceNumber = sequencer.getAndIncrement();
231 }
232
233 public long getDelay(TimeUnit unit) {
234 return unit.convert(time - now(), TimeUnit.NANOSECONDS);
235 }
236
237 public int compareTo(Delayed other) {
238 if (other == this) // compare zero ONLY if same object
239 return 0;
240 if (other instanceof ScheduledFutureTask) {
241 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
242 long diff = time - x.time;
243 if (diff < 0)
244 return -1;
245 else if (diff > 0)
246 return 1;
247 else if (sequenceNumber < x.sequenceNumber)
248 return -1;
249 else
250 return 1;
251 }
252 long d = (getDelay(TimeUnit.NANOSECONDS) -
253 other.getDelay(TimeUnit.NANOSECONDS));
254 return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
255 }
256
257 /**
258 * Returns true if this is a periodic (not a one-shot) action.
259 *
260 * @return true if periodic
261 */
262 public boolean isPeriodic() {
263 return period != 0;
264 }
265
266 /**
267 * Sets the next time to run for a periodic task.
268 */
269 private void setNextRunTime() {
270 long p = period;
271 if (p > 0)
272 time += p;
273 else
407 *
408 * @param callable the submitted Callable
409 * @param task the task created to execute the callable
410 * @return a task that can execute the callable
411 * @since 1.6
412 */
413 protected <V> RunnableScheduledFuture<V> decorateTask(
414 Callable<V> callable, RunnableScheduledFuture<V> task) {
415 return task;
416 }
417
418 /**
419 * Creates a new {@code ScheduledThreadPoolExecutor} with the
420 * given core pool size.
421 *
422 * @param corePoolSize the number of threads to keep in the pool, even
423 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
424 * @throws IllegalArgumentException if {@code corePoolSize < 0}
425 */
426 public ScheduledThreadPoolExecutor(int corePoolSize) {
427 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
428 new DelayedWorkQueue());
429 }
430
431 /**
432 * Creates a new {@code ScheduledThreadPoolExecutor} with the
433 * given initial parameters.
434 *
435 * @param corePoolSize the number of threads to keep in the pool, even
436 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
437 * @param threadFactory the factory to use when the executor
438 * creates a new thread
439 * @throws IllegalArgumentException if {@code corePoolSize < 0}
440 * @throws NullPointerException if {@code threadFactory} is null
441 */
442 public ScheduledThreadPoolExecutor(int corePoolSize,
443 ThreadFactory threadFactory) {
444 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
445 new DelayedWorkQueue(), threadFactory);
446 }
447
448 /**
449 * Creates a new ScheduledThreadPoolExecutor with the given
450 * initial parameters.
451 *
452 * @param corePoolSize the number of threads to keep in the pool, even
453 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
454 * @param handler the handler to use when execution is blocked
455 * because the thread bounds and queue capacities are reached
456 * @throws IllegalArgumentException if {@code corePoolSize < 0}
457 * @throws NullPointerException if {@code handler} is null
458 */
459 public ScheduledThreadPoolExecutor(int corePoolSize,
460 RejectedExecutionHandler handler) {
461 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
462 new DelayedWorkQueue(), handler);
463 }
464
465 /**
466 * Creates a new ScheduledThreadPoolExecutor with the given
467 * initial parameters.
468 *
469 * @param corePoolSize the number of threads to keep in the pool, even
470 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
471 * @param threadFactory the factory to use when the executor
472 * creates a new thread
473 * @param handler the handler to use when execution is blocked
474 * because the thread bounds and queue capacities are reached
475 * @throws IllegalArgumentException if {@code corePoolSize < 0}
476 * @throws NullPointerException if {@code threadFactory} or
477 * {@code handler} is null
478 */
479 public ScheduledThreadPoolExecutor(int corePoolSize,
480 ThreadFactory threadFactory,
481 RejectedExecutionHandler handler) {
482 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
483 new DelayedWorkQueue(), threadFactory, handler);
484 }
485
486 /**
487 * Returns the trigger time of a delayed action.
488 */
489 private long triggerTime(long delay, TimeUnit unit) {
490 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
491 }
492
493 /**
494 * Returns the trigger time of a delayed action.
495 */
496 long triggerTime(long delay) {
497 return now() +
498 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
499 }
500
501 /**
502 * Constrains the values of all delays in the queue to be within
503 * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
504 * This may occur if a task is eligible to be dequeued, but has
505 * not yet been, while some other task is added with a delay of
506 * Long.MAX_VALUE.
507 */
508 private long overflowFree(long delay) {
509 Delayed head = (Delayed) super.getQueue().peek();
510 if (head != null) {
511 long headDelay = head.getDelay(TimeUnit.NANOSECONDS);
512 if (headDelay < 0 && (delay - headDelay < 0))
513 delay = Long.MAX_VALUE + headDelay;
514 }
515 return delay;
516 }
517
518 /**
519 * @throws RejectedExecutionException {@inheritDoc}
520 * @throws NullPointerException {@inheritDoc}
521 */
522 public ScheduledFuture<?> schedule(Runnable command,
523 long delay,
524 TimeUnit unit) {
525 if (command == null || unit == null)
526 throw new NullPointerException();
527 RunnableScheduledFuture<?> t = decorateTask(command,
528 new ScheduledFutureTask<Void>(command, null,
529 triggerTime(delay, unit)));
530 delayedExecute(t);
531 return t;
599 * Executes {@code command} with zero required delay.
600 * This has effect equivalent to
601 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
602 * Note that inspections of the queue and of the list returned by
603 * {@code shutdownNow} will access the zero-delayed
604 * {@link ScheduledFuture}, not the {@code command} itself.
605 *
606 * <p>A consequence of the use of {@code ScheduledFuture} objects is
607 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
608 * called with a null second {@code Throwable} argument, even if the
609 * {@code command} terminated abruptly. Instead, the {@code Throwable}
610 * thrown by such a task can be obtained via {@link Future#get}.
611 *
612 * @throws RejectedExecutionException at discretion of
613 * {@code RejectedExecutionHandler}, if the task
614 * cannot be accepted for execution because the
615 * executor has been shut down
616 * @throws NullPointerException {@inheritDoc}
617 */
618 public void execute(Runnable command) {
619 schedule(command, 0, TimeUnit.NANOSECONDS);
620 }
621
622 // Override AbstractExecutorService methods
623
624 /**
625 * @throws RejectedExecutionException {@inheritDoc}
626 * @throws NullPointerException {@inheritDoc}
627 */
628 public Future<?> submit(Runnable task) {
629 return schedule(task, 0, TimeUnit.NANOSECONDS);
630 }
631
632 /**
633 * @throws RejectedExecutionException {@inheritDoc}
634 * @throws NullPointerException {@inheritDoc}
635 */
636 public <T> Future<T> submit(Runnable task, T result) {
637 return schedule(Executors.callable(task, result),
638 0, TimeUnit.NANOSECONDS);
639 }
640
641 /**
642 * @throws RejectedExecutionException {@inheritDoc}
643 * @throws NullPointerException {@inheritDoc}
644 */
645 public <T> Future<T> submit(Callable<T> task) {
646 return schedule(task, 0, TimeUnit.NANOSECONDS);
647 }
648
649 /**
650 * Sets the policy on whether to continue executing existing
651 * periodic tasks even when this executor has been {@code shutdown}.
652 * In this case, these tasks will only terminate upon
653 * {@code shutdownNow} or after setting the policy to
654 * {@code false} when already shutdown.
655 * This value is by default {@code false}.
656 *
657 * @param value if {@code true}, continue after shutdown, else don't.
658 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
659 */
660 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
661 continueExistingPeriodicTasksAfterShutdown = value;
662 if (!value && isShutdown())
663 onShutdown();
664 }
665
666 /**
814 * heap array. This eliminates the need to find a task upon
815 * cancellation, greatly speeding up removal (down from O(n)
816 * to O(log n)), and reducing garbage retention that would
817 * otherwise occur by waiting for the element to rise to top
818 * before clearing. But because the queue may also hold
819 * RunnableScheduledFutures that are not ScheduledFutureTasks,
820 * we are not guaranteed to have such indices available, in
821 * which case we fall back to linear search. (We expect that
822 * most tasks will not be decorated, and that the faster cases
823 * will be much more common.)
824 *
825 * All heap operations must record index changes -- mainly
826 * within siftUp and siftDown. Upon removal, a task's
827 * heapIndex is set to -1. Note that ScheduledFutureTasks can
828 * appear at most once in the queue (this need not be true for
829 * other kinds of tasks or work queues), so are uniquely
830 * identified by heapIndex.
831 */
832
833 private static final int INITIAL_CAPACITY = 16;
834 private RunnableScheduledFuture[] queue =
835 new RunnableScheduledFuture[INITIAL_CAPACITY];
836 private final ReentrantLock lock = new ReentrantLock();
837 private int size = 0;
838
839 /**
840 * Thread designated to wait for the task at the head of the
841 * queue. This variant of the Leader-Follower pattern
842 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
843 * minimize unnecessary timed waiting. When a thread becomes
844 * the leader, it waits only for the next delay to elapse, but
845 * other threads await indefinitely. The leader thread must
846 * signal some other thread before returning from take() or
847 * poll(...), unless some other thread becomes leader in the
848 * interim. Whenever the head of the queue is replaced with a
849 * task with an earlier expiration time, the leader field is
850 * invalidated by being reset to null, and some waiting
851 * thread, but not necessarily the current leader, is
852 * signalled. So waiting threads must be prepared to acquire
853 * and lose leadership while waiting.
854 */
855 private Thread leader = null;
856
857 /**
858 * Condition signalled when a newer task becomes available at the
859 * head of the queue or a new thread may need to become leader.
860 */
861 private final Condition available = lock.newCondition();
862
863 /**
864 * Set f's heapIndex if it is a ScheduledFutureTask.
865 */
866 private void setIndex(RunnableScheduledFuture f, int idx) {
867 if (f instanceof ScheduledFutureTask)
868 ((ScheduledFutureTask)f).heapIndex = idx;
869 }
870
871 /**
872 * Sift element added at bottom up to its heap-ordered spot.
873 * Call only when holding lock.
874 */
875 private void siftUp(int k, RunnableScheduledFuture key) {
876 while (k > 0) {
877 int parent = (k - 1) >>> 1;
878 RunnableScheduledFuture e = queue[parent];
879 if (key.compareTo(e) >= 0)
880 break;
881 queue[k] = e;
882 setIndex(e, k);
883 k = parent;
884 }
885 queue[k] = key;
886 setIndex(key, k);
887 }
888
889 /**
890 * Sift element added at top down to its heap-ordered spot.
891 * Call only when holding lock.
892 */
893 private void siftDown(int k, RunnableScheduledFuture key) {
894 int half = size >>> 1;
895 while (k < half) {
896 int child = (k << 1) + 1;
897 RunnableScheduledFuture c = queue[child];
898 int right = child + 1;
899 if (right < size && c.compareTo(queue[right]) > 0)
900 c = queue[child = right];
901 if (key.compareTo(c) <= 0)
902 break;
903 queue[k] = c;
904 setIndex(c, k);
905 k = child;
906 }
907 queue[k] = key;
908 setIndex(key, k);
909 }
910
911 /**
912 * Resize the heap array. Call only when holding lock.
913 */
914 private void grow() {
915 int oldCapacity = queue.length;
916 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
917 if (newCapacity < 0) // overflow
942 public boolean contains(Object x) {
943 final ReentrantLock lock = this.lock;
944 lock.lock();
945 try {
946 return indexOf(x) != -1;
947 } finally {
948 lock.unlock();
949 }
950 }
951
952 public boolean remove(Object x) {
953 final ReentrantLock lock = this.lock;
954 lock.lock();
955 try {
956 int i = indexOf(x);
957 if (i < 0)
958 return false;
959
960 setIndex(queue[i], -1);
961 int s = --size;
962 RunnableScheduledFuture replacement = queue[s];
963 queue[s] = null;
964 if (s != i) {
965 siftDown(i, replacement);
966 if (queue[i] == replacement)
967 siftUp(i, replacement);
968 }
969 return true;
970 } finally {
971 lock.unlock();
972 }
973 }
974
975 public int size() {
976 final ReentrantLock lock = this.lock;
977 lock.lock();
978 try {
979 return size;
980 } finally {
981 lock.unlock();
982 }
983 }
984
985 public boolean isEmpty() {
986 return size() == 0;
987 }
988
989 public int remainingCapacity() {
990 return Integer.MAX_VALUE;
991 }
992
993 public RunnableScheduledFuture peek() {
994 final ReentrantLock lock = this.lock;
995 lock.lock();
996 try {
997 return queue[0];
998 } finally {
999 lock.unlock();
1000 }
1001 }
1002
1003 public boolean offer(Runnable x) {
1004 if (x == null)
1005 throw new NullPointerException();
1006 RunnableScheduledFuture e = (RunnableScheduledFuture)x;
1007 final ReentrantLock lock = this.lock;
1008 lock.lock();
1009 try {
1010 int i = size;
1011 if (i >= queue.length)
1012 grow();
1013 size = i + 1;
1014 if (i == 0) {
1015 queue[0] = e;
1016 setIndex(e, 0);
1017 } else {
1018 siftUp(i, e);
1019 }
1020 if (queue[0] == e) {
1021 leader = null;
1022 available.signal();
1023 }
1024 } finally {
1025 lock.unlock();
1026 }
1028 }
1029
1030 public void put(Runnable e) {
1031 offer(e);
1032 }
1033
1034 public boolean add(Runnable e) {
1035 return offer(e);
1036 }
1037
1038 public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1039 return offer(e);
1040 }
1041
1042 /**
1043 * Performs common bookkeeping for poll and take: Replaces
1044 * first element with last and sifts it down. Call only when
1045 * holding lock.
1046 * @param f the task to remove and return
1047 */
1048 private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
1049 int s = --size;
1050 RunnableScheduledFuture x = queue[s];
1051 queue[s] = null;
1052 if (s != 0)
1053 siftDown(0, x);
1054 setIndex(f, -1);
1055 return f;
1056 }
1057
1058 public RunnableScheduledFuture poll() {
1059 final ReentrantLock lock = this.lock;
1060 lock.lock();
1061 try {
1062 RunnableScheduledFuture first = queue[0];
1063 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
1064 return null;
1065 else
1066 return finishPoll(first);
1067 } finally {
1068 lock.unlock();
1069 }
1070 }
1071
1072 public RunnableScheduledFuture take() throws InterruptedException {
1073 final ReentrantLock lock = this.lock;
1074 lock.lockInterruptibly();
1075 try {
1076 for (;;) {
1077 RunnableScheduledFuture first = queue[0];
1078 if (first == null)
1079 available.await();
1080 else {
1081 long delay = first.getDelay(TimeUnit.NANOSECONDS);
1082 if (delay <= 0)
1083 return finishPoll(first);
1084 else if (leader != null)
1085 available.await();
1086 else {
1087 Thread thisThread = Thread.currentThread();
1088 leader = thisThread;
1089 try {
1090 available.awaitNanos(delay);
1091 } finally {
1092 if (leader == thisThread)
1093 leader = null;
1094 }
1095 }
1096 }
1097 }
1098 } finally {
1099 if (leader == null && queue[0] != null)
1100 available.signal();
1101 lock.unlock();
1102 }
1103 }
1104
1105 public RunnableScheduledFuture poll(long timeout, TimeUnit unit)
1106 throws InterruptedException {
1107 long nanos = unit.toNanos(timeout);
1108 final ReentrantLock lock = this.lock;
1109 lock.lockInterruptibly();
1110 try {
1111 for (;;) {
1112 RunnableScheduledFuture first = queue[0];
1113 if (first == null) {
1114 if (nanos <= 0)
1115 return null;
1116 else
1117 nanos = available.awaitNanos(nanos);
1118 } else {
1119 long delay = first.getDelay(TimeUnit.NANOSECONDS);
1120 if (delay <= 0)
1121 return finishPoll(first);
1122 if (nanos <= 0)
1123 return null;
1124 if (nanos < delay || leader != null)
1125 nanos = available.awaitNanos(nanos);
1126 else {
1127 Thread thisThread = Thread.currentThread();
1128 leader = thisThread;
1129 try {
1130 long timeLeft = available.awaitNanos(delay);
1131 nanos -= delay - timeLeft;
1132 } finally {
1133 if (leader == thisThread)
1134 leader = null;
1135 }
1136 }
1137 }
1138 }
1139 } finally {
1140 if (leader == null && queue[0] != null)
1141 available.signal();
1142 lock.unlock();
1143 }
1144 }
1145
1146 public void clear() {
1147 final ReentrantLock lock = this.lock;
1148 lock.lock();
1149 try {
1150 for (int i = 0; i < size; i++) {
1151 RunnableScheduledFuture t = queue[i];
1152 if (t != null) {
1153 queue[i] = null;
1154 setIndex(t, -1);
1155 }
1156 }
1157 size = 0;
1158 } finally {
1159 lock.unlock();
1160 }
1161 }
1162
1163 /**
1164 * Return and remove first element only if it is expired.
1165 * Used only by drainTo. Call only when holding lock.
1166 */
1167 private RunnableScheduledFuture pollExpired() {
1168 RunnableScheduledFuture first = queue[0];
1169 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
1170 return null;
1171 return finishPoll(first);
1172 }
1173
1174 public int drainTo(Collection<? super Runnable> c) {
1175 if (c == null)
1176 throw new NullPointerException();
1177 if (c == this)
1178 throw new IllegalArgumentException();
1179 final ReentrantLock lock = this.lock;
1180 lock.lock();
1181 try {
1182 RunnableScheduledFuture first;
1183 int n = 0;
1184 while ((first = pollExpired()) != null) {
1185 c.add(first);
1186 ++n;
1187 }
1188 return n;
1189 } finally {
1190 lock.unlock();
1191 }
1192 }
1193
1194 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1195 if (c == null)
1196 throw new NullPointerException();
1197 if (c == this)
1198 throw new IllegalArgumentException();
1199 if (maxElements <= 0)
1200 return 0;
1201 final ReentrantLock lock = this.lock;
1202 lock.lock();
1203 try {
1204 RunnableScheduledFuture first;
1205 int n = 0;
1206 while (n < maxElements && (first = pollExpired()) != null) {
1207 c.add(first);
1208 ++n;
1209 }
1210 return n;
1211 } finally {
1212 lock.unlock();
1213 }
1214 }
1215
1216 public Object[] toArray() {
1217 final ReentrantLock lock = this.lock;
1218 lock.lock();
1219 try {
1220 return Arrays.copyOf(queue, size, Object[].class);
1221 } finally {
1222 lock.unlock();
1223 }
1224 }
|
17 * 2 along with this work; if not, write to the Free Software Foundation,
18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 *
20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 * or visit www.oracle.com if you need additional information or have any
22 * questions.
23 */
24
25 /*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/publicdomain/zero/1.0/
34 */
35
36 package java.util.concurrent;
37 import static java.util.concurrent.TimeUnit.NANOSECONDS;
38 import java.util.concurrent.atomic.AtomicLong;
39 import java.util.concurrent.locks.Condition;
40 import java.util.concurrent.locks.ReentrantLock;
41 import java.util.*;
42
43 /**
44 * A {@link ThreadPoolExecutor} that can additionally schedule
45 * commands to run after a given delay, or to execute
46 * periodically. This class is preferable to {@link java.util.Timer}
47 * when multiple worker threads are needed, or when the additional
48 * flexibility or capabilities of {@link ThreadPoolExecutor} (which
49 * this class extends) are required.
50 *
51 * <p>Delayed tasks execute no sooner than they are enabled, but
52 * without any real-time guarantees about when, after they are
53 * enabled, they will commence. Tasks scheduled for exactly the same
54 * execution time are enabled in first-in-first-out (FIFO) order of
55 * submission.
56 *
57 * <p>When a submitted task is cancelled before it is run, execution
58 * is suppressed. By default, such a cancelled task is not
59 * automatically removed from the work queue until its delay
60 * elapses. While this enables further inspection and monitoring, it
151
152 /**
153 * False if should cancel/suppress periodic tasks on shutdown.
154 */
155 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
156
157 /**
158 * False if should cancel non-periodic tasks on shutdown.
159 */
160 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
161
162 /**
163 * True if ScheduledFutureTask.cancel should remove from queue
164 */
165 private volatile boolean removeOnCancel = false;
166
167 /**
168 * Sequence number to break scheduling ties, and in turn to
169 * guarantee FIFO order among tied entries.
170 */
171 private static final AtomicLong sequencer = new AtomicLong();
172
173 /**
174 * Returns current nanosecond time.
175 */
176 final long now() {
177 return System.nanoTime();
178 }
179
180 private class ScheduledFutureTask<V>
181 extends FutureTask<V> implements RunnableScheduledFuture<V> {
182
183 /** Sequence number to break ties FIFO */
184 private final long sequenceNumber;
185
186 /** The time the task is enabled to execute in nanoTime units */
187 private long time;
188
189 /**
190 * Period in nanoseconds for repeating tasks. A positive
191 * value indicates fixed-rate execution. A negative value
216 * Creates a periodic action with given nano time and period.
217 */
218 ScheduledFutureTask(Runnable r, V result, long ns, long period) {
219 super(r, result);
220 this.time = ns;
221 this.period = period;
222 this.sequenceNumber = sequencer.getAndIncrement();
223 }
224
225 /**
226 * Creates a one-shot action with given nanoTime-based trigger.
227 */
228 ScheduledFutureTask(Callable<V> callable, long ns) {
229 super(callable);
230 this.time = ns;
231 this.period = 0;
232 this.sequenceNumber = sequencer.getAndIncrement();
233 }
234
235 public long getDelay(TimeUnit unit) {
236 return unit.convert(time - now(), NANOSECONDS);
237 }
238
239 public int compareTo(Delayed other) {
240 if (other == this) // compare zero ONLY if same object
241 return 0;
242 if (other instanceof ScheduledFutureTask) {
243 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
244 long diff = time - x.time;
245 if (diff < 0)
246 return -1;
247 else if (diff > 0)
248 return 1;
249 else if (sequenceNumber < x.sequenceNumber)
250 return -1;
251 else
252 return 1;
253 }
254 long d = (getDelay(NANOSECONDS) -
255 other.getDelay(NANOSECONDS));
256 return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
257 }
258
259 /**
260 * Returns true if this is a periodic (not a one-shot) action.
261 *
262 * @return true if periodic
263 */
264 public boolean isPeriodic() {
265 return period != 0;
266 }
267
268 /**
269 * Sets the next time to run for a periodic task.
270 */
271 private void setNextRunTime() {
272 long p = period;
273 if (p > 0)
274 time += p;
275 else
409 *
410 * @param callable the submitted Callable
411 * @param task the task created to execute the callable
412 * @return a task that can execute the callable
413 * @since 1.6
414 */
415 protected <V> RunnableScheduledFuture<V> decorateTask(
416 Callable<V> callable, RunnableScheduledFuture<V> task) {
417 return task;
418 }
419
420 /**
421 * Creates a new {@code ScheduledThreadPoolExecutor} with the
422 * given core pool size.
423 *
424 * @param corePoolSize the number of threads to keep in the pool, even
425 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
426 * @throws IllegalArgumentException if {@code corePoolSize < 0}
427 */
428 public ScheduledThreadPoolExecutor(int corePoolSize) {
429 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
430 new DelayedWorkQueue());
431 }
432
433 /**
434 * Creates a new {@code ScheduledThreadPoolExecutor} with the
435 * given initial parameters.
436 *
437 * @param corePoolSize the number of threads to keep in the pool, even
438 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
439 * @param threadFactory the factory to use when the executor
440 * creates a new thread
441 * @throws IllegalArgumentException if {@code corePoolSize < 0}
442 * @throws NullPointerException if {@code threadFactory} is null
443 */
444 public ScheduledThreadPoolExecutor(int corePoolSize,
445 ThreadFactory threadFactory) {
446 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
447 new DelayedWorkQueue(), threadFactory);
448 }
449
450 /**
451 * Creates a new ScheduledThreadPoolExecutor with the given
452 * initial parameters.
453 *
454 * @param corePoolSize the number of threads to keep in the pool, even
455 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
456 * @param handler the handler to use when execution is blocked
457 * because the thread bounds and queue capacities are reached
458 * @throws IllegalArgumentException if {@code corePoolSize < 0}
459 * @throws NullPointerException if {@code handler} is null
460 */
461 public ScheduledThreadPoolExecutor(int corePoolSize,
462 RejectedExecutionHandler handler) {
463 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
464 new DelayedWorkQueue(), handler);
465 }
466
467 /**
468 * Creates a new ScheduledThreadPoolExecutor with the given
469 * initial parameters.
470 *
471 * @param corePoolSize the number of threads to keep in the pool, even
472 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
473 * @param threadFactory the factory to use when the executor
474 * creates a new thread
475 * @param handler the handler to use when execution is blocked
476 * because the thread bounds and queue capacities are reached
477 * @throws IllegalArgumentException if {@code corePoolSize < 0}
478 * @throws NullPointerException if {@code threadFactory} or
479 * {@code handler} is null
480 */
481 public ScheduledThreadPoolExecutor(int corePoolSize,
482 ThreadFactory threadFactory,
483 RejectedExecutionHandler handler) {
484 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
485 new DelayedWorkQueue(), threadFactory, handler);
486 }
487
488 /**
489 * Returns the trigger time of a delayed action.
490 */
491 private long triggerTime(long delay, TimeUnit unit) {
492 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
493 }
494
495 /**
496 * Returns the trigger time of a delayed action.
497 */
498 long triggerTime(long delay) {
499 return now() +
500 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
501 }
502
503 /**
504 * Constrains the values of all delays in the queue to be within
505 * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
506 * This may occur if a task is eligible to be dequeued, but has
507 * not yet been, while some other task is added with a delay of
508 * Long.MAX_VALUE.
509 */
510 private long overflowFree(long delay) {
511 Delayed head = (Delayed) super.getQueue().peek();
512 if (head != null) {
513 long headDelay = head.getDelay(NANOSECONDS);
514 if (headDelay < 0 && (delay - headDelay < 0))
515 delay = Long.MAX_VALUE + headDelay;
516 }
517 return delay;
518 }
519
520 /**
521 * @throws RejectedExecutionException {@inheritDoc}
522 * @throws NullPointerException {@inheritDoc}
523 */
524 public ScheduledFuture<?> schedule(Runnable command,
525 long delay,
526 TimeUnit unit) {
527 if (command == null || unit == null)
528 throw new NullPointerException();
529 RunnableScheduledFuture<?> t = decorateTask(command,
530 new ScheduledFutureTask<Void>(command, null,
531 triggerTime(delay, unit)));
532 delayedExecute(t);
533 return t;
601 * Executes {@code command} with zero required delay.
602 * This has effect equivalent to
603 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
604 * Note that inspections of the queue and of the list returned by
605 * {@code shutdownNow} will access the zero-delayed
606 * {@link ScheduledFuture}, not the {@code command} itself.
607 *
608 * <p>A consequence of the use of {@code ScheduledFuture} objects is
609 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
610 * called with a null second {@code Throwable} argument, even if the
611 * {@code command} terminated abruptly. Instead, the {@code Throwable}
612 * thrown by such a task can be obtained via {@link Future#get}.
613 *
614 * @throws RejectedExecutionException at discretion of
615 * {@code RejectedExecutionHandler}, if the task
616 * cannot be accepted for execution because the
617 * executor has been shut down
618 * @throws NullPointerException {@inheritDoc}
619 */
620 public void execute(Runnable command) {
621 schedule(command, 0, NANOSECONDS);
622 }
623
624 // Override AbstractExecutorService methods
625
626 /**
627 * @throws RejectedExecutionException {@inheritDoc}
628 * @throws NullPointerException {@inheritDoc}
629 */
630 public Future<?> submit(Runnable task) {
631 return schedule(task, 0, NANOSECONDS);
632 }
633
634 /**
635 * @throws RejectedExecutionException {@inheritDoc}
636 * @throws NullPointerException {@inheritDoc}
637 */
638 public <T> Future<T> submit(Runnable task, T result) {
639 return schedule(Executors.callable(task, result), 0, NANOSECONDS);
640 }
641
642 /**
643 * @throws RejectedExecutionException {@inheritDoc}
644 * @throws NullPointerException {@inheritDoc}
645 */
646 public <T> Future<T> submit(Callable<T> task) {
647 return schedule(task, 0, NANOSECONDS);
648 }
649
650 /**
651 * Sets the policy on whether to continue executing existing
652 * periodic tasks even when this executor has been {@code shutdown}.
653 * In this case, these tasks will only terminate upon
654 * {@code shutdownNow} or after setting the policy to
655 * {@code false} when already shutdown.
656 * This value is by default {@code false}.
657 *
658 * @param value if {@code true}, continue after shutdown, else don't.
659 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
660 */
661 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
662 continueExistingPeriodicTasksAfterShutdown = value;
663 if (!value && isShutdown())
664 onShutdown();
665 }
666
667 /**
815 * heap array. This eliminates the need to find a task upon
816 * cancellation, greatly speeding up removal (down from O(n)
817 * to O(log n)), and reducing garbage retention that would
818 * otherwise occur by waiting for the element to rise to top
819 * before clearing. But because the queue may also hold
820 * RunnableScheduledFutures that are not ScheduledFutureTasks,
821 * we are not guaranteed to have such indices available, in
822 * which case we fall back to linear search. (We expect that
823 * most tasks will not be decorated, and that the faster cases
824 * will be much more common.)
825 *
826 * All heap operations must record index changes -- mainly
827 * within siftUp and siftDown. Upon removal, a task's
828 * heapIndex is set to -1. Note that ScheduledFutureTasks can
829 * appear at most once in the queue (this need not be true for
830 * other kinds of tasks or work queues), so are uniquely
831 * identified by heapIndex.
832 */
833
834 private static final int INITIAL_CAPACITY = 16;
835 private RunnableScheduledFuture<?>[] queue =
836 new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
837 private final ReentrantLock lock = new ReentrantLock();
838 private int size = 0;
839
840 /**
841 * Thread designated to wait for the task at the head of the
842 * queue. This variant of the Leader-Follower pattern
843 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
844 * minimize unnecessary timed waiting. When a thread becomes
845 * the leader, it waits only for the next delay to elapse, but
846 * other threads await indefinitely. The leader thread must
847 * signal some other thread before returning from take() or
848 * poll(...), unless some other thread becomes leader in the
849 * interim. Whenever the head of the queue is replaced with a
850 * task with an earlier expiration time, the leader field is
851 * invalidated by being reset to null, and some waiting
852 * thread, but not necessarily the current leader, is
853 * signalled. So waiting threads must be prepared to acquire
854 * and lose leadership while waiting.
855 */
856 private Thread leader = null;
857
858 /**
859 * Condition signalled when a newer task becomes available at the
860 * head of the queue or a new thread may need to become leader.
861 */
862 private final Condition available = lock.newCondition();
863
864 /**
865 * Set f's heapIndex if it is a ScheduledFutureTask.
866 */
867 private void setIndex(RunnableScheduledFuture<?> f, int idx) {
868 if (f instanceof ScheduledFutureTask)
869 ((ScheduledFutureTask)f).heapIndex = idx;
870 }
871
872 /**
873 * Sift element added at bottom up to its heap-ordered spot.
874 * Call only when holding lock.
875 */
876 private void siftUp(int k, RunnableScheduledFuture<?> key) {
877 while (k > 0) {
878 int parent = (k - 1) >>> 1;
879 RunnableScheduledFuture<?> e = queue[parent];
880 if (key.compareTo(e) >= 0)
881 break;
882 queue[k] = e;
883 setIndex(e, k);
884 k = parent;
885 }
886 queue[k] = key;
887 setIndex(key, k);
888 }
889
890 /**
891 * Sift element added at top down to its heap-ordered spot.
892 * Call only when holding lock.
893 */
894 private void siftDown(int k, RunnableScheduledFuture<?> key) {
895 int half = size >>> 1;
896 while (k < half) {
897 int child = (k << 1) + 1;
898 RunnableScheduledFuture<?> c = queue[child];
899 int right = child + 1;
900 if (right < size && c.compareTo(queue[right]) > 0)
901 c = queue[child = right];
902 if (key.compareTo(c) <= 0)
903 break;
904 queue[k] = c;
905 setIndex(c, k);
906 k = child;
907 }
908 queue[k] = key;
909 setIndex(key, k);
910 }
911
912 /**
913 * Resize the heap array. Call only when holding lock.
914 */
915 private void grow() {
916 int oldCapacity = queue.length;
917 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
918 if (newCapacity < 0) // overflow
943 public boolean contains(Object x) {
944 final ReentrantLock lock = this.lock;
945 lock.lock();
946 try {
947 return indexOf(x) != -1;
948 } finally {
949 lock.unlock();
950 }
951 }
952
953 public boolean remove(Object x) {
954 final ReentrantLock lock = this.lock;
955 lock.lock();
956 try {
957 int i = indexOf(x);
958 if (i < 0)
959 return false;
960
961 setIndex(queue[i], -1);
962 int s = --size;
963 RunnableScheduledFuture<?> replacement = queue[s];
964 queue[s] = null;
965 if (s != i) {
966 siftDown(i, replacement);
967 if (queue[i] == replacement)
968 siftUp(i, replacement);
969 }
970 return true;
971 } finally {
972 lock.unlock();
973 }
974 }
975
976 public int size() {
977 final ReentrantLock lock = this.lock;
978 lock.lock();
979 try {
980 return size;
981 } finally {
982 lock.unlock();
983 }
984 }
985
986 public boolean isEmpty() {
987 return size() == 0;
988 }
989
990 public int remainingCapacity() {
991 return Integer.MAX_VALUE;
992 }
993
994 public RunnableScheduledFuture<?> peek() {
995 final ReentrantLock lock = this.lock;
996 lock.lock();
997 try {
998 return queue[0];
999 } finally {
1000 lock.unlock();
1001 }
1002 }
1003
1004 public boolean offer(Runnable x) {
1005 if (x == null)
1006 throw new NullPointerException();
1007 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
1008 final ReentrantLock lock = this.lock;
1009 lock.lock();
1010 try {
1011 int i = size;
1012 if (i >= queue.length)
1013 grow();
1014 size = i + 1;
1015 if (i == 0) {
1016 queue[0] = e;
1017 setIndex(e, 0);
1018 } else {
1019 siftUp(i, e);
1020 }
1021 if (queue[0] == e) {
1022 leader = null;
1023 available.signal();
1024 }
1025 } finally {
1026 lock.unlock();
1027 }
1029 }
1030
1031 public void put(Runnable e) {
1032 offer(e);
1033 }
1034
1035 public boolean add(Runnable e) {
1036 return offer(e);
1037 }
1038
1039 public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1040 return offer(e);
1041 }
1042
1043 /**
1044 * Performs common bookkeeping for poll and take: Replaces
1045 * first element with last and sifts it down. Call only when
1046 * holding lock.
1047 * @param f the task to remove and return
1048 */
1049 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1050 int s = --size;
1051 RunnableScheduledFuture<?> x = queue[s];
1052 queue[s] = null;
1053 if (s != 0)
1054 siftDown(0, x);
1055 setIndex(f, -1);
1056 return f;
1057 }
1058
1059 public RunnableScheduledFuture<?> poll() {
1060 final ReentrantLock lock = this.lock;
1061 lock.lock();
1062 try {
1063 RunnableScheduledFuture<?> first = queue[0];
1064 if (first == null || first.getDelay(NANOSECONDS) > 0)
1065 return null;
1066 else
1067 return finishPoll(first);
1068 } finally {
1069 lock.unlock();
1070 }
1071 }
1072
1073 public RunnableScheduledFuture<?> take() throws InterruptedException {
1074 final ReentrantLock lock = this.lock;
1075 lock.lockInterruptibly();
1076 try {
1077 for (;;) {
1078 RunnableScheduledFuture<?> first = queue[0];
1079 if (first == null)
1080 available.await();
1081 else {
1082 long delay = first.getDelay(NANOSECONDS);
1083 if (delay <= 0)
1084 return finishPoll(first);
1085 else if (leader != null)
1086 available.await();
1087 else {
1088 Thread thisThread = Thread.currentThread();
1089 leader = thisThread;
1090 try {
1091 available.awaitNanos(delay);
1092 } finally {
1093 if (leader == thisThread)
1094 leader = null;
1095 }
1096 }
1097 }
1098 }
1099 } finally {
1100 if (leader == null && queue[0] != null)
1101 available.signal();
1102 lock.unlock();
1103 }
1104 }
1105
1106 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1107 throws InterruptedException {
1108 long nanos = unit.toNanos(timeout);
1109 final ReentrantLock lock = this.lock;
1110 lock.lockInterruptibly();
1111 try {
1112 for (;;) {
1113 RunnableScheduledFuture<?> first = queue[0];
1114 if (first == null) {
1115 if (nanos <= 0)
1116 return null;
1117 else
1118 nanos = available.awaitNanos(nanos);
1119 } else {
1120 long delay = first.getDelay(NANOSECONDS);
1121 if (delay <= 0)
1122 return finishPoll(first);
1123 if (nanos <= 0)
1124 return null;
1125 if (nanos < delay || leader != null)
1126 nanos = available.awaitNanos(nanos);
1127 else {
1128 Thread thisThread = Thread.currentThread();
1129 leader = thisThread;
1130 try {
1131 long timeLeft = available.awaitNanos(delay);
1132 nanos -= delay - timeLeft;
1133 } finally {
1134 if (leader == thisThread)
1135 leader = null;
1136 }
1137 }
1138 }
1139 }
1140 } finally {
1141 if (leader == null && queue[0] != null)
1142 available.signal();
1143 lock.unlock();
1144 }
1145 }
1146
1147 public void clear() {
1148 final ReentrantLock lock = this.lock;
1149 lock.lock();
1150 try {
1151 for (int i = 0; i < size; i++) {
1152 RunnableScheduledFuture<?> t = queue[i];
1153 if (t != null) {
1154 queue[i] = null;
1155 setIndex(t, -1);
1156 }
1157 }
1158 size = 0;
1159 } finally {
1160 lock.unlock();
1161 }
1162 }
1163
1164 /**
1165 * Return and remove first element only if it is expired.
1166 * Used only by drainTo. Call only when holding lock.
1167 */
1168 private RunnableScheduledFuture<?> pollExpired() {
1169 // assert lock.isHeldByCurrentThread();
1170 RunnableScheduledFuture<?> first = queue[0];
1171 if (first == null || first.getDelay(NANOSECONDS) > 0)
1172 return null;
1173 return finishPoll(first);
1174 }
1175
1176 public int drainTo(Collection<? super Runnable> c) {
1177 if (c == null)
1178 throw new NullPointerException();
1179 if (c == this)
1180 throw new IllegalArgumentException();
1181 final ReentrantLock lock = this.lock;
1182 lock.lock();
1183 try {
1184 RunnableScheduledFuture<?> first;
1185 int n = 0;
1186 while ((first = pollExpired()) != null) {
1187 c.add(first);
1188 ++n;
1189 }
1190 return n;
1191 } finally {
1192 lock.unlock();
1193 }
1194 }
1195
1196 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1197 if (c == null)
1198 throw new NullPointerException();
1199 if (c == this)
1200 throw new IllegalArgumentException();
1201 if (maxElements <= 0)
1202 return 0;
1203 final ReentrantLock lock = this.lock;
1204 lock.lock();
1205 try {
1206 RunnableScheduledFuture<?> first;
1207 int n = 0;
1208 while (n < maxElements && (first = pollExpired()) != null) {
1209 c.add(first);
1210 ++n;
1211 }
1212 return n;
1213 } finally {
1214 lock.unlock();
1215 }
1216 }
1217
1218 public Object[] toArray() {
1219 final ReentrantLock lock = this.lock;
1220 lock.lock();
1221 try {
1222 return Arrays.copyOf(queue, size, Object[].class);
1223 } finally {
1224 lock.unlock();
1225 }
1226 }
|