src/share/classes/java/util/concurrent/ForkJoinPool.java
Print this page
*** 524,534 ****
* worker thread. (Bits 16-31 are unused.)
*/
private volatile long eventWaiters;
private static final int EVENT_COUNT_SHIFT = 32;
! private static final long WAITER_ID_MASK = (1L << 16) - 1L;
/**
* A counter for events that may wake up worker threads:
* - Submission of a new task to the pool
* - A worker pushing a task on an empty queue
--- 524,534 ----
* worker thread. (Bits 16-31 are unused.)
*/
private volatile long eventWaiters;
private static final int EVENT_COUNT_SHIFT = 32;
! private static final int WAITER_ID_MASK = (1 << 16) - 1;
/**
* A counter for events that may wake up worker threads:
* - Submission of a new task to the pool
* - A worker pushing a task on an empty queue
*** 613,634 ****
// Utilities for CASing fields. Note that most of these
// are usually manually inlined by callers
/**
! * Increments running count part of workerCounts
*/
final void incrementRunningCount() {
int c;
do {} while (!UNSAFE.compareAndSwapInt(this, workerCountsOffset,
c = workerCounts,
c + ONE_RUNNING));
}
/**
! * Tries to decrement running count unless already zero
*/
final boolean tryDecrementRunningCount() {
int wc = workerCounts;
if ((wc & RUNNING_COUNT_MASK) == 0)
return false;
return UNSAFE.compareAndSwapInt(this, workerCountsOffset,
--- 613,644 ----
// Utilities for CASing fields. Note that most of these
// are usually manually inlined by callers
/**
! * Increments running count part of workerCounts.
*/
final void incrementRunningCount() {
int c;
do {} while (!UNSAFE.compareAndSwapInt(this, workerCountsOffset,
c = workerCounts,
c + ONE_RUNNING));
}
/**
! * Tries to increment running count part of workerCounts.
*/
+ final boolean tryIncrementRunningCount() {
+ int c;
+ return UNSAFE.compareAndSwapInt(this, workerCountsOffset,
+ c = workerCounts,
+ c + ONE_RUNNING);
+ }
+
+ /**
+ * Tries to decrement running count unless already zero.
+ */
final boolean tryDecrementRunningCount() {
int wc = workerCounts;
if ((wc & RUNNING_COUNT_MASK) == 0)
return false;
return UNSAFE.compareAndSwapInt(this, workerCountsOffset,
*** 696,709 ****
int n = ws.length;
if (k < 0 || k >= n || ws[k] != null) {
for (k = 0; k < n && ws[k] != null; ++k)
;
if (k == n)
! ws = Arrays.copyOf(ws, n << 1);
}
ws[k] = w;
! workers = ws; // volatile array write ensures slot visibility
} finally {
lock.unlock();
}
return k;
}
--- 706,720 ----
int n = ws.length;
if (k < 0 || k >= n || ws[k] != null) {
for (k = 0; k < n && ws[k] != null; ++k)
;
if (k == n)
! ws = workers = Arrays.copyOf(ws, n << 1);
}
ws[k] = w;
! int c = eventCount; // advance event count to ensure visibility
! UNSAFE.compareAndSwapInt(this, eventCountOffset, c, c+1);
} finally {
lock.unlock();
}
return k;
}
*** 732,742 ****
*
* @param w the worker
*/
final void workerTerminated(ForkJoinWorkerThread w) {
forgetWorker(w);
! decrementWorkerCounts(w.isTrimmed()? 0 : ONE_RUNNING, ONE_TOTAL);
while (w.stealCount != 0) // collect final count
tryAccumulateStealCount(w);
tryTerminate(false);
}
--- 743,753 ----
*
* @param w the worker
*/
final void workerTerminated(ForkJoinWorkerThread w) {
forgetWorker(w);
! decrementWorkerCounts(w.isTrimmed() ? 0 : ONE_RUNNING, ONE_TOTAL);
while (w.stealCount != 0) // collect final count
tryAccumulateStealCount(w);
tryTerminate(false);
}
*** 744,771 ****
/**
* Releases workers blocked on a count not equal to current count.
* Normally called after precheck that eventWaiters isn't zero to
* avoid wasted array checks. Gives up upon a change in count or
! * upon releasing two workers, letting others take over.
*/
private void releaseEventWaiters() {
ForkJoinWorkerThread[] ws = workers;
int n = ws.length;
long h = eventWaiters;
int ec = eventCount;
! boolean releasedOne = false;
ForkJoinWorkerThread w; int id;
! while ((id = ((int)(h & WAITER_ID_MASK)) - 1) >= 0 &&
(int)(h >>> EVENT_COUNT_SHIFT) != ec &&
id < n && (w = ws[id]) != null) {
if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
h, w.nextWaiter)) {
LockSupport.unpark(w);
! if (releasedOne) // exit on second release
break;
- releasedOne = true;
}
if (eventCount != ec)
break;
h = eventWaiters;
}
--- 755,781 ----
/**
* Releases workers blocked on a count not equal to current count.
* Normally called after precheck that eventWaiters isn't zero to
* avoid wasted array checks. Gives up upon a change in count or
! * upon releasing four workers, letting others take over.
*/
private void releaseEventWaiters() {
ForkJoinWorkerThread[] ws = workers;
int n = ws.length;
long h = eventWaiters;
int ec = eventCount;
! int releases = 4;
ForkJoinWorkerThread w; int id;
! while ((id = (((int)h) & WAITER_ID_MASK) - 1) >= 0 &&
(int)(h >>> EVENT_COUNT_SHIFT) != ec &&
id < n && (w = ws[id]) != null) {
if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
h, w.nextWaiter)) {
LockSupport.unpark(w);
! if (--releases == 0)
break;
}
if (eventCount != ec)
break;
h = eventWaiters;
}
*** 791,801 ****
*/
private void eventSync(ForkJoinWorkerThread w, int ec) {
long nh = (((long)ec) << EVENT_COUNT_SHIFT) | ((long)(w.poolIndex+1));
long h;
while ((runState < SHUTDOWN || !tryTerminate(false)) &&
! (((int)((h = eventWaiters) & WAITER_ID_MASK)) == 0 ||
(int)(h >>> EVENT_COUNT_SHIFT) == ec) &&
eventCount == ec) {
if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
w.nextWaiter = h, nh)) {
awaitEvent(w, ec);
--- 801,811 ----
*/
private void eventSync(ForkJoinWorkerThread w, int ec) {
long nh = (((long)ec) << EVENT_COUNT_SHIFT) | ((long)(w.poolIndex+1));
long h;
while ((runState < SHUTDOWN || !tryTerminate(false)) &&
! (((int)(h = eventWaiters) & WAITER_ID_MASK) == 0 ||
(int)(h >>> EVENT_COUNT_SHIFT) == ec) &&
eventCount == ec) {
if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
w.nextWaiter = h, nh)) {
awaitEvent(w, ec);
*** 818,830 ****
private void awaitEvent(ForkJoinWorkerThread w, int ec) {
while (eventCount == ec) {
if (tryAccumulateStealCount(w)) { // transfer while idle
boolean untimed = (w.nextWaiter != 0L ||
(workerCounts & RUNNING_COUNT_MASK) <= 1);
! long startTime = untimed? 0 : System.nanoTime();
Thread.interrupted(); // clear/ignore interrupt
! if (eventCount != ec || w.isTerminating())
break; // recheck after clear
if (untimed)
LockSupport.park(w);
else {
LockSupport.parkNanos(w, SHRINK_RATE_NANOS);
--- 828,840 ----
private void awaitEvent(ForkJoinWorkerThread w, int ec) {
while (eventCount == ec) {
if (tryAccumulateStealCount(w)) { // transfer while idle
boolean untimed = (w.nextWaiter != 0L ||
(workerCounts & RUNNING_COUNT_MASK) <= 1);
! long startTime = untimed ? 0 : System.nanoTime();
Thread.interrupted(); // clear/ignore interrupt
! if (w.isTerminating() || eventCount != ec)
break; // recheck after clear
if (untimed)
LockSupport.park(w);
else {
LockSupport.parkNanos(w, SHRINK_RATE_NANOS);
*** 858,868 ****
int n = ws.length;
ForkJoinWorkerThread w;
if ((sw = spareWaiters) != 0 &&
(id = (sw & SPARE_ID_MASK) - 1) >= 0 &&
id < n && (w = ws[id]) != null &&
! (workerCounts & RUNNING_COUNT_MASK) < parallelism &&
spareWaiters == sw &&
UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
sw, w.nextSpare)) {
int c; // increment running count before resume
do {} while (!UNSAFE.compareAndSwapInt
--- 868,879 ----
int n = ws.length;
ForkJoinWorkerThread w;
if ((sw = spareWaiters) != 0 &&
(id = (sw & SPARE_ID_MASK) - 1) >= 0 &&
id < n && (w = ws[id]) != null &&
! (runState >= TERMINATING ||
! (workerCounts & RUNNING_COUNT_MASK) < parallelism) &&
spareWaiters == sw &&
UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
sw, w.nextSpare)) {
int c; // increment running count before resume
do {} while (!UNSAFE.compareAndSwapInt
*** 912,929 ****
ForkJoinWorkerThread))
UNSAFE.throwException(fail);
break;
}
w.start(recordWorker(w), ueh);
! if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc) {
! int c; // advance event count
! UNSAFE.compareAndSwapInt(this, eventCountOffset,
! c = eventCount, c+1);
break; // add at most one unless total below target
}
}
- }
if (eventWaiters != 0L)
releaseEventWaiters();
}
/**
--- 923,936 ----
ForkJoinWorkerThread))
UNSAFE.throwException(fail);
break;
}
w.start(recordWorker(w), ueh);
! if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc)
break; // add at most one unless total below target
}
}
if (eventWaiters != 0L)
releaseEventWaiters();
}
/**
*** 953,963 ****
sw, w.nextSpare))
shutdown = true;
}
else if ((h = eventWaiters) != 0L) {
long nh;
! int id = ((int)(h & WAITER_ID_MASK)) - 1;
if (id >= 0 && id < n && (w = ws[id]) != null &&
(nh = w.nextWaiter) != 0L && // keep at least one worker
UNSAFE.compareAndSwapLong(this, eventWaitersOffset, h, nh))
shutdown = true;
}
--- 960,970 ----
sw, w.nextSpare))
shutdown = true;
}
else if ((h = eventWaiters) != 0L) {
long nh;
! int id = (((int)h) & WAITER_ID_MASK) - 1;
if (id >= 0 && id < n && (w = ws[id]) != null &&
(nh = w.nextWaiter) != 0L && // keep at least one worker
UNSAFE.compareAndSwapLong(this, eventWaitersOffset, h, nh))
shutdown = true;
}
*** 1006,1028 ****
if (rs >= TERMINATING) { // propagate shutdown
w.shutdown();
break;
}
if ((inactivate || (active && (rs & ACTIVE_COUNT_MASK) >= pc)) &&
! UNSAFE.compareAndSwapInt(this, runStateOffset, rs, rs - 1))
inactivate = active = w.active = false;
! int wc = workerCounts;
if ((wc & RUNNING_COUNT_MASK) > pc) {
if (!(inactivate |= active) && // must inactivate to suspend
! workerCounts == wc && // try to suspend as spare
UNSAFE.compareAndSwapInt(this, workerCountsOffset,
wc, wc - ONE_RUNNING))
w.suspendAsSpare();
}
else if ((wc >>> TOTAL_COUNT_SHIFT) < pc)
helpMaintainParallelism(); // not enough workers
! else if (!ran) {
long h = eventWaiters;
int ec = eventCount;
if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != ec)
releaseEventWaiters(); // release others before waiting
else if (ec != wec) {
--- 1013,1042 ----
if (rs >= TERMINATING) { // propagate shutdown
w.shutdown();
break;
}
if ((inactivate || (active && (rs & ACTIVE_COUNT_MASK) >= pc)) &&
! UNSAFE.compareAndSwapInt(this, runStateOffset, rs, --rs)) {
inactivate = active = w.active = false;
! if (rs == SHUTDOWN) { // all inactive and shut down
! tryTerminate(false);
! continue;
! }
! }
! int wc = workerCounts; // try to suspend as spare
if ((wc & RUNNING_COUNT_MASK) > pc) {
if (!(inactivate |= active) && // must inactivate to suspend
! workerCounts == wc &&
UNSAFE.compareAndSwapInt(this, workerCountsOffset,
wc, wc - ONE_RUNNING))
w.suspendAsSpare();
}
else if ((wc >>> TOTAL_COUNT_SHIFT) < pc)
helpMaintainParallelism(); // not enough workers
! else if (ran)
! break;
! else {
long h = eventWaiters;
int ec = eventCount;
if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != ec)
releaseEventWaiters(); // release others before waiting
else if (ec != wec) {
*** 1030,1091 ****
break;
}
else if (!(inactivate |= active))
eventSync(w, wec); // must inactivate before sync
}
- else
- break;
}
}
/**
* Helps and/or blocks awaiting join of the given task.
* See above for explanation.
*
* @param joinMe the task to join
* @param worker the current worker thread
*/
! final void awaitJoin(ForkJoinTask<?> joinMe, ForkJoinWorkerThread worker) {
int retries = 2 + (parallelism >> 2); // #helpJoins before blocking
while (joinMe.status >= 0) {
! int wc;
! worker.helpJoinTask(joinMe);
if (joinMe.status < 0)
break;
! else if (retries > 0)
--retries;
! else if (((wc = workerCounts) & RUNNING_COUNT_MASK) != 0 &&
! UNSAFE.compareAndSwapInt(this, workerCountsOffset,
! wc, wc - ONE_RUNNING)) {
! int stat, c; long h;
! while ((stat = joinMe.status) >= 0 &&
! (h = eventWaiters) != 0L && // help release others
! (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
releaseEventWaiters();
! if (stat >= 0 &&
! ((workerCounts & RUNNING_COUNT_MASK) == 0 ||
! (stat =
! joinMe.internalAwaitDone(JOIN_TIMEOUT_MILLIS)) >= 0))
! helpMaintainParallelism(); // timeout or no running workers
do {} while (!UNSAFE.compareAndSwapInt
(this, workerCountsOffset,
c = workerCounts, c + ONE_RUNNING));
- if (stat < 0)
- break; // else restart
}
}
- }
/**
* Same idea as awaitJoin, but no helping, retries, or timeouts.
*/
final void awaitBlocker(ManagedBlocker blocker)
throws InterruptedException {
while (!blocker.isReleasable()) {
int wc = workerCounts;
! if ((wc & RUNNING_COUNT_MASK) != 0 &&
! UNSAFE.compareAndSwapInt(this, workerCountsOffset,
wc, wc - ONE_RUNNING)) {
try {
while (!blocker.isReleasable()) {
long h = eventWaiters;
if (h != 0L &&
--- 1044,1136 ----
break;
}
else if (!(inactivate |= active))
eventSync(w, wec); // must inactivate before sync
}
}
}
/**
* Helps and/or blocks awaiting join of the given task.
* See above for explanation.
*
* @param joinMe the task to join
* @param worker the current worker thread
+ * @param timed true if wait should time out
+ * @param nanos timeout value if timed
*/
! final void awaitJoin(ForkJoinTask<?> joinMe, ForkJoinWorkerThread worker,
! boolean timed, long nanos) {
! long startTime = timed ? System.nanoTime() : 0L;
int retries = 2 + (parallelism >> 2); // #helpJoins before blocking
+ boolean running = true; // false when count decremented
while (joinMe.status >= 0) {
! if (runState >= TERMINATING) {
! joinMe.cancelIgnoringExceptions();
! break;
! }
! running = worker.helpJoinTask(joinMe, running);
if (joinMe.status < 0)
break;
! if (retries > 0) {
--retries;
! continue;
! }
! int wc = workerCounts;
! if ((wc & RUNNING_COUNT_MASK) != 0) {
! if (running) {
! if (!UNSAFE.compareAndSwapInt(this, workerCountsOffset,
! wc, wc - ONE_RUNNING))
! continue;
! running = false;
! }
! long h = eventWaiters;
! if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
releaseEventWaiters();
! if ((workerCounts & RUNNING_COUNT_MASK) != 0) {
! long ms; int ns;
! if (!timed) {
! ms = JOIN_TIMEOUT_MILLIS;
! ns = 0;
! }
! else { // at most JOIN_TIMEOUT_MILLIS per wait
! long nt = nanos - (System.nanoTime() - startTime);
! if (nt <= 0L)
! break;
! ms = nt / 1000000;
! if (ms > JOIN_TIMEOUT_MILLIS) {
! ms = JOIN_TIMEOUT_MILLIS;
! ns = 0;
! }
! else
! ns = (int) (nt % 1000000);
! }
! joinMe.internalAwaitDone(ms, ns);
! }
! if (joinMe.status < 0)
! break;
! }
! helpMaintainParallelism();
! }
! if (!running) {
! int c;
do {} while (!UNSAFE.compareAndSwapInt
(this, workerCountsOffset,
c = workerCounts, c + ONE_RUNNING));
}
}
/**
* Same idea as awaitJoin, but no helping, retries, or timeouts.
*/
final void awaitBlocker(ManagedBlocker blocker)
throws InterruptedException {
while (!blocker.isReleasable()) {
int wc = workerCounts;
! if ((wc & RUNNING_COUNT_MASK) == 0)
! helpMaintainParallelism();
! else if (UNSAFE.compareAndSwapInt(this, workerCountsOffset,
wc, wc - ONE_RUNNING)) {
try {
while (!blocker.isReleasable()) {
long h = eventWaiters;
if (h != 0L &&
*** 1127,1142 ****
startTerminating();
// Finish now if all threads terminated; else in some subsequent call
if ((workerCounts >>> TOTAL_COUNT_SHIFT) == 0) {
advanceRunLevel(TERMINATED);
! termination.arrive();
}
return true;
}
-
/**
* Actions on transition to TERMINATING
*
* Runs up to four passes through workers: (0) shutting down each
* (without waking up if parked) to quickly spread notifications
--- 1172,1186 ----
startTerminating();
// Finish now if all threads terminated; else in some subsequent call
if ((workerCounts >>> TOTAL_COUNT_SHIFT) == 0) {
advanceRunLevel(TERMINATED);
! termination.forceTermination();
}
return true;
}
/**
* Actions on transition to TERMINATING
*
* Runs up to four passes through workers: (0) shutting down each
* (without waking up if parked) to quickly spread notifications
*** 1323,1343 ****
}
// Execution methods
/**
! * Common code for execute, invoke and submit
*/
private <T> void doSubmit(ForkJoinTask<T> task) {
- if (task == null)
- throw new NullPointerException();
- if (runState >= SHUTDOWN)
- throw new RejectedExecutionException();
submissionQueue.offer(task);
int c; // try to increment event count -- CAS failure OK
UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1);
! helpMaintainParallelism(); // create, start, or resume some workers
}
/**
* Performs the given task, returning its result upon completion.
*
--- 1367,1383 ----
}
// Execution methods
/**
! * Submits task and creates, starts, or resumes some workers if necessary.
*/
private <T> void doSubmit(ForkJoinTask<T> task) {
submissionQueue.offer(task);
int c; // try to increment event count -- CAS failure OK
UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1);
! helpMaintainParallelism();
}
/**
* Performs the given task, returning its result upon completion.
*
*** 1346,1369 ****
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public <T> T invoke(ForkJoinTask<T> task) {
doSubmit(task);
return task.join();
}
/**
* Arranges for (asynchronous) execution of the given task.
*
* @param task the task
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public void execute(ForkJoinTask<?> task) {
! doSubmit(task);
}
// AbstractExecutorService methods
/**
--- 1386,1436 ----
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public <T> T invoke(ForkJoinTask<T> task) {
+ if (task == null)
+ throw new NullPointerException();
+ if (runState >= SHUTDOWN)
+ throw new RejectedExecutionException();
+ Thread t = Thread.currentThread();
+ if ((t instanceof ForkJoinWorkerThread) &&
+ ((ForkJoinWorkerThread)t).pool == this)
+ return task.invoke(); // bypass submit if in same pool
+ else {
doSubmit(task);
return task.join();
}
+ }
/**
+ * Rejects if shut down; otherwise forks task if within an ongoing FJ
+ * computation in the current pool, else submits as external task.
+ */
+ private <T> void forkOrSubmit(ForkJoinTask<T> task) {
+ if (runState >= SHUTDOWN)
+ throw new RejectedExecutionException();
+ Thread t = Thread.currentThread();
+ if ((t instanceof ForkJoinWorkerThread) &&
+ ((ForkJoinWorkerThread)t).pool == this)
+ task.fork();
+ else
+ doSubmit(task);
+ }
+
+ /**
* Arranges for (asynchronous) execution of the given task.
*
* @param task the task
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public void execute(ForkJoinTask<?> task) {
! if (task == null)
! throw new NullPointerException();
! forkOrSubmit(task);
}
// AbstractExecutorService methods
/**
*** 1370,1385 ****
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public void execute(Runnable task) {
ForkJoinTask<?> job;
if (task instanceof ForkJoinTask<?>) // avoid re-wrap
job = (ForkJoinTask<?>) task;
else
job = ForkJoinTask.adapt(task, null);
! doSubmit(job);
}
/**
* Submits a ForkJoinTask for execution.
*
--- 1437,1454 ----
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public void execute(Runnable task) {
+ if (task == null)
+ throw new NullPointerException();
ForkJoinTask<?> job;
if (task instanceof ForkJoinTask<?>) // avoid re-wrap
job = (ForkJoinTask<?>) task;
else
job = ForkJoinTask.adapt(task, null);
! forkOrSubmit(job);
}
/**
* Submits a ForkJoinTask for execution.
*
*** 1388,1435 ****
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
! doSubmit(task);
return task;
}
/**
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public <T> ForkJoinTask<T> submit(Callable<T> task) {
ForkJoinTask<T> job = ForkJoinTask.adapt(task);
! doSubmit(job);
return job;
}
/**
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public <T> ForkJoinTask<T> submit(Runnable task, T result) {
ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
! doSubmit(job);
return job;
}
/**
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public ForkJoinTask<?> submit(Runnable task) {
ForkJoinTask<?> job;
if (task instanceof ForkJoinTask<?>) // avoid re-wrap
job = (ForkJoinTask<?>) task;
else
job = ForkJoinTask.adapt(task, null);
! doSubmit(job);
return job;
}
/**
* @throws NullPointerException {@inheritDoc}
--- 1457,1512 ----
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
! if (task == null)
! throw new NullPointerException();
! forkOrSubmit(task);
return task;
}
/**
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public <T> ForkJoinTask<T> submit(Callable<T> task) {
+ if (task == null)
+ throw new NullPointerException();
ForkJoinTask<T> job = ForkJoinTask.adapt(task);
! forkOrSubmit(job);
return job;
}
/**
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public <T> ForkJoinTask<T> submit(Runnable task, T result) {
+ if (task == null)
+ throw new NullPointerException();
ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
! forkOrSubmit(job);
return job;
}
/**
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public ForkJoinTask<?> submit(Runnable task) {
+ if (task == null)
+ throw new NullPointerException();
ForkJoinTask<?> job;
if (task instanceof ForkJoinTask<?>) // avoid re-wrap
job = (ForkJoinTask<?>) task;
else
job = ForkJoinTask.adapt(task, null);
! forkOrSubmit(job);
return job;
}
/**
* @throws NullPointerException {@inheritDoc}
*** 1723,1734 ****
/**
* Returns {@code true} if the process of termination has
* commenced but not yet completed. This method may be useful for
* debugging. A return of {@code true} reported a sufficient
* period after shutdown may indicate that submitted tasks have
! * ignored or suppressed interruption, causing this executor not
! * to properly terminate.
*
* @return {@code true} if terminating but not yet terminated
*/
public boolean isTerminating() {
return (runState & (TERMINATING|TERMINATED)) == TERMINATING;
--- 1800,1814 ----
/**
* Returns {@code true} if the process of termination has
* commenced but not yet completed. This method may be useful for
* debugging. A return of {@code true} reported a sufficient
* period after shutdown may indicate that submitted tasks have
! * ignored or suppressed interruption, or are waiting for IO,
! * causing this executor not to properly terminate. (See the
! * advisory notes for class {@link ForkJoinTask} stating that
! * tasks should not normally entail blocking operations. But if
! * they do, they must abort them on interrupt.)
*
* @return {@code true} if terminating but not yet terminated
*/
public boolean isTerminating() {
return (runState & (TERMINATING|TERMINATED)) == TERMINATING;
*** 1762,1775 ****
* @throws InterruptedException if interrupted while waiting
*/
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
try {
! return termination.awaitAdvanceInterruptibly(0, timeout, unit) > 0;
} catch (TimeoutException ex) {
return false;
}
}
/**
* Interface for extending managed parallelism for tasks running
* in {@link ForkJoinPool}s.
--- 1842,1856 ----
* @throws InterruptedException if interrupted while waiting
*/
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
try {
! termination.awaitAdvanceInterruptibly(0, timeout, unit);
} catch (TimeoutException ex) {
return false;
}
+ return true;
}
/**
* Interface for extending managed parallelism for tasks running
* in {@link ForkJoinPool}s.