src/share/classes/java/util/concurrent/ScheduledThreadPoolExecutor.java
Print this page
@@ -32,12 +32,14 @@
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package java.util.concurrent;
-import java.util.concurrent.atomic.*;
-import java.util.concurrent.locks.*;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.*;
/**
* A {@link ThreadPoolExecutor} that can additionally schedule
* commands to run after a given delay, or to execute
@@ -164,11 +166,11 @@
/**
* Sequence number to break scheduling ties, and in turn to
* guarantee FIFO order among tied entries.
*/
- private static final AtomicLong sequencer = new AtomicLong(0);
+ private static final AtomicLong sequencer = new AtomicLong();
/**
* Returns current nanosecond time.
*/
final long now() {
@@ -229,11 +231,11 @@
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
public long getDelay(TimeUnit unit) {
- return unit.convert(time - now(), TimeUnit.NANOSECONDS);
+ return unit.convert(time - now(), NANOSECONDS);
}
public int compareTo(Delayed other) {
if (other == this) // compare zero ONLY if same object
return 0;
@@ -247,12 +249,12 @@
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
- long d = (getDelay(TimeUnit.NANOSECONDS) -
- other.getDelay(TimeUnit.NANOSECONDS));
+ long d = (getDelay(NANOSECONDS) -
+ other.getDelay(NANOSECONDS));
return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}
/**
* Returns true if this is a periodic (not a one-shot) action.
@@ -422,11 +424,11 @@
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @throws IllegalArgumentException if {@code corePoolSize < 0}
*/
public ScheduledThreadPoolExecutor(int corePoolSize) {
- super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
+ super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
/**
* Creates a new {@code ScheduledThreadPoolExecutor} with the
@@ -439,11 +441,11 @@
* @throws IllegalArgumentException if {@code corePoolSize < 0}
* @throws NullPointerException if {@code threadFactory} is null
*/
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
- super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
+ super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
/**
* Creates a new ScheduledThreadPoolExecutor with the given
@@ -456,11 +458,11 @@
* @throws IllegalArgumentException if {@code corePoolSize < 0}
* @throws NullPointerException if {@code handler} is null
*/
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
- super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
+ super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}
/**
* Creates a new ScheduledThreadPoolExecutor with the given
@@ -477,11 +479,11 @@
* {@code handler} is null
*/
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
- super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
+ super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
/**
* Returns the trigger time of a delayed action.
@@ -506,11 +508,11 @@
* Long.MAX_VALUE.
*/
private long overflowFree(long delay) {
Delayed head = (Delayed) super.getQueue().peek();
if (head != null) {
- long headDelay = head.getDelay(TimeUnit.NANOSECONDS);
+ long headDelay = head.getDelay(NANOSECONDS);
if (headDelay < 0 && (delay - headDelay < 0))
delay = Long.MAX_VALUE + headDelay;
}
return delay;
}
@@ -614,38 +616,37 @@
* cannot be accepted for execution because the
* executor has been shut down
* @throws NullPointerException {@inheritDoc}
*/
public void execute(Runnable command) {
- schedule(command, 0, TimeUnit.NANOSECONDS);
+ schedule(command, 0, NANOSECONDS);
}
// Override AbstractExecutorService methods
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public Future<?> submit(Runnable task) {
- return schedule(task, 0, TimeUnit.NANOSECONDS);
+ return schedule(task, 0, NANOSECONDS);
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Runnable task, T result) {
- return schedule(Executors.callable(task, result),
- 0, TimeUnit.NANOSECONDS);
+ return schedule(Executors.callable(task, result), 0, NANOSECONDS);
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
- return schedule(task, 0, TimeUnit.NANOSECONDS);
+ return schedule(task, 0, NANOSECONDS);
}
/**
* Sets the policy on whether to continue executing existing
* periodic tasks even when this executor has been {@code shutdown}.
@@ -829,12 +830,12 @@
* other kinds of tasks or work queues), so are uniquely
* identified by heapIndex.
*/
private static final int INITIAL_CAPACITY = 16;
- private RunnableScheduledFuture[] queue =
- new RunnableScheduledFuture[INITIAL_CAPACITY];
+ private RunnableScheduledFuture<?>[] queue =
+ new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;
/**
* Thread designated to wait for the task at the head of the
@@ -861,23 +862,23 @@
private final Condition available = lock.newCondition();
/**
* Set f's heapIndex if it is a ScheduledFutureTask.
*/
- private void setIndex(RunnableScheduledFuture f, int idx) {
+ private void setIndex(RunnableScheduledFuture<?> f, int idx) {
if (f instanceof ScheduledFutureTask)
((ScheduledFutureTask)f).heapIndex = idx;
}
/**
* Sift element added at bottom up to its heap-ordered spot.
* Call only when holding lock.
*/
- private void siftUp(int k, RunnableScheduledFuture key) {
+ private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
int parent = (k - 1) >>> 1;
- RunnableScheduledFuture e = queue[parent];
+ RunnableScheduledFuture<?> e = queue[parent];
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
@@ -888,15 +889,15 @@
/**
* Sift element added at top down to its heap-ordered spot.
* Call only when holding lock.
*/
- private void siftDown(int k, RunnableScheduledFuture key) {
+ private void siftDown(int k, RunnableScheduledFuture<?> key) {
int half = size >>> 1;
while (k < half) {
int child = (k << 1) + 1;
- RunnableScheduledFuture c = queue[child];
+ RunnableScheduledFuture<?> c = queue[child];
int right = child + 1;
if (right < size && c.compareTo(queue[right]) > 0)
c = queue[child = right];
if (key.compareTo(c) <= 0)
break;
@@ -957,11 +958,11 @@
if (i < 0)
return false;
setIndex(queue[i], -1);
int s = --size;
- RunnableScheduledFuture replacement = queue[s];
+ RunnableScheduledFuture<?> replacement = queue[s];
queue[s] = null;
if (s != i) {
siftDown(i, replacement);
if (queue[i] == replacement)
siftUp(i, replacement);
@@ -988,11 +989,11 @@
public int remainingCapacity() {
return Integer.MAX_VALUE;
}
- public RunnableScheduledFuture peek() {
+ public RunnableScheduledFuture<?> peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return queue[0];
} finally {
@@ -1001,11 +1002,11 @@
}
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
- RunnableScheduledFuture e = (RunnableScheduledFuture)x;
+ RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
@@ -1043,44 +1044,44 @@
* Performs common bookkeeping for poll and take: Replaces
* first element with last and sifts it down. Call only when
* holding lock.
* @param f the task to remove and return
*/
- private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
+ private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
int s = --size;
- RunnableScheduledFuture x = queue[s];
+ RunnableScheduledFuture<?> x = queue[s];
queue[s] = null;
if (s != 0)
siftDown(0, x);
setIndex(f, -1);
return f;
}
- public RunnableScheduledFuture poll() {
+ public RunnableScheduledFuture<?> poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
- RunnableScheduledFuture first = queue[0];
- if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
+ RunnableScheduledFuture<?> first = queue[0];
+ if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return finishPoll(first);
} finally {
lock.unlock();
}
}
- public RunnableScheduledFuture take() throws InterruptedException {
+ public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
- RunnableScheduledFuture first = queue[0];
+ RunnableScheduledFuture<?> first = queue[0];
if (first == null)
available.await();
else {
- long delay = first.getDelay(TimeUnit.NANOSECONDS);
+ long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
else if (leader != null)
available.await();
else {
@@ -1100,25 +1101,25 @@
available.signal();
lock.unlock();
}
}
- public RunnableScheduledFuture poll(long timeout, TimeUnit unit)
+ public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
- RunnableScheduledFuture first = queue[0];
+ RunnableScheduledFuture<?> first = queue[0];
if (first == null) {
if (nanos <= 0)
return null;
else
nanos = available.awaitNanos(nanos);
} else {
- long delay = first.getDelay(TimeUnit.NANOSECONDS);
+ long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
if (nanos <= 0)
return null;
if (nanos < delay || leader != null)
@@ -1146,11 +1147,11 @@
public void clear() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
for (int i = 0; i < size; i++) {
- RunnableScheduledFuture t = queue[i];
+ RunnableScheduledFuture<?> t = queue[i];
if (t != null) {
queue[i] = null;
setIndex(t, -1);
}
}
@@ -1162,13 +1163,14 @@
/**
* Return and remove first element only if it is expired.
* Used only by drainTo. Call only when holding lock.
*/
- private RunnableScheduledFuture pollExpired() {
- RunnableScheduledFuture first = queue[0];
- if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
+ private RunnableScheduledFuture<?> pollExpired() {
+ // assert lock.isHeldByCurrentThread();
+ RunnableScheduledFuture<?> first = queue[0];
+ if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
return finishPoll(first);
}
public int drainTo(Collection<? super Runnable> c) {
@@ -1177,11 +1179,11 @@
if (c == this)
throw new IllegalArgumentException();
final ReentrantLock lock = this.lock;
lock.lock();
try {
- RunnableScheduledFuture first;
+ RunnableScheduledFuture<?> first;
int n = 0;
while ((first = pollExpired()) != null) {
c.add(first);
++n;
}
@@ -1199,11 +1201,11 @@
if (maxElements <= 0)
return 0;
final ReentrantLock lock = this.lock;
lock.lock();
try {
- RunnableScheduledFuture first;
+ RunnableScheduledFuture<?> first;
int n = 0;
while (n < maxElements && (first = pollExpired()) != null) {
c.add(first);
++n;
}