src/share/classes/java/util/concurrent/ScheduledThreadPoolExecutor.java

Print this page

        

*** 32,43 **** * Expert Group and released to the public domain, as explained at * http://creativecommons.org/publicdomain/zero/1.0/ */ package java.util.concurrent; ! import java.util.concurrent.atomic.*; ! import java.util.concurrent.locks.*; import java.util.*; /** * A {@link ThreadPoolExecutor} that can additionally schedule * commands to run after a given delay, or to execute --- 32,45 ---- * Expert Group and released to the public domain, as explained at * http://creativecommons.org/publicdomain/zero/1.0/ */ package java.util.concurrent; ! import static java.util.concurrent.TimeUnit.NANOSECONDS; ! import java.util.concurrent.atomic.AtomicLong; ! import java.util.concurrent.locks.Condition; ! import java.util.concurrent.locks.ReentrantLock; import java.util.*; /** * A {@link ThreadPoolExecutor} that can additionally schedule * commands to run after a given delay, or to execute
*** 164,174 **** /** * Sequence number to break scheduling ties, and in turn to * guarantee FIFO order among tied entries. */ ! private static final AtomicLong sequencer = new AtomicLong(0); /** * Returns current nanosecond time. */ final long now() { --- 166,176 ---- /** * Sequence number to break scheduling ties, and in turn to * guarantee FIFO order among tied entries. */ ! private static final AtomicLong sequencer = new AtomicLong(); /** * Returns current nanosecond time. */ final long now() {
*** 229,239 **** this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); } public long getDelay(TimeUnit unit) { ! return unit.convert(time - now(), TimeUnit.NANOSECONDS); } public int compareTo(Delayed other) { if (other == this) // compare zero ONLY if same object return 0; --- 231,241 ---- this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); } public long getDelay(TimeUnit unit) { ! return unit.convert(time - now(), NANOSECONDS); } public int compareTo(Delayed other) { if (other == this) // compare zero ONLY if same object return 0;
*** 247,258 **** else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } ! long d = (getDelay(TimeUnit.NANOSECONDS) - ! other.getDelay(TimeUnit.NANOSECONDS)); return (d == 0) ? 0 : ((d < 0) ? -1 : 1); } /** * Returns true if this is a periodic (not a one-shot) action. --- 249,260 ---- else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } ! long d = (getDelay(NANOSECONDS) - ! other.getDelay(NANOSECONDS)); return (d == 0) ? 0 : ((d < 0) ? -1 : 1); } /** * Returns true if this is a periodic (not a one-shot) action.
*** 422,432 **** * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @throws IllegalArgumentException if {@code corePoolSize < 0} */ public ScheduledThreadPoolExecutor(int corePoolSize) { ! super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue()); } /** * Creates a new {@code ScheduledThreadPoolExecutor} with the --- 424,434 ---- * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @throws IllegalArgumentException if {@code corePoolSize < 0} */ public ScheduledThreadPoolExecutor(int corePoolSize) { ! super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); } /** * Creates a new {@code ScheduledThreadPoolExecutor} with the
*** 439,449 **** * @throws IllegalArgumentException if {@code corePoolSize < 0} * @throws NullPointerException if {@code threadFactory} is null */ public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { ! super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue(), threadFactory); } /** * Creates a new ScheduledThreadPoolExecutor with the given --- 441,451 ---- * @throws IllegalArgumentException if {@code corePoolSize < 0} * @throws NullPointerException if {@code threadFactory} is null */ public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { ! super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory); } /** * Creates a new ScheduledThreadPoolExecutor with the given
*** 456,466 **** * @throws IllegalArgumentException if {@code corePoolSize < 0} * @throws NullPointerException if {@code handler} is null */ public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { ! super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue(), handler); } /** * Creates a new ScheduledThreadPoolExecutor with the given --- 458,468 ---- * @throws IllegalArgumentException if {@code corePoolSize < 0} * @throws NullPointerException if {@code handler} is null */ public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { ! super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler); } /** * Creates a new ScheduledThreadPoolExecutor with the given
*** 477,487 **** * {@code handler} is null */ public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { ! super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler); } /** * Returns the trigger time of a delayed action. --- 479,489 ---- * {@code handler} is null */ public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { ! super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler); } /** * Returns the trigger time of a delayed action.
*** 506,516 **** * Long.MAX_VALUE. */ private long overflowFree(long delay) { Delayed head = (Delayed) super.getQueue().peek(); if (head != null) { ! long headDelay = head.getDelay(TimeUnit.NANOSECONDS); if (headDelay < 0 && (delay - headDelay < 0)) delay = Long.MAX_VALUE + headDelay; } return delay; } --- 508,518 ---- * Long.MAX_VALUE. */ private long overflowFree(long delay) { Delayed head = (Delayed) super.getQueue().peek(); if (head != null) { ! long headDelay = head.getDelay(NANOSECONDS); if (headDelay < 0 && (delay - headDelay < 0)) delay = Long.MAX_VALUE + headDelay; } return delay; }
*** 614,651 **** * cannot be accepted for execution because the * executor has been shut down * @throws NullPointerException {@inheritDoc} */ public void execute(Runnable command) { ! schedule(command, 0, TimeUnit.NANOSECONDS); } // Override AbstractExecutorService methods /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public Future<?> submit(Runnable task) { ! return schedule(task, 0, TimeUnit.NANOSECONDS); } /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public <T> Future<T> submit(Runnable task, T result) { ! return schedule(Executors.callable(task, result), ! 0, TimeUnit.NANOSECONDS); } /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public <T> Future<T> submit(Callable<T> task) { ! return schedule(task, 0, TimeUnit.NANOSECONDS); } /** * Sets the policy on whether to continue executing existing * periodic tasks even when this executor has been {@code shutdown}. --- 616,652 ---- * cannot be accepted for execution because the * executor has been shut down * @throws NullPointerException {@inheritDoc} */ public void execute(Runnable command) { ! schedule(command, 0, NANOSECONDS); } // Override AbstractExecutorService methods /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public Future<?> submit(Runnable task) { ! return schedule(task, 0, NANOSECONDS); } /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public <T> Future<T> submit(Runnable task, T result) { ! return schedule(Executors.callable(task, result), 0, NANOSECONDS); } /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public <T> Future<T> submit(Callable<T> task) { ! return schedule(task, 0, NANOSECONDS); } /** * Sets the policy on whether to continue executing existing * periodic tasks even when this executor has been {@code shutdown}.
*** 829,840 **** * other kinds of tasks or work queues), so are uniquely * identified by heapIndex. */ private static final int INITIAL_CAPACITY = 16; ! private RunnableScheduledFuture[] queue = ! new RunnableScheduledFuture[INITIAL_CAPACITY]; private final ReentrantLock lock = new ReentrantLock(); private int size = 0; /** * Thread designated to wait for the task at the head of the --- 830,841 ---- * other kinds of tasks or work queues), so are uniquely * identified by heapIndex. */ private static final int INITIAL_CAPACITY = 16; ! private RunnableScheduledFuture<?>[] queue = ! new RunnableScheduledFuture<?>[INITIAL_CAPACITY]; private final ReentrantLock lock = new ReentrantLock(); private int size = 0; /** * Thread designated to wait for the task at the head of the
*** 861,883 **** private final Condition available = lock.newCondition(); /** * Set f's heapIndex if it is a ScheduledFutureTask. */ ! private void setIndex(RunnableScheduledFuture f, int idx) { if (f instanceof ScheduledFutureTask) ((ScheduledFutureTask)f).heapIndex = idx; } /** * Sift element added at bottom up to its heap-ordered spot. * Call only when holding lock. */ ! private void siftUp(int k, RunnableScheduledFuture key) { while (k > 0) { int parent = (k - 1) >>> 1; ! RunnableScheduledFuture e = queue[parent]; if (key.compareTo(e) >= 0) break; queue[k] = e; setIndex(e, k); k = parent; --- 862,884 ---- private final Condition available = lock.newCondition(); /** * Set f's heapIndex if it is a ScheduledFutureTask. */ ! private void setIndex(RunnableScheduledFuture<?> f, int idx) { if (f instanceof ScheduledFutureTask) ((ScheduledFutureTask)f).heapIndex = idx; } /** * Sift element added at bottom up to its heap-ordered spot. * Call only when holding lock. */ ! private void siftUp(int k, RunnableScheduledFuture<?> key) { while (k > 0) { int parent = (k - 1) >>> 1; ! RunnableScheduledFuture<?> e = queue[parent]; if (key.compareTo(e) >= 0) break; queue[k] = e; setIndex(e, k); k = parent;
*** 888,902 **** /** * Sift element added at top down to its heap-ordered spot. * Call only when holding lock. */ ! private void siftDown(int k, RunnableScheduledFuture key) { int half = size >>> 1; while (k < half) { int child = (k << 1) + 1; ! RunnableScheduledFuture c = queue[child]; int right = child + 1; if (right < size && c.compareTo(queue[right]) > 0) c = queue[child = right]; if (key.compareTo(c) <= 0) break; --- 889,903 ---- /** * Sift element added at top down to its heap-ordered spot. * Call only when holding lock. */ ! private void siftDown(int k, RunnableScheduledFuture<?> key) { int half = size >>> 1; while (k < half) { int child = (k << 1) + 1; ! RunnableScheduledFuture<?> c = queue[child]; int right = child + 1; if (right < size && c.compareTo(queue[right]) > 0) c = queue[child = right]; if (key.compareTo(c) <= 0) break;
*** 957,967 **** if (i < 0) return false; setIndex(queue[i], -1); int s = --size; ! RunnableScheduledFuture replacement = queue[s]; queue[s] = null; if (s != i) { siftDown(i, replacement); if (queue[i] == replacement) siftUp(i, replacement); --- 958,968 ---- if (i < 0) return false; setIndex(queue[i], -1); int s = --size; ! RunnableScheduledFuture<?> replacement = queue[s]; queue[s] = null; if (s != i) { siftDown(i, replacement); if (queue[i] == replacement) siftUp(i, replacement);
*** 988,998 **** public int remainingCapacity() { return Integer.MAX_VALUE; } ! public RunnableScheduledFuture peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return queue[0]; } finally { --- 989,999 ---- public int remainingCapacity() { return Integer.MAX_VALUE; } ! public RunnableScheduledFuture<?> peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return queue[0]; } finally {
*** 1001,1011 **** } public boolean offer(Runnable x) { if (x == null) throw new NullPointerException(); ! RunnableScheduledFuture e = (RunnableScheduledFuture)x; final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; if (i >= queue.length) --- 1002,1012 ---- } public boolean offer(Runnable x) { if (x == null) throw new NullPointerException(); ! RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; if (i >= queue.length)
*** 1043,1086 **** * Performs common bookkeeping for poll and take: Replaces * first element with last and sifts it down. Call only when * holding lock. * @param f the task to remove and return */ ! private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) { int s = --size; ! RunnableScheduledFuture x = queue[s]; queue[s] = null; if (s != 0) siftDown(0, x); setIndex(f, -1); return f; } ! public RunnableScheduledFuture poll() { final ReentrantLock lock = this.lock; lock.lock(); try { ! RunnableScheduledFuture first = queue[0]; ! if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) return null; else return finishPoll(first); } finally { lock.unlock(); } } ! public RunnableScheduledFuture take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { ! RunnableScheduledFuture first = queue[0]; if (first == null) available.await(); else { ! long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay <= 0) return finishPoll(first); else if (leader != null) available.await(); else { --- 1044,1087 ---- * Performs common bookkeeping for poll and take: Replaces * first element with last and sifts it down. Call only when * holding lock. * @param f the task to remove and return */ ! private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) { int s = --size; ! RunnableScheduledFuture<?> x = queue[s]; queue[s] = null; if (s != 0) siftDown(0, x); setIndex(f, -1); return f; } ! public RunnableScheduledFuture<?> poll() { final ReentrantLock lock = this.lock; lock.lock(); try { ! RunnableScheduledFuture<?> first = queue[0]; ! if (first == null || first.getDelay(NANOSECONDS) > 0) return null; else return finishPoll(first); } finally { lock.unlock(); } } ! public RunnableScheduledFuture<?> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { ! RunnableScheduledFuture<?> first = queue[0]; if (first == null) available.await(); else { ! long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return finishPoll(first); else if (leader != null) available.await(); else {
*** 1100,1124 **** available.signal(); lock.unlock(); } } ! public RunnableScheduledFuture poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { ! RunnableScheduledFuture first = queue[0]; if (first == null) { if (nanos <= 0) return null; else nanos = available.awaitNanos(nanos); } else { ! long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay <= 0) return finishPoll(first); if (nanos <= 0) return null; if (nanos < delay || leader != null) --- 1101,1125 ---- available.signal(); lock.unlock(); } } ! public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { ! RunnableScheduledFuture<?> first = queue[0]; if (first == null) { if (nanos <= 0) return null; else nanos = available.awaitNanos(nanos); } else { ! long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return finishPoll(first); if (nanos <= 0) return null; if (nanos < delay || leader != null)
*** 1146,1156 **** public void clear() { final ReentrantLock lock = this.lock; lock.lock(); try { for (int i = 0; i < size; i++) { ! RunnableScheduledFuture t = queue[i]; if (t != null) { queue[i] = null; setIndex(t, -1); } } --- 1147,1157 ---- public void clear() { final ReentrantLock lock = this.lock; lock.lock(); try { for (int i = 0; i < size; i++) { ! RunnableScheduledFuture<?> t = queue[i]; if (t != null) { queue[i] = null; setIndex(t, -1); } }
*** 1162,1174 **** /** * Return and remove first element only if it is expired. * Used only by drainTo. Call only when holding lock. */ ! private RunnableScheduledFuture pollExpired() { ! RunnableScheduledFuture first = queue[0]; ! if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) return null; return finishPoll(first); } public int drainTo(Collection<? super Runnable> c) { --- 1163,1176 ---- /** * Return and remove first element only if it is expired. * Used only by drainTo. Call only when holding lock. */ ! private RunnableScheduledFuture<?> pollExpired() { ! // assert lock.isHeldByCurrentThread(); ! RunnableScheduledFuture<?> first = queue[0]; ! if (first == null || first.getDelay(NANOSECONDS) > 0) return null; return finishPoll(first); } public int drainTo(Collection<? super Runnable> c) {
*** 1177,1187 **** if (c == this) throw new IllegalArgumentException(); final ReentrantLock lock = this.lock; lock.lock(); try { ! RunnableScheduledFuture first; int n = 0; while ((first = pollExpired()) != null) { c.add(first); ++n; } --- 1179,1189 ---- if (c == this) throw new IllegalArgumentException(); final ReentrantLock lock = this.lock; lock.lock(); try { ! RunnableScheduledFuture<?> first; int n = 0; while ((first = pollExpired()) != null) { c.add(first); ++n; }
*** 1199,1209 **** if (maxElements <= 0) return 0; final ReentrantLock lock = this.lock; lock.lock(); try { ! RunnableScheduledFuture first; int n = 0; while (n < maxElements && (first = pollExpired()) != null) { c.add(first); ++n; } --- 1201,1211 ---- if (maxElements <= 0) return 0; final ReentrantLock lock = this.lock; lock.lock(); try { ! RunnableScheduledFuture<?> first; int n = 0; while (n < maxElements && (first = pollExpired()) != null) { c.add(first); ++n; }