src/share/classes/java/util/concurrent/ForkJoinPool.java

Print this page

        

*** 524,534 **** * worker thread. (Bits 16-31 are unused.) */ private volatile long eventWaiters; private static final int EVENT_COUNT_SHIFT = 32; ! private static final long WAITER_ID_MASK = (1L << 16) - 1L; /** * A counter for events that may wake up worker threads: * - Submission of a new task to the pool * - A worker pushing a task on an empty queue --- 524,534 ---- * worker thread. (Bits 16-31 are unused.) */ private volatile long eventWaiters; private static final int EVENT_COUNT_SHIFT = 32; ! private static final int WAITER_ID_MASK = (1 << 16) - 1; /** * A counter for events that may wake up worker threads: * - Submission of a new task to the pool * - A worker pushing a task on an empty queue
*** 613,634 **** // Utilities for CASing fields. Note that most of these // are usually manually inlined by callers /** ! * Increments running count part of workerCounts */ final void incrementRunningCount() { int c; do {} while (!UNSAFE.compareAndSwapInt(this, workerCountsOffset, c = workerCounts, c + ONE_RUNNING)); } /** ! * Tries to decrement running count unless already zero */ final boolean tryDecrementRunningCount() { int wc = workerCounts; if ((wc & RUNNING_COUNT_MASK) == 0) return false; return UNSAFE.compareAndSwapInt(this, workerCountsOffset, --- 613,644 ---- // Utilities for CASing fields. Note that most of these // are usually manually inlined by callers /** ! * Increments running count part of workerCounts. */ final void incrementRunningCount() { int c; do {} while (!UNSAFE.compareAndSwapInt(this, workerCountsOffset, c = workerCounts, c + ONE_RUNNING)); } /** ! * Tries to increment running count part of workerCounts. */ + final boolean tryIncrementRunningCount() { + int c; + return UNSAFE.compareAndSwapInt(this, workerCountsOffset, + c = workerCounts, + c + ONE_RUNNING); + } + + /** + * Tries to decrement running count unless already zero. + */ final boolean tryDecrementRunningCount() { int wc = workerCounts; if ((wc & RUNNING_COUNT_MASK) == 0) return false; return UNSAFE.compareAndSwapInt(this, workerCountsOffset,
*** 696,709 **** int n = ws.length; if (k < 0 || k >= n || ws[k] != null) { for (k = 0; k < n && ws[k] != null; ++k) ; if (k == n) ! ws = Arrays.copyOf(ws, n << 1); } ws[k] = w; ! workers = ws; // volatile array write ensures slot visibility } finally { lock.unlock(); } return k; } --- 706,720 ---- int n = ws.length; if (k < 0 || k >= n || ws[k] != null) { for (k = 0; k < n && ws[k] != null; ++k) ; if (k == n) ! ws = workers = Arrays.copyOf(ws, n << 1); } ws[k] = w; ! int c = eventCount; // advance event count to ensure visibility ! UNSAFE.compareAndSwapInt(this, eventCountOffset, c, c+1); } finally { lock.unlock(); } return k; }
*** 732,742 **** * * @param w the worker */ final void workerTerminated(ForkJoinWorkerThread w) { forgetWorker(w); ! decrementWorkerCounts(w.isTrimmed()? 0 : ONE_RUNNING, ONE_TOTAL); while (w.stealCount != 0) // collect final count tryAccumulateStealCount(w); tryTerminate(false); } --- 743,753 ---- * * @param w the worker */ final void workerTerminated(ForkJoinWorkerThread w) { forgetWorker(w); ! decrementWorkerCounts(w.isTrimmed() ? 0 : ONE_RUNNING, ONE_TOTAL); while (w.stealCount != 0) // collect final count tryAccumulateStealCount(w); tryTerminate(false); }
*** 744,771 **** /** * Releases workers blocked on a count not equal to current count. * Normally called after precheck that eventWaiters isn't zero to * avoid wasted array checks. Gives up upon a change in count or ! * upon releasing two workers, letting others take over. */ private void releaseEventWaiters() { ForkJoinWorkerThread[] ws = workers; int n = ws.length; long h = eventWaiters; int ec = eventCount; ! boolean releasedOne = false; ForkJoinWorkerThread w; int id; ! while ((id = ((int)(h & WAITER_ID_MASK)) - 1) >= 0 && (int)(h >>> EVENT_COUNT_SHIFT) != ec && id < n && (w = ws[id]) != null) { if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset, h, w.nextWaiter)) { LockSupport.unpark(w); ! if (releasedOne) // exit on second release break; - releasedOne = true; } if (eventCount != ec) break; h = eventWaiters; } --- 755,781 ---- /** * Releases workers blocked on a count not equal to current count. * Normally called after precheck that eventWaiters isn't zero to * avoid wasted array checks. Gives up upon a change in count or ! * upon releasing four workers, letting others take over. */ private void releaseEventWaiters() { ForkJoinWorkerThread[] ws = workers; int n = ws.length; long h = eventWaiters; int ec = eventCount; ! int releases = 4; ForkJoinWorkerThread w; int id; ! while ((id = (((int)h) & WAITER_ID_MASK) - 1) >= 0 && (int)(h >>> EVENT_COUNT_SHIFT) != ec && id < n && (w = ws[id]) != null) { if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset, h, w.nextWaiter)) { LockSupport.unpark(w); ! if (--releases == 0) break; } if (eventCount != ec) break; h = eventWaiters; }
*** 791,801 **** */ private void eventSync(ForkJoinWorkerThread w, int ec) { long nh = (((long)ec) << EVENT_COUNT_SHIFT) | ((long)(w.poolIndex+1)); long h; while ((runState < SHUTDOWN || !tryTerminate(false)) && ! (((int)((h = eventWaiters) & WAITER_ID_MASK)) == 0 || (int)(h >>> EVENT_COUNT_SHIFT) == ec) && eventCount == ec) { if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset, w.nextWaiter = h, nh)) { awaitEvent(w, ec); --- 801,811 ---- */ private void eventSync(ForkJoinWorkerThread w, int ec) { long nh = (((long)ec) << EVENT_COUNT_SHIFT) | ((long)(w.poolIndex+1)); long h; while ((runState < SHUTDOWN || !tryTerminate(false)) && ! (((int)(h = eventWaiters) & WAITER_ID_MASK) == 0 || (int)(h >>> EVENT_COUNT_SHIFT) == ec) && eventCount == ec) { if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset, w.nextWaiter = h, nh)) { awaitEvent(w, ec);
*** 818,830 **** private void awaitEvent(ForkJoinWorkerThread w, int ec) { while (eventCount == ec) { if (tryAccumulateStealCount(w)) { // transfer while idle boolean untimed = (w.nextWaiter != 0L || (workerCounts & RUNNING_COUNT_MASK) <= 1); ! long startTime = untimed? 0 : System.nanoTime(); Thread.interrupted(); // clear/ignore interrupt ! if (eventCount != ec || w.isTerminating()) break; // recheck after clear if (untimed) LockSupport.park(w); else { LockSupport.parkNanos(w, SHRINK_RATE_NANOS); --- 828,840 ---- private void awaitEvent(ForkJoinWorkerThread w, int ec) { while (eventCount == ec) { if (tryAccumulateStealCount(w)) { // transfer while idle boolean untimed = (w.nextWaiter != 0L || (workerCounts & RUNNING_COUNT_MASK) <= 1); ! long startTime = untimed ? 0 : System.nanoTime(); Thread.interrupted(); // clear/ignore interrupt ! if (w.isTerminating() || eventCount != ec) break; // recheck after clear if (untimed) LockSupport.park(w); else { LockSupport.parkNanos(w, SHRINK_RATE_NANOS);
*** 858,868 **** int n = ws.length; ForkJoinWorkerThread w; if ((sw = spareWaiters) != 0 && (id = (sw & SPARE_ID_MASK) - 1) >= 0 && id < n && (w = ws[id]) != null && ! (workerCounts & RUNNING_COUNT_MASK) < parallelism && spareWaiters == sw && UNSAFE.compareAndSwapInt(this, spareWaitersOffset, sw, w.nextSpare)) { int c; // increment running count before resume do {} while (!UNSAFE.compareAndSwapInt --- 868,879 ---- int n = ws.length; ForkJoinWorkerThread w; if ((sw = spareWaiters) != 0 && (id = (sw & SPARE_ID_MASK) - 1) >= 0 && id < n && (w = ws[id]) != null && ! (runState >= TERMINATING || ! (workerCounts & RUNNING_COUNT_MASK) < parallelism) && spareWaiters == sw && UNSAFE.compareAndSwapInt(this, spareWaitersOffset, sw, w.nextSpare)) { int c; // increment running count before resume do {} while (!UNSAFE.compareAndSwapInt
*** 912,929 **** ForkJoinWorkerThread)) UNSAFE.throwException(fail); break; } w.start(recordWorker(w), ueh); ! if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc) { ! int c; // advance event count ! UNSAFE.compareAndSwapInt(this, eventCountOffset, ! c = eventCount, c+1); break; // add at most one unless total below target } } - } if (eventWaiters != 0L) releaseEventWaiters(); } /** --- 923,936 ---- ForkJoinWorkerThread)) UNSAFE.throwException(fail); break; } w.start(recordWorker(w), ueh); ! if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc) break; // add at most one unless total below target } } if (eventWaiters != 0L) releaseEventWaiters(); } /**
*** 953,963 **** sw, w.nextSpare)) shutdown = true; } else if ((h = eventWaiters) != 0L) { long nh; ! int id = ((int)(h & WAITER_ID_MASK)) - 1; if (id >= 0 && id < n && (w = ws[id]) != null && (nh = w.nextWaiter) != 0L && // keep at least one worker UNSAFE.compareAndSwapLong(this, eventWaitersOffset, h, nh)) shutdown = true; } --- 960,970 ---- sw, w.nextSpare)) shutdown = true; } else if ((h = eventWaiters) != 0L) { long nh; ! int id = (((int)h) & WAITER_ID_MASK) - 1; if (id >= 0 && id < n && (w = ws[id]) != null && (nh = w.nextWaiter) != 0L && // keep at least one worker UNSAFE.compareAndSwapLong(this, eventWaitersOffset, h, nh)) shutdown = true; }
*** 1006,1028 **** if (rs >= TERMINATING) { // propagate shutdown w.shutdown(); break; } if ((inactivate || (active && (rs & ACTIVE_COUNT_MASK) >= pc)) && ! UNSAFE.compareAndSwapInt(this, runStateOffset, rs, rs - 1)) inactivate = active = w.active = false; ! int wc = workerCounts; if ((wc & RUNNING_COUNT_MASK) > pc) { if (!(inactivate |= active) && // must inactivate to suspend ! workerCounts == wc && // try to suspend as spare UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc, wc - ONE_RUNNING)) w.suspendAsSpare(); } else if ((wc >>> TOTAL_COUNT_SHIFT) < pc) helpMaintainParallelism(); // not enough workers ! else if (!ran) { long h = eventWaiters; int ec = eventCount; if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != ec) releaseEventWaiters(); // release others before waiting else if (ec != wec) { --- 1013,1042 ---- if (rs >= TERMINATING) { // propagate shutdown w.shutdown(); break; } if ((inactivate || (active && (rs & ACTIVE_COUNT_MASK) >= pc)) && ! UNSAFE.compareAndSwapInt(this, runStateOffset, rs, --rs)) { inactivate = active = w.active = false; ! if (rs == SHUTDOWN) { // all inactive and shut down ! tryTerminate(false); ! continue; ! } ! } ! int wc = workerCounts; // try to suspend as spare if ((wc & RUNNING_COUNT_MASK) > pc) { if (!(inactivate |= active) && // must inactivate to suspend ! workerCounts == wc && UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc, wc - ONE_RUNNING)) w.suspendAsSpare(); } else if ((wc >>> TOTAL_COUNT_SHIFT) < pc) helpMaintainParallelism(); // not enough workers ! else if (ran) ! break; ! else { long h = eventWaiters; int ec = eventCount; if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != ec) releaseEventWaiters(); // release others before waiting else if (ec != wec) {
*** 1030,1091 **** break; } else if (!(inactivate |= active)) eventSync(w, wec); // must inactivate before sync } - else - break; } } /** * Helps and/or blocks awaiting join of the given task. * See above for explanation. * * @param joinMe the task to join * @param worker the current worker thread */ ! final void awaitJoin(ForkJoinTask<?> joinMe, ForkJoinWorkerThread worker) { int retries = 2 + (parallelism >> 2); // #helpJoins before blocking while (joinMe.status >= 0) { ! int wc; ! worker.helpJoinTask(joinMe); if (joinMe.status < 0) break; ! else if (retries > 0) --retries; ! else if (((wc = workerCounts) & RUNNING_COUNT_MASK) != 0 && ! UNSAFE.compareAndSwapInt(this, workerCountsOffset, ! wc, wc - ONE_RUNNING)) { ! int stat, c; long h; ! while ((stat = joinMe.status) >= 0 && ! (h = eventWaiters) != 0L && // help release others ! (int)(h >>> EVENT_COUNT_SHIFT) != eventCount) releaseEventWaiters(); ! if (stat >= 0 && ! ((workerCounts & RUNNING_COUNT_MASK) == 0 || ! (stat = ! joinMe.internalAwaitDone(JOIN_TIMEOUT_MILLIS)) >= 0)) ! helpMaintainParallelism(); // timeout or no running workers do {} while (!UNSAFE.compareAndSwapInt (this, workerCountsOffset, c = workerCounts, c + ONE_RUNNING)); - if (stat < 0) - break; // else restart } } - } /** * Same idea as awaitJoin, but no helping, retries, or timeouts. */ final void awaitBlocker(ManagedBlocker blocker) throws InterruptedException { while (!blocker.isReleasable()) { int wc = workerCounts; ! if ((wc & RUNNING_COUNT_MASK) != 0 && ! UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc, wc - ONE_RUNNING)) { try { while (!blocker.isReleasable()) { long h = eventWaiters; if (h != 0L && --- 1044,1136 ---- break; } else if (!(inactivate |= active)) eventSync(w, wec); // must inactivate before sync } } } /** * Helps and/or blocks awaiting join of the given task. * See above for explanation. * * @param joinMe the task to join * @param worker the current worker thread + * @param timed true if wait should time out + * @param nanos timeout value if timed */ ! final void awaitJoin(ForkJoinTask<?> joinMe, ForkJoinWorkerThread worker, ! boolean timed, long nanos) { ! long startTime = timed ? System.nanoTime() : 0L; int retries = 2 + (parallelism >> 2); // #helpJoins before blocking + boolean running = true; // false when count decremented while (joinMe.status >= 0) { ! if (runState >= TERMINATING) { ! joinMe.cancelIgnoringExceptions(); ! break; ! } ! running = worker.helpJoinTask(joinMe, running); if (joinMe.status < 0) break; ! if (retries > 0) { --retries; ! continue; ! } ! int wc = workerCounts; ! if ((wc & RUNNING_COUNT_MASK) != 0) { ! if (running) { ! if (!UNSAFE.compareAndSwapInt(this, workerCountsOffset, ! wc, wc - ONE_RUNNING)) ! continue; ! running = false; ! } ! long h = eventWaiters; ! if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != eventCount) releaseEventWaiters(); ! if ((workerCounts & RUNNING_COUNT_MASK) != 0) { ! long ms; int ns; ! if (!timed) { ! ms = JOIN_TIMEOUT_MILLIS; ! ns = 0; ! } ! else { // at most JOIN_TIMEOUT_MILLIS per wait ! long nt = nanos - (System.nanoTime() - startTime); ! if (nt <= 0L) ! break; ! ms = nt / 1000000; ! if (ms > JOIN_TIMEOUT_MILLIS) { ! ms = JOIN_TIMEOUT_MILLIS; ! ns = 0; ! } ! else ! ns = (int) (nt % 1000000); ! } ! joinMe.internalAwaitDone(ms, ns); ! } ! if (joinMe.status < 0) ! break; ! } ! helpMaintainParallelism(); ! } ! if (!running) { ! int c; do {} while (!UNSAFE.compareAndSwapInt (this, workerCountsOffset, c = workerCounts, c + ONE_RUNNING)); } } /** * Same idea as awaitJoin, but no helping, retries, or timeouts. */ final void awaitBlocker(ManagedBlocker blocker) throws InterruptedException { while (!blocker.isReleasable()) { int wc = workerCounts; ! if ((wc & RUNNING_COUNT_MASK) == 0) ! helpMaintainParallelism(); ! else if (UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc, wc - ONE_RUNNING)) { try { while (!blocker.isReleasable()) { long h = eventWaiters; if (h != 0L &&
*** 1127,1142 **** startTerminating(); // Finish now if all threads terminated; else in some subsequent call if ((workerCounts >>> TOTAL_COUNT_SHIFT) == 0) { advanceRunLevel(TERMINATED); ! termination.arrive(); } return true; } - /** * Actions on transition to TERMINATING * * Runs up to four passes through workers: (0) shutting down each * (without waking up if parked) to quickly spread notifications --- 1172,1186 ---- startTerminating(); // Finish now if all threads terminated; else in some subsequent call if ((workerCounts >>> TOTAL_COUNT_SHIFT) == 0) { advanceRunLevel(TERMINATED); ! termination.forceTermination(); } return true; } /** * Actions on transition to TERMINATING * * Runs up to four passes through workers: (0) shutting down each * (without waking up if parked) to quickly spread notifications
*** 1323,1343 **** } // Execution methods /** ! * Common code for execute, invoke and submit */ private <T> void doSubmit(ForkJoinTask<T> task) { - if (task == null) - throw new NullPointerException(); - if (runState >= SHUTDOWN) - throw new RejectedExecutionException(); submissionQueue.offer(task); int c; // try to increment event count -- CAS failure OK UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1); ! helpMaintainParallelism(); // create, start, or resume some workers } /** * Performs the given task, returning its result upon completion. * --- 1367,1383 ---- } // Execution methods /** ! * Submits task and creates, starts, or resumes some workers if necessary */ private <T> void doSubmit(ForkJoinTask<T> task) { submissionQueue.offer(task); int c; // try to increment event count -- CAS failure OK UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1); ! helpMaintainParallelism(); } /** * Performs the given task, returning its result upon completion. *
*** 1346,1369 **** * @throws NullPointerException if the task is null * @throws RejectedExecutionException if the task cannot be * scheduled for execution */ public <T> T invoke(ForkJoinTask<T> task) { doSubmit(task); return task.join(); } /** * Arranges for (asynchronous) execution of the given task. * * @param task the task * @throws NullPointerException if the task is null * @throws RejectedExecutionException if the task cannot be * scheduled for execution */ public void execute(ForkJoinTask<?> task) { ! doSubmit(task); } // AbstractExecutorService methods /** --- 1386,1436 ---- * @throws NullPointerException if the task is null * @throws RejectedExecutionException if the task cannot be * scheduled for execution */ public <T> T invoke(ForkJoinTask<T> task) { + if (task == null) + throw new NullPointerException(); + if (runState >= SHUTDOWN) + throw new RejectedExecutionException(); + Thread t = Thread.currentThread(); + if ((t instanceof ForkJoinWorkerThread) && + ((ForkJoinWorkerThread)t).pool == this) + return task.invoke(); // bypass submit if in same pool + else { doSubmit(task); return task.join(); } + } /** + * Unless terminating, forks task if within an ongoing FJ + * computation in the current pool, else submits as external task. + */ + private <T> void forkOrSubmit(ForkJoinTask<T> task) { + if (runState >= SHUTDOWN) + throw new RejectedExecutionException(); + Thread t = Thread.currentThread(); + if ((t instanceof ForkJoinWorkerThread) && + ((ForkJoinWorkerThread)t).pool == this) + task.fork(); + else + doSubmit(task); + } + + /** * Arranges for (asynchronous) execution of the given task. * * @param task the task * @throws NullPointerException if the task is null * @throws RejectedExecutionException if the task cannot be * scheduled for execution */ public void execute(ForkJoinTask<?> task) { ! if (task == null) ! throw new NullPointerException(); ! forkOrSubmit(task); } // AbstractExecutorService methods /**
*** 1370,1385 **** * @throws NullPointerException if the task is null * @throws RejectedExecutionException if the task cannot be * scheduled for execution */ public void execute(Runnable task) { ForkJoinTask<?> job; if (task instanceof ForkJoinTask<?>) // avoid re-wrap job = (ForkJoinTask<?>) task; else job = ForkJoinTask.adapt(task, null); ! doSubmit(job); } /** * Submits a ForkJoinTask for execution. * --- 1437,1454 ---- * @throws NullPointerException if the task is null * @throws RejectedExecutionException if the task cannot be * scheduled for execution */ public void execute(Runnable task) { + if (task == null) + throw new NullPointerException(); ForkJoinTask<?> job; if (task instanceof ForkJoinTask<?>) // avoid re-wrap job = (ForkJoinTask<?>) task; else job = ForkJoinTask.adapt(task, null); ! forkOrSubmit(job); } /** * Submits a ForkJoinTask for execution. *
*** 1388,1435 **** * @throws NullPointerException if the task is null * @throws RejectedExecutionException if the task cannot be * scheduled for execution */ public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) { ! doSubmit(task); return task; } /** * @throws NullPointerException if the task is null * @throws RejectedExecutionException if the task cannot be * scheduled for execution */ public <T> ForkJoinTask<T> submit(Callable<T> task) { ForkJoinTask<T> job = ForkJoinTask.adapt(task); ! doSubmit(job); return job; } /** * @throws NullPointerException if the task is null * @throws RejectedExecutionException if the task cannot be * scheduled for execution */ public <T> ForkJoinTask<T> submit(Runnable task, T result) { ForkJoinTask<T> job = ForkJoinTask.adapt(task, result); ! doSubmit(job); return job; } /** * @throws NullPointerException if the task is null * @throws RejectedExecutionException if the task cannot be * scheduled for execution */ public ForkJoinTask<?> submit(Runnable task) { ForkJoinTask<?> job; if (task instanceof ForkJoinTask<?>) // avoid re-wrap job = (ForkJoinTask<?>) task; else job = ForkJoinTask.adapt(task, null); ! doSubmit(job); return job; } /** * @throws NullPointerException {@inheritDoc} --- 1457,1512 ---- * @throws NullPointerException if the task is null * @throws RejectedExecutionException if the task cannot be * scheduled for execution */ public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) { ! if (task == null) ! throw new NullPointerException(); ! forkOrSubmit(task); return task; } /** * @throws NullPointerException if the task is null * @throws RejectedExecutionException if the task cannot be * scheduled for execution */ public <T> ForkJoinTask<T> submit(Callable<T> task) { + if (task == null) + throw new NullPointerException(); ForkJoinTask<T> job = ForkJoinTask.adapt(task); ! forkOrSubmit(job); return job; } /** * @throws NullPointerException if the task is null * @throws RejectedExecutionException if the task cannot be * scheduled for execution */ public <T> ForkJoinTask<T> submit(Runnable task, T result) { + if (task == null) + throw new NullPointerException(); ForkJoinTask<T> job = ForkJoinTask.adapt(task, result); ! forkOrSubmit(job); return job; } /** * @throws NullPointerException if the task is null * @throws RejectedExecutionException if the task cannot be * scheduled for execution */ public ForkJoinTask<?> submit(Runnable task) { + if (task == null) + throw new NullPointerException(); ForkJoinTask<?> job; if (task instanceof ForkJoinTask<?>) // avoid re-wrap job = (ForkJoinTask<?>) task; else job = ForkJoinTask.adapt(task, null); ! forkOrSubmit(job); return job; } /** * @throws NullPointerException {@inheritDoc}
*** 1723,1734 **** /** * Returns {@code true} if the process of termination has * commenced but not yet completed. This method may be useful for * debugging. A return of {@code true} reported a sufficient * period after shutdown may indicate that submitted tasks have ! * ignored or suppressed interruption, causing this executor not ! * to properly terminate. * * @return {@code true} if terminating but not yet terminated */ public boolean isTerminating() { return (runState & (TERMINATING|TERMINATED)) == TERMINATING; --- 1800,1814 ---- /** * Returns {@code true} if the process of termination has * commenced but not yet completed. This method may be useful for * debugging. A return of {@code true} reported a sufficient * period after shutdown may indicate that submitted tasks have ! * ignored or suppressed interruption, or are waiting for IO, ! * causing this executor not to properly terminate. (See the ! * advisory notes for class {@link ForkJoinTask} stating that ! * tasks should not normally entail blocking operations. But if ! * they do, they must abort them on interrupt.) * * @return {@code true} if terminating but not yet terminated */ public boolean isTerminating() { return (runState & (TERMINATING|TERMINATED)) == TERMINATING;
*** 1762,1775 **** * @throws InterruptedException if interrupted while waiting */ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { try { ! return termination.awaitAdvanceInterruptibly(0, timeout, unit) > 0; } catch (TimeoutException ex) { return false; } } /** * Interface for extending managed parallelism for tasks running * in {@link ForkJoinPool}s. --- 1842,1856 ---- * @throws InterruptedException if interrupted while waiting */ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { try { ! termination.awaitAdvanceInterruptibly(0, timeout, unit); } catch (TimeoutException ex) { return false; } + return true; } /** * Interface for extending managed parallelism for tasks running * in {@link ForkJoinPool}s.