< prev index next >
src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
Print this page
8259800: timeout in tck test testForkJoin(ForkJoinPool8Test)
Reviewed-by: martin
*** 1044,1060 ****
/**
* Locking version of tryUnpush.
*/
final boolean externalTryUnpush(ForkJoinTask<?> task) {
boolean taken = false;
int s = top, cap, k; ForkJoinTask<?>[] a;
! if ((a = array) != null && (cap = a.length) > 0 &&
! a[k = (cap - 1) & (s - 1)] == task && tryLock()) {
! if (top == s && array == a &&
! (taken = casSlotToNull(a, k, task)))
top = s - 1;
! source = 0; // release lock
}
return taken;
}
/**
--- 1044,1069 ----
/**
* Locking version of tryUnpush.
*/
final boolean externalTryUnpush(ForkJoinTask<?> task) {
boolean taken = false;
+ for (;;) {
int s = top, cap, k; ForkJoinTask<?>[] a;
! if ((a = array) == null || (cap = a.length) <= 0 ||
! a[k = (cap - 1) & (s - 1)] != task)
! break;
! if (tryLock()) {
! if (top == s && array == a) {
! if (taken = casSlotToNull(a, k, task)) {
top = s - 1;
! source = 0;
! break;
! }
! }
! source = 0; // release lock for retry
! }
! Thread.yield(); // trylock failure
}
return taken;
}
/**
*** 1192,1210 ****
if (top == p && array == a &&
(taken = casSlotToNull(a, k, t)))
top = s;
source = 0;
}
break;
}
else if ((f = f.completer) == null)
break;
}
! if (!taken)
! break;
! t.doExec();
! if (limit != 0 && --limit == 0)
break;
}
return status;
}
--- 1201,1220 ----
if (top == p && array == a &&
(taken = casSlotToNull(a, k, t)))
top = s;
source = 0;
}
+ if (taken)
+ t.doExec();
+ else if (!owned)
+ Thread.yield(); // tryLock failure
break;
}
else if ((f = f.completer) == null)
break;
}
! if (taken && limit != 0 && --limit == 0)
break;
}
return status;
}
*** 1584,1594 ****
* See above for explanation.
*
* @param w caller's WorkQueue (may be null on failed initialization)
*/
final void runWorker(WorkQueue w) {
! if (w != null) { // skip on failed init
w.config |= SRC; // mark as valid source
int r = w.stackPred, src = 0; // use seed from registerWorker
do {
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
} while ((src = scan(w, src, r)) >= 0 ||
--- 1594,1604 ----
* See above for explanation.
*
* @param w caller's WorkQueue (may be null on failed initialization)
*/
final void runWorker(WorkQueue w) {
! if (mode >= 0 && w != null) { // skip on failed init
w.config |= SRC; // mark as valid source
int r = w.stackPred, src = 0; // use seed from registerWorker
do {
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
} while ((src = scan(w, src, r)) >= 0 ||
*** 1709,1734 ****
}
// Utilities used by ForkJoinTask
/**
- * Returns true if all workers are busy, possibly creating one if allowed
- */
- final boolean isSaturated() {
- int maxTotal = bounds >>> SWIDTH;
- for (long c;;) {
- if (((int)(c = ctl) & ~UNSIGNALLED) != 0)
- return false;
- if ((short)(c >>> TC_SHIFT) >= maxTotal)
- return true;
- long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
- if (compareAndSetCtl(c, nc))
- return !createWorker();
- }
- }
-
- /**
* Returns true if can start terminating if enabled, or already terminated
*/
final boolean canStop() {
outer: for (long oldSum = 0L;;) { // repeat until stable
int md; WorkQueue[] qs; long c;
--- 1719,1728 ----
*** 1763,1779 ****
* @param c incoming ctl value
* @return UNCOMPENSATE: block then adjust, 0: block, -1 : retry
*/
private int tryCompensate(long c) {
Predicate<? super ForkJoinPool> sat;
! int b = bounds; // counts are signed; centered at parallelism level == 0
int minActive = (short)(b & SMASK),
maxTotal = b >>> SWIDTH,
active = (int)(c >> RC_SHIFT),
total = (short)(c >>> TC_SHIFT),
sp = (int)c & ~UNSIGNALLED;
! if (total >= 0) {
if (sp != 0) { // activate idle worker
WorkQueue[] qs; int n; WorkQueue v;
if ((qs = queues) != null && (n = qs.length) > 0 &&
(v = qs[sp & (n - 1)]) != null) {
Thread vt = v.owner;
--- 1757,1776 ----
* @param c incoming ctl value
* @return UNCOMPENSATE: block then adjust, 0: block, -1 : retry
*/
private int tryCompensate(long c) {
Predicate<? super ForkJoinPool> sat;
! int md = mode, b = bounds;
! // counts are signed; centered at parallelism level == 0
int minActive = (short)(b & SMASK),
maxTotal = b >>> SWIDTH,
active = (int)(c >> RC_SHIFT),
total = (short)(c >>> TC_SHIFT),
sp = (int)c & ~UNSIGNALLED;
! if ((md & SMASK) == 0)
! return 0; // cannot compensate if parallelism zero
! else if (total >= 0) {
if (sp != 0) { // activate idle worker
WorkQueue[] qs; int n; WorkQueue v;
if ((qs = queues) != null && (n = qs.length) > 0 &&
(v = qs[sp & (n - 1)]) != null) {
Thread vt = v.owner;
*** 1817,1829 ****
* queues for a task produced by one of w's stealers; returning
* compensated blocking sentinel if none are found.
*
* @param task the task
* @param w caller's WorkQueue
* @return task status on exit, or UNCOMPENSATE for compensated blocking
*/
! final int helpJoin(ForkJoinTask<?> task, WorkQueue w) {
int s = 0;
if (task != null && w != null) {
int wsrc = w.source, wid = w.config & SMASK, r = wid + 2;
boolean scan = true;
long c = 0L; // track ctl stability
--- 1814,1827 ----
* queues for a task produced by one of w's stealers; returning
* compensated blocking sentinel if none are found.
*
* @param task the task
* @param w caller's WorkQueue
+ * @param canHelp if false, compensate only
* @return task status on exit, or UNCOMPENSATE for compensated blocking
*/
! final int helpJoin(ForkJoinTask<?> task, WorkQueue w, boolean canHelp) {
int s = 0;
if (task != null && w != null) {
int wsrc = w.source, wid = w.config & SMASK, r = wid + 2;
boolean scan = true;
long c = 0L; // track ctl stability
*** 1834,1844 ****
if (mode < 0)
ForkJoinTask.cancelIgnoringExceptions(task);
else if (c == (c = ctl) && (s = tryCompensate(c)) >= 0)
break; // block
}
! else { // scan for subtasks
WorkQueue[] qs = queues;
int n = (qs == null) ? 0 : qs.length, m = n - 1;
for (int i = n; i > 0; i -= 2, r += 2) {
int j; WorkQueue q, x, y; ForkJoinTask<?>[] a;
if ((q = qs[j = r & m]) != null) {
--- 1832,1842 ----
if (mode < 0)
ForkJoinTask.cancelIgnoringExceptions(task);
else if (c == (c = ctl) && (s = tryCompensate(c)) >= 0)
break; // block
}
! else if (canHelp) { // scan for subtasks
WorkQueue[] qs = queues;
int n = (qs == null) ? 0 : qs.length, m = n - 1;
for (int i = n; i > 0; i -= 2, r += 2) {
int j; WorkQueue q, x, y; ForkJoinTask<?>[] a;
if ((q = qs[j = r & m]) != null) {
*** 2193,2214 ****
(n = qs.length) > 0 && r != 0) ?
qs[(n - 1) & (r << 1)] : null;
}
/**
* If the given executor is a ForkJoinPool, poll and execute
* AsynchronousCompletionTasks from worker's queue until none are
* available or blocker is released.
*/
static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) {
WorkQueue w = null; Thread t; ForkJoinWorkerThread wt;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
if ((wt = (ForkJoinWorkerThread)t).pool == e)
w = wt.workQueue;
}
! else if (e == common)
! w = commonQueue();
if (w != null)
w.helpAsyncBlocker(blocker);
}
/**
--- 2191,2222 ----
(n = qs.length) > 0 && r != 0) ?
qs[(n - 1) & (r << 1)] : null;
}
/**
+ * Returns queue for an external thread, if one exists
+ */
+ final WorkQueue externalQueue() {
+ WorkQueue[] qs;
+ int r = ThreadLocalRandom.getProbe(), n;
+ return ((qs = queues) != null && (n = qs.length) > 0 && r != 0) ?
+ qs[(n - 1) & (r << 1)] : null;
+ }
+
+ /**
* If the given executor is a ForkJoinPool, poll and execute
* AsynchronousCompletionTasks from worker's queue until none are
* available or blocker is released.
*/
static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) {
WorkQueue w = null; Thread t; ForkJoinWorkerThread wt;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
if ((wt = (ForkJoinWorkerThread)t).pool == e)
w = wt.workQueue;
}
! else if (e instanceof ForkJoinPool)
! w = ((ForkJoinPool)e).externalQueue();
if (w != null)
w.helpAsyncBlocker(blocker);
}
/**
*** 2290,2307 ****
if ((md & STOP) == 0) {
if (!now && !canStop())
return false;
md = getAndBitwiseOrMode(STOP);
}
! for (int k = 0; k < 2; ++k) { // twice in case of lagging qs updates
! for (ForkJoinTask<?> t; (t = pollScan(false)) != null; )
ForkJoinTask.cancelIgnoringExceptions(t); // help cancel
WorkQueue[] qs; int n; WorkQueue q; Thread thread;
if ((qs = queues) != null && (n = qs.length) > 0) {
for (int j = 1; j < n; j += 2) { // unblock other workers
if ((q = qs[j]) != null && (thread = q.owner) != null &&
!thread.isInterrupted()) {
try {
thread.interrupt();
} catch (Throwable ignore) {
}
}
--- 2298,2319 ----
if ((md & STOP) == 0) {
if (!now && !canStop())
return false;
md = getAndBitwiseOrMode(STOP);
}
! for (boolean rescan = true;;) { // repeat until no changes
! boolean changed = false;
! for (ForkJoinTask<?> t; (t = pollScan(false)) != null; ) {
! changed = true;
ForkJoinTask.cancelIgnoringExceptions(t); // help cancel
+ }
WorkQueue[] qs; int n; WorkQueue q; Thread thread;
if ((qs = queues) != null && (n = qs.length) > 0) {
for (int j = 1; j < n; j += 2) { // unblock other workers
if ((q = qs[j]) != null && (thread = q.owner) != null &&
!thread.isInterrupted()) {
+ changed = true;
try {
thread.interrupt();
} catch (Throwable ignore) {
}
}
*** 2315,2324 ****
--- 2327,2342 ----
lock.lock();
if ((cond = termination) != null)
cond.signalAll();
lock.unlock();
}
+ if (changed)
+ rescan = true;
+ else if (rescan)
+ rescan = false;
+ else
+ break;
}
return true;
}
// Exported methods
*** 2538,2555 ****
if (pp != null)
parallelism = Integer.parseInt(pp);
} catch (Exception ignore) {
}
int p = this.mode = Math.min(Math.max(parallelism, 0), MAX_CAP);
int size = 1 << (33 - Integer.numberOfLeadingZeros(p > 0 ? p - 1 : 1));
this.factory = (fac != null) ? fac :
new DefaultCommonPoolForkJoinWorkerThreadFactory();
this.ueh = handler;
this.keepAlive = DEFAULT_KEEPALIVE;
this.saturate = null;
this.workerNamePrefix = null;
! this.bounds = ((1 - p) & SMASK) | (COMMON_MAX_SPARES << SWIDTH);
this.ctl = ((((long)(-p) << TC_SHIFT) & TC_MASK) |
(((long)(-p) << RC_SHIFT) & RC_MASK));
this.queues = new WorkQueue[size];
this.registrationLock = new ReentrantLock();
}
--- 2556,2575 ----
if (pp != null)
parallelism = Integer.parseInt(pp);
} catch (Exception ignore) {
}
int p = this.mode = Math.min(Math.max(parallelism, 0), MAX_CAP);
+ int maxSpares = (p == 0) ? 0 : COMMON_MAX_SPARES;
+ int bnds = ((1 - p) & SMASK) | (maxSpares << SWIDTH);
int size = 1 << (33 - Integer.numberOfLeadingZeros(p > 0 ? p - 1 : 1));
this.factory = (fac != null) ? fac :
new DefaultCommonPoolForkJoinWorkerThreadFactory();
this.ueh = handler;
this.keepAlive = DEFAULT_KEEPALIVE;
this.saturate = null;
this.workerNamePrefix = null;
! this.bounds = bnds;
this.ctl = ((((long)(-p) << TC_SHIFT) & TC_MASK) |
(((long)(-p) << RC_SHIFT) & RC_MASK));
this.queues = new WorkQueue[size];
this.registrationLock = new ReentrantLock();
}
*** 2591,2601 ****
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public <T> T invoke(ForkJoinTask<T> task) {
externalSubmit(task);
! return task.join();
}
/**
* Arranges for (asynchronous) execution of the given task.
*
--- 2611,2621 ----
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public <T> T invoke(ForkJoinTask<T> task) {
externalSubmit(task);
! return task.joinForPoolInvoke(this);
}
/**
* Arranges for (asynchronous) execution of the given task.
*
*** 2683,2693 ****
new ForkJoinTask.AdaptedInterruptibleCallable<T>(t);
futures.add(f);
externalSubmit(f);
}
for (int i = futures.size() - 1; i >= 0; --i)
! ((ForkJoinTask<?>)futures.get(i)).quietlyJoin();
return futures;
} catch (Throwable t) {
for (Future<T> e : futures)
ForkJoinTask.cancelIgnoringExceptions(e);
throw t;
--- 2703,2713 ----
new ForkJoinTask.AdaptedInterruptibleCallable<T>(t);
futures.add(f);
externalSubmit(f);
}
for (int i = futures.size() - 1; i >= 0; --i)
! ((ForkJoinTask<?>)futures.get(i)).awaitPoolInvoke(this);
return futures;
} catch (Throwable t) {
for (Future<T> e : futures)
ForkJoinTask.cancelIgnoringExceptions(e);
throw t;
*** 2713,2727 ****
Future<T> f = futures.get(i);
if (!f.isDone()) {
if (timedOut)
ForkJoinTask.cancelIgnoringExceptions(f);
else {
! try {
! f.get(ns, TimeUnit.NANOSECONDS);
! } catch (CancellationException | TimeoutException |
! ExecutionException ok) {
! }
if ((ns = nanos - (System.nanoTime() - startTime)) < 0L)
timedOut = true;
}
}
}
--- 2733,2743 ----
Future<T> f = futures.get(i);
if (!f.isDone()) {
if (timedOut)
ForkJoinTask.cancelIgnoringExceptions(f);
else {
! ((ForkJoinTask<T>)f).awaitPoolInvoke(this, ns);
if ((ns = nanos - (System.nanoTime() - startTime)) < 0L)
timedOut = true;
}
}
}
*** 2744,2758 ****
pool = p;
count = new AtomicInteger(n);
}
final void tryComplete(Callable<E> c) { // called by InvokeAnyTasks
Throwable ex = null;
! boolean failed = (c == null || isCancelled() ||
! (pool != null && pool.mode < 0));
! if (!failed && !isDone()) {
try {
complete(c.call());
} catch (Throwable tx) {
ex = tx;
failed = true;
}
}
--- 2760,2779 ----
pool = p;
count = new AtomicInteger(n);
}
final void tryComplete(Callable<E> c) { // called by InvokeAnyTasks
Throwable ex = null;
! boolean failed;
! if (c == null || Thread.interrupted() ||
! (pool != null && pool.mode < 0))
! failed = true;
! else if (isDone())
! failed = false;
! else {
try {
complete(c.call());
+ failed = false;
} catch (Throwable tx) {
ex = tx;
failed = true;
}
}
*** 2815,2825 ****
fs.add(f);
externalSubmit(f);
if (root.isDone())
break;
}
! return root.get();
} finally {
for (InvokeAnyTask<T> f : fs)
ForkJoinTask.cancelIgnoringExceptions(f);
}
}
--- 2836,2846 ----
fs.add(f);
externalSubmit(f);
if (root.isDone())
break;
}
! return root.getForPoolInvoke(this);
} finally {
for (InvokeAnyTask<T> f : fs)
ForkJoinTask.cancelIgnoringExceptions(f);
}
}
*** 2842,2852 ****
fs.add(f);
externalSubmit(f);
if (root.isDone())
break;
}
! return root.get(nanos, TimeUnit.NANOSECONDS);
} finally {
for (InvokeAnyTask<T> f : fs)
ForkJoinTask.cancelIgnoringExceptions(f);
}
}
--- 2863,2873 ----
fs.add(f);
externalSubmit(f);
if (root.isDone())
break;
}
! return root.getForPoolInvoke(this, nanos);
} finally {
for (InvokeAnyTask<T> f : fs)
ForkJoinTask.cancelIgnoringExceptions(f);
}
}
< prev index next >