src/share/classes/java/util/concurrent/ForkJoinPool.java
Print this page
@@ -524,11 +524,11 @@
* worker thread. (Bits 16-31 are unused.)
*/
private volatile long eventWaiters;
private static final int EVENT_COUNT_SHIFT = 32;
- private static final long WAITER_ID_MASK = (1L << 16) - 1L;
+ private static final int WAITER_ID_MASK = (1 << 16) - 1;
/**
* A counter for events that may wake up worker threads:
* - Submission of a new task to the pool
* - A worker pushing a task on an empty queue
@@ -613,22 +613,32 @@
// Utilities for CASing fields. Note that most of these
// are usually manually inlined by callers.
/**
- * Increments running count part of workerCounts
+ * Increments running count part of workerCounts.
*/
final void incrementRunningCount() {
int c;
// Spin until the CAS succeeds: every attempt re-reads workerCounts
// and tries to replace it with that value plus ONE_RUNNING.
do {} while (!UNSAFE.compareAndSwapInt(this, workerCountsOffset,
c = workerCounts,
c + ONE_RUNNING));
}
/**
- * Tries to decrement running count unless already zero
+ * Tries to increment running count part of workerCounts.
*/
+ final boolean tryIncrementRunningCount() {
+ int c; // holds the workerCounts value read for this attempt
+ return UNSAFE.compareAndSwapInt(this, workerCountsOffset,
+ c = workerCounts,
+ c + ONE_RUNNING); // one CAS attempt only; the CAS result is returned as-is, no retry
+ }
+
+ /**
+ * Tries to decrement running count unless already zero.
+ */
final boolean tryDecrementRunningCount() {
int wc = workerCounts;
if ((wc & RUNNING_COUNT_MASK) == 0)
return false;
return UNSAFE.compareAndSwapInt(this, workerCountsOffset,
@@ -696,14 +706,15 @@
int n = ws.length;
if (k < 0 || k >= n || ws[k] != null) {
for (k = 0; k < n && ws[k] != null; ++k)
;
if (k == n)
- ws = Arrays.copyOf(ws, n << 1);
+ ws = workers = Arrays.copyOf(ws, n << 1);
}
ws[k] = w;
- workers = ws; // volatile array write ensures slot visibility
+ int c = eventCount; // advance event count to ensure visibility
+ UNSAFE.compareAndSwapInt(this, eventCountOffset, c, c+1);
} finally {
lock.unlock();
}
return k;
}
@@ -732,11 +743,11 @@
*
* @param w the worker
*/
final void workerTerminated(ForkJoinWorkerThread w) {
forgetWorker(w);
- decrementWorkerCounts(w.isTrimmed()? 0 : ONE_RUNNING, ONE_TOTAL);
+ decrementWorkerCounts(w.isTrimmed() ? 0 : ONE_RUNNING, ONE_TOTAL); // running delta is 0 when the worker is trimmed
while (w.stealCount != 0) // collect final count; repeat until stealCount reads zero
tryAccumulateStealCount(w);
tryTerminate(false);
}
@@ -744,28 +755,27 @@
/**
* Releases workers blocked on a count not equal to current count.
* Normally called after precheck that eventWaiters isn't zero to
* avoid wasted array checks. Gives up upon a change in count or
- * upon releasing two workers, letting others take over.
+ * upon releasing four workers, letting others take over.
*/
private void releaseEventWaiters() {
ForkJoinWorkerThread[] ws = workers;
int n = ws.length;
long h = eventWaiters;
int ec = eventCount;
- boolean releasedOne = false;
+ int releases = 4;
ForkJoinWorkerThread w; int id;
- while ((id = ((int)(h & WAITER_ID_MASK)) - 1) >= 0 &&
+ while ((id = (((int)h) & WAITER_ID_MASK) - 1) >= 0 &&
(int)(h >>> EVENT_COUNT_SHIFT) != ec &&
id < n && (w = ws[id]) != null) {
if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
h, w.nextWaiter)) {
LockSupport.unpark(w);
- if (releasedOne) // exit on second release
+ if (--releases == 0)
break;
- releasedOne = true;
}
if (eventCount != ec)
break;
h = eventWaiters;
}
@@ -791,11 +801,11 @@
*/
private void eventSync(ForkJoinWorkerThread w, int ec) {
long nh = (((long)ec) << EVENT_COUNT_SHIFT) | ((long)(w.poolIndex+1));
long h;
while ((runState < SHUTDOWN || !tryTerminate(false)) &&
- (((int)((h = eventWaiters) & WAITER_ID_MASK)) == 0 ||
+ (((int)(h = eventWaiters) & WAITER_ID_MASK) == 0 ||
(int)(h >>> EVENT_COUNT_SHIFT) == ec) &&
eventCount == ec) {
if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
w.nextWaiter = h, nh)) {
awaitEvent(w, ec);
@@ -818,13 +828,13 @@
private void awaitEvent(ForkJoinWorkerThread w, int ec) {
while (eventCount == ec) {
if (tryAccumulateStealCount(w)) { // transfer while idle
boolean untimed = (w.nextWaiter != 0L ||
(workerCounts & RUNNING_COUNT_MASK) <= 1);
- long startTime = untimed? 0 : System.nanoTime();
+ long startTime = untimed ? 0 : System.nanoTime();
Thread.interrupted(); // clear/ignore interrupt
- if (eventCount != ec || w.isTerminating())
+ if (w.isTerminating() || eventCount != ec)
break; // recheck after clear
if (untimed)
LockSupport.park(w);
else {
LockSupport.parkNanos(w, SHRINK_RATE_NANOS);
@@ -858,11 +868,12 @@
int n = ws.length;
ForkJoinWorkerThread w;
if ((sw = spareWaiters) != 0 &&
(id = (sw & SPARE_ID_MASK) - 1) >= 0 &&
id < n && (w = ws[id]) != null &&
- (workerCounts & RUNNING_COUNT_MASK) < parallelism &&
+ (runState >= TERMINATING ||
+ (workerCounts & RUNNING_COUNT_MASK) < parallelism) &&
spareWaiters == sw &&
UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
sw, w.nextSpare)) {
int c; // increment running count before resume
do {} while (!UNSAFE.compareAndSwapInt
@@ -912,18 +923,14 @@
ForkJoinWorkerThread))
UNSAFE.throwException(fail);
break;
}
w.start(recordWorker(w), ueh);
- if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc) {
- int c; // advance event count
- UNSAFE.compareAndSwapInt(this, eventCountOffset,
- c = eventCount, c+1);
+ if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc)
break; // add at most one unless total below target
}
}
- }
if (eventWaiters != 0L)
releaseEventWaiters();
}
/**
@@ -953,11 +960,11 @@
sw, w.nextSpare))
shutdown = true;
}
else if ((h = eventWaiters) != 0L) {
long nh;
- int id = ((int)(h & WAITER_ID_MASK)) - 1;
+ int id = (((int)h) & WAITER_ID_MASK) - 1;
if (id >= 0 && id < n && (w = ws[id]) != null &&
(nh = w.nextWaiter) != 0L && // keep at least one worker
UNSAFE.compareAndSwapLong(this, eventWaitersOffset, h, nh))
shutdown = true;
}
@@ -1006,23 +1013,30 @@
if (rs >= TERMINATING) { // propagate shutdown
w.shutdown();
break;
}
if ((inactivate || (active && (rs & ACTIVE_COUNT_MASK) >= pc)) &&
- UNSAFE.compareAndSwapInt(this, runStateOffset, rs, rs - 1))
+ UNSAFE.compareAndSwapInt(this, runStateOffset, rs, --rs)) {
inactivate = active = w.active = false;
- int wc = workerCounts;
+ if (rs == SHUTDOWN) { // all inactive and shut down
+ tryTerminate(false);
+ continue;
+ }
+ }
+ int wc = workerCounts; // try to suspend as spare
if ((wc & RUNNING_COUNT_MASK) > pc) {
if (!(inactivate |= active) && // must inactivate to suspend
- workerCounts == wc && // try to suspend as spare
+ workerCounts == wc &&
UNSAFE.compareAndSwapInt(this, workerCountsOffset,
wc, wc - ONE_RUNNING))
w.suspendAsSpare();
}
else if ((wc >>> TOTAL_COUNT_SHIFT) < pc)
helpMaintainParallelism(); // not enough workers
- else if (!ran) {
+ else if (ran)
+ break;
+ else {
long h = eventWaiters;
int ec = eventCount;
if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != ec)
releaseEventWaiters(); // release others before waiting
else if (ec != wec) {
@@ -1030,62 +1044,93 @@
break;
}
else if (!(inactivate |= active))
eventSync(w, wec); // must inactivate before sync
}
- else
- break;
}
}
/**
* Helps and/or blocks awaiting join of the given task.
* See above for explanation.
*
* @param joinMe the task to join
* @param worker the current worker thread
+ * @param timed true if wait should time out
+ * @param nanos timeout value if timed
*/
- final void awaitJoin(ForkJoinTask<?> joinMe, ForkJoinWorkerThread worker) {
+ final void awaitJoin(ForkJoinTask<?> joinMe, ForkJoinWorkerThread worker,
+ boolean timed, long nanos) {
+ long startTime = timed ? System.nanoTime() : 0L; // only consulted on the timed path below
int retries = 2 + (parallelism >> 2); // #helpJoins before blocking
+ boolean running = true; // false when count decremented
while (joinMe.status >= 0) {
- int wc;
- worker.helpJoinTask(joinMe);
+ if (runState >= TERMINATING) { // pool is terminating: cancel the task and stop waiting
+ joinMe.cancelIgnoringExceptions();
+ break;
+ }
+ running = worker.helpJoinTask(joinMe, running);
if (joinMe.status < 0)
break;
- else if (retries > 0)
+ if (retries > 0) {
--retries;
- else if (((wc = workerCounts) & RUNNING_COUNT_MASK) != 0 &&
- UNSAFE.compareAndSwapInt(this, workerCountsOffset,
- wc, wc - ONE_RUNNING)) {
- int stat, c; long h;
- while ((stat = joinMe.status) >= 0 &&
- (h = eventWaiters) != 0L && // help release others
- (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
+ continue; // keep helping until the retry budget is used up
+ }
+ int wc = workerCounts;
+ if ((wc & RUNNING_COUNT_MASK) != 0) {
+ if (running) {
+ if (!UNSAFE.compareAndSwapInt(this, workerCountsOffset,
+ wc, wc - ONE_RUNNING))
+ continue; // CAS failed; restart the loop with fresh state
+ running = false;
+ }
+ long h = eventWaiters;
+ if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
releaseEventWaiters();
- if (stat >= 0 &&
- ((workerCounts & RUNNING_COUNT_MASK) == 0 ||
- (stat =
- joinMe.internalAwaitDone(JOIN_TIMEOUT_MILLIS)) >= 0))
- helpMaintainParallelism(); // timeout or no running workers
+ if ((workerCounts & RUNNING_COUNT_MASK) != 0) {
+ long ms; int ns;
+ if (!timed) {
+ ms = JOIN_TIMEOUT_MILLIS;
+ ns = 0;
+ }
+ else { // at most JOIN_TIMEOUT_MILLIS per wait
+ long nt = nanos - (System.nanoTime() - startTime); // nanoseconds left of the caller's budget
+ if (nt <= 0L)
+ break; // budget exhausted: leave the loop without waiting again
+ ms = nt / 1000000;
+ if (ms > JOIN_TIMEOUT_MILLIS) {
+ ms = JOIN_TIMEOUT_MILLIS;
+ ns = 0;
+ }
+ else
+ ns = (int) (nt % 1000000); // sub-millisecond remainder
+ }
+ joinMe.internalAwaitDone(ms, ns);
+ }
+ if (joinMe.status < 0)
+ break;
+ }
+ helpMaintainParallelism();
+ }
+ if (!running) { // running == false: add ONE_RUNNING back before returning
+ int c;
do {} while (!UNSAFE.compareAndSwapInt
(this, workerCountsOffset,
c = workerCounts, c + ONE_RUNNING));
- if (stat < 0)
- break; // else restart
}
}
- }
/**
* Same idea as awaitJoin, but no helping, retries, or timeouts.
*/
final void awaitBlocker(ManagedBlocker blocker)
throws InterruptedException {
while (!blocker.isReleasable()) {
int wc = workerCounts;
- if ((wc & RUNNING_COUNT_MASK) != 0 &&
- UNSAFE.compareAndSwapInt(this, workerCountsOffset,
+ if ((wc & RUNNING_COUNT_MASK) == 0)
+ helpMaintainParallelism();
+ else if (UNSAFE.compareAndSwapInt(this, workerCountsOffset,
wc, wc - ONE_RUNNING)) {
try {
while (!blocker.isReleasable()) {
long h = eventWaiters;
if (h != 0L &&
@@ -1127,16 +1172,15 @@
startTerminating();
// Finish now if all threads terminated; else in some subsequent call
if ((workerCounts >>> TOTAL_COUNT_SHIFT) == 0) {
advanceRunLevel(TERMINATED);
- termination.arrive();
+ termination.forceTermination();
}
return true;
}
-
/**
* Actions on transition to TERMINATING
*
* Runs up to four passes through workers: (0) shutting down each
* (without waking up if parked) to quickly spread notifications
@@ -1323,21 +1367,17 @@
}
// Execution methods
/**
- * Common code for execute, invoke and submit
+ * Submits task and creates, starts, or resumes some workers if necessary.
*/
private <T> void doSubmit(ForkJoinTask<T> task) {
- if (task == null)
- throw new NullPointerException();
- if (runState >= SHUTDOWN)
- throw new RejectedExecutionException();
// The null and shutdown checks are removed from this method by the
// lines above; the callers visible in this change perform them
// before calling in.
submissionQueue.offer(task);
int c; // try to increment event count -- CAS failure OK
UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1);
- helpMaintainParallelism(); // create, start, or resume some workers
+ helpMaintainParallelism();
}
/**
* Performs the given task, returning its result upon completion.
*
@@ -1346,24 +1386,51 @@
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public <T> T invoke(ForkJoinTask<T> task) {
+ if (task == null)
+ throw new NullPointerException();
+ if (runState >= SHUTDOWN)
+ throw new RejectedExecutionException(); // checked before the same-pool shortcut below
+ Thread t = Thread.currentThread();
+ if ((t instanceof ForkJoinWorkerThread) &&
+ ((ForkJoinWorkerThread)t).pool == this)
+ return task.invoke(); // bypass submit if in same pool
+ else {
doSubmit(task);
return task.join(); // caller is not a worker of this pool: submit, then join for the result
}
+ }
/**
+ * Unless terminating, forks task if within an ongoing FJ
+ * computation in the current pool, else submits as external task.
+ */
+ private <T> void forkOrSubmit(ForkJoinTask<T> task) { // no null check here; the visible callers check first
+ if (runState >= SHUTDOWN)
+ throw new RejectedExecutionException(); // rejects once runState has reached SHUTDOWN
+ Thread t = Thread.currentThread();
+ if ((t instanceof ForkJoinWorkerThread) &&
+ ((ForkJoinWorkerThread)t).pool == this)
+ task.fork(); // calling thread is a worker of this pool
+ else
+ doSubmit(task); // any other caller goes through the submission queue
+ }
+
+ /**
* Arranges for (asynchronous) execution of the given task.
*
* @param task the task
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public void execute(ForkJoinTask<?> task) {
- doSubmit(task);
+ if (task == null)
+ throw new NullPointerException(); // null is rejected before the task is handed on
+ forkOrSubmit(task); // replaces the direct doSubmit call removed above
}
// AbstractExecutorService methods
/**
@@ -1370,16 +1437,18 @@
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public void execute(Runnable task) {
+ if (task == null)
+ throw new NullPointerException(); // checked up front, before the instanceof test and adapt call
ForkJoinTask<?> job;
if (task instanceof ForkJoinTask<?>) // avoid re-wrap
job = (ForkJoinTask<?>) task;
else
job = ForkJoinTask.adapt(task, null); // wrap a plain Runnable with a null result
- doSubmit(job);
+ forkOrSubmit(job);
}
/**
* Submits a ForkJoinTask for execution.
*
@@ -1388,48 +1457,56 @@
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
- doSubmit(task);
+ if (task == null)
+ throw new NullPointerException(); // null is rejected before the task is handed on
+ forkOrSubmit(task);
return task; // the same task object is handed back to the caller
}
/**
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public <T> ForkJoinTask<T> submit(Callable<T> task) {
+ if (task == null)
+ throw new NullPointerException(); // checked before the callable is adapted
ForkJoinTask<T> job = ForkJoinTask.adapt(task);
- doSubmit(job);
+ forkOrSubmit(job);
return job; // the adapted task, not the original callable
}
/**
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public <T> ForkJoinTask<T> submit(Runnable task, T result) {
+ if (task == null)
+ throw new NullPointerException(); // checked before the runnable is adapted
ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
- doSubmit(job);
+ forkOrSubmit(job);
return job; // the adapted task built from the runnable and the given result
}
/**
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public ForkJoinTask<?> submit(Runnable task) {
+ if (task == null)
+ throw new NullPointerException(); // checked up front, before the instanceof test and adapt call
ForkJoinTask<?> job;
if (task instanceof ForkJoinTask<?>) // avoid re-wrap
job = (ForkJoinTask<?>) task;
else
job = ForkJoinTask.adapt(task, null); // wrap a plain Runnable with a null result
- doSubmit(job);
+ forkOrSubmit(job);
return job; // either the original task (if already a ForkJoinTask) or its wrapper
}
/**
* @throws NullPointerException {@inheritDoc}
@@ -1723,12 +1800,15 @@
/**
* Returns {@code true} if the process of termination has
* commenced but not yet completed. This method may be useful for
* debugging. A return of {@code true} reported a sufficient
* period after shutdown may indicate that submitted tasks have
- * ignored or suppressed interruption, causing this executor not
- * to properly terminate.
+ * ignored or suppressed interruption, or are waiting for IO,
+ * causing this executor not to properly terminate. (See the
+ * advisory notes for class {@link ForkJoinTask} stating that
+ * tasks should not normally entail blocking operations. But if
+ * they do, they must abort them on interrupt.)
*
* @return {@code true} if terminating but not yet terminated
*/
public boolean isTerminating() {
return (runState & (TERMINATING|TERMINATED)) == TERMINATING;
@@ -1762,14 +1842,15 @@
* @throws InterruptedException if interrupted while waiting
*/
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
try {
- return termination.awaitAdvanceInterruptibly(0, timeout, unit) > 0;
+ termination.awaitAdvanceInterruptibly(0, timeout, unit); // return value is no longer inspected
} catch (TimeoutException ex) {
return false; // the wait timed out
}
+ return true; // the wait returned without timing out
}
/**
* Interface for extending managed parallelism for tasks running
* in {@link ForkJoinPool}s.