< prev index next >

src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java

Print this page
8259800: timeout in tck test testForkJoin(ForkJoinPool8Test)
Reviewed-by: martin

*** 274,284 **** private static final VarHandle AUX; private int getAndBitwiseOrStatus(int v) { return (int)STATUS.getAndBitwiseOr(this, v); } private boolean casStatus(int c, int v) { ! return STATUS.weakCompareAndSet(this, c, v); } private boolean casAux(Aux c, Aux v) { return AUX.compareAndSet(this, c, v); } --- 274,284 ---- private static final VarHandle AUX; private int getAndBitwiseOrStatus(int v) { return (int)STATUS.getAndBitwiseOr(this, v); } private boolean casStatus(int c, int v) { ! return STATUS.compareAndSet(this, c, v); } private boolean casAux(Aux c, Aux v) { return AUX.compareAndSet(this, c, v); }
*** 294,381 **** } } } /** - * Possibly blocks until task is done or interrupted or timed out. - * - * @param interruptible true if wait can be cancelled by interrupt - * @param deadline if non-zero use timed waits and possibly timeout - * @param pool if nonnull pool to uncompensate after unblocking - * @return status on exit, or ABNORMAL if interrupted while waiting - */ - private int awaitDone(boolean interruptible, long deadline, - ForkJoinPool pool) { - int s; - boolean interrupted = false, queued = false, parked = false; - Aux node = null; - while ((s = status) >= 0) { - Aux a; long ns; - if (parked && Thread.interrupted()) { - if (interruptible) { - s = ABNORMAL; - break; - } - interrupted = true; - } - else if (queued) { - if (deadline != 0L) { - if ((ns = deadline - System.nanoTime()) <= 0L) - break; - LockSupport.parkNanos(ns); - } - else - LockSupport.park(); - parked = true; - } - else if (node != null) { - if ((a = aux) != null && a.ex != null) - Thread.onSpinWait(); // exception in progress - else if (queued = casAux(node.next = a, node)) - LockSupport.setCurrentBlocker(this); - } - else { - try { - node = new Aux(Thread.currentThread(), null); - } catch (Throwable ex) { // try to cancel if cannot create - casStatus(s, s | (DONE | ABNORMAL)); - } - } - } - if (pool != null) - pool.uncompensate(); - - if (queued) { - LockSupport.setCurrentBlocker(null); - if (s >= 0) { // cancellation similar to AbstractQueuedSynchronizer - outer: for (Aux a; (a = aux) != null && a.ex == null; ) { - for (Aux trail = null;;) { - Aux next = a.next; - if (a == node) { - if (trail != null) - trail.casNext(trail, next); - else if (casAux(a, next)) - break outer; // cannot be re-encountered - break; // restart - } else { - trail = a; - if ((a = next) == null) - break outer; - } - } - } - } - else { - signalWaiters(); // help clean or signal - if (interrupted) - Thread.currentThread().interrupt(); - } - } - return s; - } - - /** * Sets DONE status and wakes up threads waiting to join this task. * @return status on exit */ private int setDone() { int s = getAndBitwiseOrStatus(DONE) | DONE; --- 294,303 ----
*** 461,516 **** /** * Helps and/or waits for completion from join, get, or invoke; * called from either internal or external threads. * * @param ran true if task known to have been exec'd * @param interruptible true if park interruptibly when external * @param timed true if use timed wait * @param nanos if timed, timeout value * @return ABNORMAL if interrupted, else status on exit */ ! private int awaitJoin(boolean ran, boolean interruptible, boolean timed, long nanos) { ! boolean internal; ForkJoinPool p; ForkJoinPool.WorkQueue q; int s; ! Thread t; ForkJoinWorkerThread wt; ! if (internal = ((t = Thread.currentThread()) ! instanceof ForkJoinWorkerThread)) { ! p = (wt = (ForkJoinWorkerThread)t).pool; q = wt.workQueue; } else { p = ForkJoinPool.common; ! q = ForkJoinPool.commonQueue(); if (interruptible && Thread.interrupted()) return ABNORMAL; - } if ((s = status) < 0) return s; long deadline = 0L; if (timed) { if (nanos <= 0L) return 0; else if ((deadline = nanos + System.nanoTime()) == 0L) deadline = 1L; } ! ForkJoinPool uncompensate = null; if (q != null && p != null) { // try helping ! if ((!timed || p.isSaturated()) && ! ((this instanceof CountedCompleter) ? ! (s = p.helpComplete(this, q, internal)) < 0 : ! (q.tryRemove(this, internal) && (s = doExec()) < 0))) return s; if (internal) { ! if ((s = p.helpJoin(this, q)) < 0) return s; if (s == UNCOMPENSATE) ! uncompensate = p; ! interruptible = false; } } ! return awaitDone(interruptible, deadline, uncompensate); } /** * Cancels, ignoring any exceptions thrown by cancel. Cancel is * spec'ed not to throw any exceptions, but if it does anyway, we --- 383,519 ---- /** * Helps and/or waits for completion from join, get, or invoke; * called from either internal or external threads. * + * @param pool if nonnull, known submitted pool, else assumes current pool * @param ran true if task known to have been exec'd * @param interruptible true if park interruptibly when external * @param timed true if use timed wait * @param nanos if timed, timeout value * @return ABNORMAL if interrupted, else status on exit */ ! private int awaitDone(ForkJoinPool pool, boolean ran, ! boolean interruptible, boolean timed, long nanos) { ! ForkJoinPool p; boolean internal; int s; Thread t; ! ForkJoinPool.WorkQueue q = null; ! if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) { ! ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t; ! p = wt.pool; ! if (pool == null) ! pool = p; ! if (internal = (pool == p)) q = wt.workQueue; } else { + internal = false; p = ForkJoinPool.common; ! if (pool == null) ! pool = p; ! if (pool == p && p != null) ! q = p.externalQueue(); ! } if (interruptible && Thread.interrupted()) return ABNORMAL; if ((s = status) < 0) return s; long deadline = 0L; if (timed) { if (nanos <= 0L) return 0; else if ((deadline = nanos + System.nanoTime()) == 0L) deadline = 1L; } ! boolean uncompensate = false; if (q != null && p != null) { // try helping ! // help even in timed mode if pool has no parallelism ! boolean canHelp = !timed || (p.mode & SMASK) == 0; ! if (canHelp) { ! if ((this instanceof CountedCompleter) && ! (s = p.helpComplete(this, q, internal)) < 0) ! return s; ! if (!ran && ((!internal && q.externalTryUnpush(this)) || ! q.tryRemove(this, internal)) && (s = doExec()) < 0) return s; + } if (internal) { ! if ((s = p.helpJoin(this, q, canHelp)) < 0) return s; if (s == UNCOMPENSATE) ! uncompensate = true; ! } ! } ! // block until done or cancelled wait ! boolean interrupted = false, queued = false; ! boolean parked = false, fail = false; ! Aux node = null; ! while ((s = status) >= 0) { ! Aux a; long ns; ! if (fail || (fail = (pool != null && pool.mode < 0))) ! casStatus(s, s | (DONE | ABNORMAL)); // try to cancel ! else if (parked && Thread.interrupted()) { ! if (interruptible) { ! s = ABNORMAL; ! break; ! } ! interrupted = true; ! } ! else if (queued) { ! if (deadline != 0L) { ! if ((ns = deadline - System.nanoTime()) <= 0L) ! break; ! LockSupport.parkNanos(ns); ! } ! else ! LockSupport.park(); ! parked = true; ! } ! else if (node != null) { ! if ((a = aux) != null && a.ex != null) ! Thread.onSpinWait(); // exception in progress ! else if (queued = casAux(node.next = a, node)) ! LockSupport.setCurrentBlocker(this); ! } ! else { ! try { ! node = new Aux(Thread.currentThread(), null); ! } catch (Throwable ex) { // cannot create ! fail = true; } } ! } ! if (pool != null && uncompensate) ! pool.uncompensate(); ! ! if (queued) { ! LockSupport.setCurrentBlocker(null); ! if (s >= 0) { // cancellation similar to AbstractQueuedSynchronizer ! outer: for (Aux a; (a = aux) != null && a.ex == null; ) { ! for (Aux trail = null;;) { ! Aux next = a.next; ! if (a == node) { ! if (trail != null) ! trail.casNext(trail, next); ! else if (casAux(a, next)) ! break outer; // cannot be re-encountered ! break; // restart ! } else { ! trail = a; ! if ((a = next) == null) ! break outer; ! } ! } ! } ! } ! else { ! signalWaiters(); // help clean or signal ! if (interrupted) ! Thread.currentThread().interrupt(); ! } ! } ! return s; } /** * Cancels, ignoring any exceptions thrown by cancel. Cancel is * spec'ed not to throw any exceptions, but if it does anyway, we
*** 662,672 **** * @return the computed result */ public final V join() { int s; if ((s = status) >= 0) ! s = awaitJoin(false, false, false, 0L); if ((s & ABNORMAL) != 0) reportException(s); return getRawResult(); } --- 665,675 ---- * @return the computed result */ public final V join() { int s; if ((s = status) >= 0) ! s = awaitDone(null, false, false, false, 0L); if ((s & ABNORMAL) != 0) reportException(s); return getRawResult(); }
*** 679,689 **** * @return the computed result */ public final V invoke() { int s; if ((s = doExec()) >= 0) ! s = awaitJoin(true, false, false, 0L); if ((s & ABNORMAL) != 0) reportException(s); return getRawResult(); } --- 682,692 ---- * @return the computed result */ public final V invoke() { int s; if ((s = doExec()) >= 0) ! s = awaitDone(null, true, false, false, 0L); if ((s & ABNORMAL) != 0) reportException(s); return getRawResult(); }
*** 708,723 **** int s1, s2; if (t1 == null || t2 == null) throw new NullPointerException(); t2.fork(); if ((s1 = t1.doExec()) >= 0) ! s1 = t1.awaitJoin(true, false, false, 0L); if ((s1 & ABNORMAL) != 0) { cancelIgnoringExceptions(t2); t1.reportException(s1); } ! else if (((s2 = t2.awaitJoin(false, false, false, 0L)) & ABNORMAL) != 0) t2.reportException(s2); } /** * Forks the given tasks, returning when {@code isDone} holds for --- 711,726 ---- int s1, s2; if (t1 == null || t2 == null) throw new NullPointerException(); t2.fork(); if ((s1 = t1.doExec()) >= 0) ! s1 = t1.awaitDone(null, true, false, false, 0L); if ((s1 & ABNORMAL) != 0) { cancelIgnoringExceptions(t2); t1.reportException(s1); } ! else if (((s2 = t2.awaitDone(null, false, false, false, 0L)) & ABNORMAL) != 0) t2.reportException(s2); } /** * Forks the given tasks, returning when {@code isDone} holds for
*** 744,754 **** break; } if (i == 0) { int s; if ((s = t.doExec()) >= 0) ! s = t.awaitJoin(true, false, false, 0L); if ((s & ABNORMAL) != 0) ex = t.getException(s); break; } t.fork(); --- 747,757 ---- break; } if (i == 0) { int s; if ((s = t.doExec()) >= 0) ! s = t.awaitDone(null, true, false, false, 0L); if ((s & ABNORMAL) != 0) ex = t.getException(s); break; } t.fork();
*** 757,767 **** for (int i = 1; i <= last; ++i) { ForkJoinTask<?> t; if ((t = tasks[i]) != null) { int s; if ((s = t.status) >= 0) ! s = t.awaitJoin(false, false, false, 0L); if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null) break; } } } --- 760,770 ---- for (int i = 1; i <= last; ++i) { ForkJoinTask<?> t; if ((t = tasks[i]) != null) { int s; if ((s = t.status) >= 0) ! s = t.awaitDone(null, false, false, false, 0L); if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null) break; } } }
*** 807,817 **** break; } if (i == 0) { int s; if ((s = t.doExec()) >= 0) ! s = t.awaitJoin(true, false, false, 0L); if ((s & ABNORMAL) != 0) ex = t.getException(s); break; } t.fork(); --- 810,820 ---- break; } if (i == 0) { int s; if ((s = t.doExec()) >= 0) ! s = t.awaitDone(null, true, false, false, 0L); if ((s & ABNORMAL) != 0) ex = t.getException(s); break; } t.fork();
*** 820,830 **** for (int i = 1; i <= last; ++i) { ForkJoinTask<?> t; if ((t = ts.get(i)) != null) { int s; if ((s = t.status) >= 0) ! s = t.awaitJoin(false, false, false, 0L); if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null) break; } } } --- 823,833 ---- for (int i = 1; i <= last; ++i) { ForkJoinTask<?> t; if ((t = ts.get(i)) != null) { int s; if ((s = t.status) >= 0) ! s = t.awaitDone(null, false, false, false, 0L); if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null) break; } } }
*** 971,982 **** * exception * @throws InterruptedException if the current thread is not a * member of a ForkJoinPool and was interrupted while waiting */ public final V get() throws InterruptedException, ExecutionException { ! int s; ! if (((s = awaitJoin(false, true, false, 0L)) & ABNORMAL) != 0) reportExecutionException(s); return getRawResult(); } /** --- 974,985 ---- * exception * @throws InterruptedException if the current thread is not a * member of a ForkJoinPool and was interrupted while waiting */ public final V get() throws InterruptedException, ExecutionException { ! int s = awaitDone(null, false, true, false, 0L); ! if ((s & ABNORMAL) != 0) reportExecutionException(s); return getRawResult(); } /**
*** 993,1005 **** * member of a ForkJoinPool and was interrupted while waiting * @throws TimeoutException if the wait timed out */ public final V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { ! int s; ! if ((s = awaitJoin(false, true, true, unit.toNanos(timeout))) >= 0 || ! (s & ABNORMAL) != 0) reportExecutionException(s); return getRawResult(); } /** --- 996,1008 ---- * member of a ForkJoinPool and was interrupted while waiting * @throws TimeoutException if the wait timed out */ public final V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { ! long nanos = unit.toNanos(timeout); ! int s = awaitDone(null, false, true, true, nanos); ! if (s >= 0 || (s & ABNORMAL) != 0) reportExecutionException(s); return getRawResult(); } /**
*** 1008,1028 **** * collections of tasks when some have been cancelled or otherwise * known to have aborted. */ public final void quietlyJoin() { if (status >= 0) ! awaitJoin(false, false, false, 0L); } /** * Commences performing this task and awaits its completion if * necessary, without returning its result or throwing its * exception. */ public final void quietlyInvoke() { if (doExec() >= 0) ! awaitJoin(true, false, false, 0L); } /** * Possibly executes tasks until the pool hosting the current task * {@linkplain ForkJoinPool#isQuiescent is quiescent}. This --- 1011,1062 ---- * collections of tasks when some have been cancelled or otherwise * known to have aborted. */ public final void quietlyJoin() { if (status >= 0) ! awaitDone(null, false, false, false, 0L); } + /** * Commences performing this task and awaits its completion if * necessary, without returning its result or throwing its * exception. */ public final void quietlyInvoke() { if (doExec() >= 0) ! awaitDone(null, true, false, false, 0L); ! } ! ! // Versions of join/get for pool.invoke* methods that use external, ! // possibly-non-commonPool submits ! ! final void awaitPoolInvoke(ForkJoinPool pool) { ! awaitDone(pool, false, false, false, 0L); ! } ! final void awaitPoolInvoke(ForkJoinPool pool, long nanos) { ! awaitDone(pool, false, true, true, nanos); ! } ! final V joinForPoolInvoke(ForkJoinPool pool) { ! int s = awaitDone(pool, false, false, false, 0L); ! if ((s & ABNORMAL) != 0) ! reportException(s); ! return getRawResult(); ! } ! final V getForPoolInvoke(ForkJoinPool pool) ! throws InterruptedException, ExecutionException { ! int s = awaitDone(pool, false, true, false, 0L); ! if ((s & ABNORMAL) != 0) ! reportExecutionException(s); ! return getRawResult(); ! } ! final V getForPoolInvoke(ForkJoinPool pool, long nanos) ! throws InterruptedException, ExecutionException, TimeoutException { ! int s = awaitDone(pool, false, true, true, nanos); ! if (s >= 0 || (s & ABNORMAL) != 0) ! reportExecutionException(s); ! return getRawResult(); } /** * Possibly executes tasks until the pool hosting the current task * {@linkplain ForkJoinPool#isQuiescent is quiescent}. This
< prev index next >