< prev index next >

src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java

Print this page
8259800: timeout in tck test testForkJoin(ForkJoinPool8Test)
Reviewed-by: martin

@@ -274,11 +274,11 @@
     private static final VarHandle AUX;
     private int getAndBitwiseOrStatus(int v) {
         return (int)STATUS.getAndBitwiseOr(this, v);
     }
     private boolean casStatus(int c, int v) {
-        return STATUS.weakCompareAndSet(this, c, v);
+        return STATUS.compareAndSet(this, c, v);
     }
     private boolean casAux(Aux c, Aux v) {
         return AUX.compareAndSet(this, c, v);
     }
 

@@ -294,88 +294,10 @@
             }
         }
     }
 
     /**
-     * Possibly blocks until task is done or interrupted or timed out.
-     *
-     * @param interruptible true if wait can be cancelled by interrupt
-     * @param deadline if non-zero use timed waits and possibly timeout
-     * @param pool if nonnull pool to uncompensate after unblocking
-     * @return status on exit, or ABNORMAL if interrupted while waiting
-     */
-    private int awaitDone(boolean interruptible, long deadline,
-                          ForkJoinPool pool) {
-        int s;
-        boolean interrupted = false, queued = false, parked = false;
-        Aux node = null;
-        while ((s = status) >= 0) {
-            Aux a; long ns;
-            if (parked && Thread.interrupted()) {
-                if (interruptible) {
-                    s = ABNORMAL;
-                    break;
-                }
-                interrupted = true;
-            }
-            else if (queued) {
-                if (deadline != 0L) {
-                    if ((ns = deadline - System.nanoTime()) <= 0L)
-                        break;
-                    LockSupport.parkNanos(ns);
-                }
-                else
-                    LockSupport.park();
-                parked = true;
-            }
-            else if (node != null) {
-                if ((a = aux) != null && a.ex != null)
-                    Thread.onSpinWait();     // exception in progress
-                else if (queued = casAux(node.next = a, node))
-                    LockSupport.setCurrentBlocker(this);
-            }
-            else {
-                try {
-                    node = new Aux(Thread.currentThread(), null);
-                } catch (Throwable ex) {     // try to cancel if cannot create
-                    casStatus(s, s | (DONE | ABNORMAL));
-                }
-            }
-        }
-        if (pool != null)
-            pool.uncompensate();
-
-        if (queued) {
-            LockSupport.setCurrentBlocker(null);
-            if (s >= 0) { // cancellation similar to AbstractQueuedSynchronizer
-                outer: for (Aux a; (a = aux) != null && a.ex == null; ) {
-                    for (Aux trail = null;;) {
-                        Aux next = a.next;
-                        if (a == node) {
-                            if (trail != null)
-                                trail.casNext(trail, next);
-                            else if (casAux(a, next))
-                                break outer; // cannot be re-encountered
-                            break;           // restart
-                        } else {
-                            trail = a;
-                            if ((a = next) == null)
-                                break outer;
-                        }
-                    }
-                }
-            }
-            else {
-                signalWaiters();             // help clean or signal
-                if (interrupted)
-                    Thread.currentThread().interrupt();
-            }
-        }
-        return s;
-    }
-
-    /**
      * Sets DONE status and wakes up threads waiting to join this task.
      * @return status on exit
      */
     private int setDone() {
         int s = getAndBitwiseOrStatus(DONE) | DONE;

@@ -461,56 +383,137 @@
 
     /**
      * Helps and/or waits for completion from join, get, or invoke;
      * called from either internal or external threads.
      *
+     * @param pool if nonnull, known submitted pool, else assumes current pool
      * @param ran true if task known to have been exec'd
      * @param interruptible true if park interruptibly when external
      * @param timed true if use timed wait
      * @param nanos if timed, timeout value
      * @return ABNORMAL if interrupted, else status on exit
      */
-    private int awaitJoin(boolean ran, boolean interruptible, boolean timed,
+    private int awaitDone(ForkJoinPool pool, boolean ran,
+                          boolean interruptible, boolean timed,
                           long nanos) {
-        boolean internal; ForkJoinPool p; ForkJoinPool.WorkQueue q; int s;
-        Thread t; ForkJoinWorkerThread wt;
-        if (internal = ((t = Thread.currentThread())
-                        instanceof ForkJoinWorkerThread)) {
-            p = (wt = (ForkJoinWorkerThread)t).pool;
+        ForkJoinPool p; boolean internal; int s; Thread t;
+        ForkJoinPool.WorkQueue q = null;
+        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
+            ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
+            p = wt.pool;
+            if (pool == null)
+                pool = p;
+            if (internal = (pool == p))
             q = wt.workQueue;
         }
         else {
+            internal = false;
             p = ForkJoinPool.common;
-            q = ForkJoinPool.commonQueue();
+            if (pool == null)
+                pool = p;
+            if (pool == p && p != null)
+                q = p.externalQueue();
+        }
             if (interruptible && Thread.interrupted())
                 return ABNORMAL;
-        }
         if ((s = status) < 0)
             return s;
         long deadline = 0L;
         if (timed) {
             if (nanos <= 0L)
                 return 0;
             else if ((deadline = nanos + System.nanoTime()) == 0L)
                 deadline = 1L;
         }
-        ForkJoinPool uncompensate = null;
+        boolean uncompensate = false;
         if (q != null && p != null) {            // try helping
-            if ((!timed || p.isSaturated()) &&
-                ((this instanceof CountedCompleter) ?
-                 (s = p.helpComplete(this, q, internal)) < 0 :
-                 (q.tryRemove(this, internal) && (s = doExec()) < 0)))
+            // help even in timed mode if pool has no parallelism
+            boolean canHelp = !timed || (p.mode & SMASK) == 0;
+            if (canHelp) {
+                if ((this instanceof CountedCompleter) &&
+                    (s = p.helpComplete(this, q, internal)) < 0)
+                    return s;
+                if (!ran && ((!internal && q.externalTryUnpush(this)) ||
+                             q.tryRemove(this, internal)) && (s = doExec()) < 0)
                 return s;
+            }
             if (internal) {
-                if ((s = p.helpJoin(this, q)) < 0)
+                if ((s = p.helpJoin(this, q, canHelp)) < 0)
                     return s;
                 if (s == UNCOMPENSATE)
-                    uncompensate = p;
-                interruptible = false;
+                    uncompensate = true;
+            }
+        }
+        // block until done or cancelled wait
+        boolean interrupted = false, queued = false;
+        boolean parked = false, fail = false;
+        Aux node = null;
+        while ((s = status) >= 0) {
+            Aux a; long ns;
+            if (fail || (fail = (pool != null && pool.mode < 0)))
+                casStatus(s, s | (DONE | ABNORMAL)); // try to cancel
+            else if (parked && Thread.interrupted()) {
+                if (interruptible) {
+                    s = ABNORMAL;
+                    break;
+                }
+                interrupted = true;
+            }
+            else if (queued) {
+                if (deadline != 0L) {
+                    if ((ns = deadline - System.nanoTime()) <= 0L)
+                        break;
+                    LockSupport.parkNanos(ns);
+                }
+                else
+                    LockSupport.park();
+                parked = true;
+            }
+            else if (node != null) {
+                if ((a = aux) != null && a.ex != null)
+                    Thread.onSpinWait();     // exception in progress
+                else if (queued = casAux(node.next = a, node))
+                    LockSupport.setCurrentBlocker(this);
+            }
+            else {
+                try {
+                    node = new Aux(Thread.currentThread(), null);
+                } catch (Throwable ex) {     // cannot create
+                    fail = true;
             }
         }
-        return awaitDone(interruptible, deadline, uncompensate);
+        }
+        if (pool != null && uncompensate)
+            pool.uncompensate();
+
+        if (queued) {
+            LockSupport.setCurrentBlocker(null);
+            if (s >= 0) { // cancellation similar to AbstractQueuedSynchronizer
+                outer: for (Aux a; (a = aux) != null && a.ex == null; ) {
+                    for (Aux trail = null;;) {
+                        Aux next = a.next;
+                        if (a == node) {
+                            if (trail != null)
+                                trail.casNext(trail, next);
+                            else if (casAux(a, next))
+                                break outer; // cannot be re-encountered
+                            break;           // restart
+                        } else {
+                            trail = a;
+                            if ((a = next) == null)
+                                break outer;
+                        }
+                    }
+                }
+            }
+            else {
+                signalWaiters();             // help clean or signal
+                if (interrupted)
+                    Thread.currentThread().interrupt();
+            }
+        }
+        return s;
     }
 
     /**
      * Cancels, ignoring any exceptions thrown by cancel.  Cancel is
      * spec'ed not to throw any exceptions, but if it does anyway, we

@@ -662,11 +665,11 @@
      * @return the computed result
      */
     public final V join() {
         int s;
         if ((s = status) >= 0)
-            s = awaitJoin(false, false, false, 0L);
+            s = awaitDone(null, false, false, false, 0L);
         if ((s & ABNORMAL) != 0)
             reportException(s);
         return getRawResult();
     }
 

@@ -679,11 +682,11 @@
      * @return the computed result
      */
     public final V invoke() {
         int s;
         if ((s = doExec()) >= 0)
-            s = awaitJoin(true, false, false, 0L);
+            s = awaitDone(null, true, false, false, 0L);
         if ((s & ABNORMAL) != 0)
             reportException(s);
         return getRawResult();
     }
 

@@ -708,16 +711,16 @@
         int s1, s2;
         if (t1 == null || t2 == null)
             throw new NullPointerException();
         t2.fork();
         if ((s1 = t1.doExec()) >= 0)
-            s1 = t1.awaitJoin(true, false, false, 0L);
+            s1 = t1.awaitDone(null, true, false, false, 0L);
         if ((s1 & ABNORMAL) != 0) {
             cancelIgnoringExceptions(t2);
             t1.reportException(s1);
         }
-        else if (((s2 = t2.awaitJoin(false, false, false, 0L)) & ABNORMAL) != 0)
+        else if (((s2 = t2.awaitDone(null, false, false, false, 0L)) & ABNORMAL) != 0)
             t2.reportException(s2);
     }
 
     /**
      * Forks the given tasks, returning when {@code isDone} holds for

@@ -744,11 +747,11 @@
                 break;
             }
             if (i == 0) {
                 int s;
                 if ((s = t.doExec()) >= 0)
-                    s = t.awaitJoin(true, false, false, 0L);
+                    s = t.awaitDone(null, true, false, false, 0L);
                 if ((s & ABNORMAL) != 0)
                     ex = t.getException(s);
                 break;
             }
             t.fork();

@@ -757,11 +760,11 @@
             for (int i = 1; i <= last; ++i) {
                 ForkJoinTask<?> t;
                 if ((t = tasks[i]) != null) {
                     int s;
                     if ((s = t.status) >= 0)
-                        s = t.awaitJoin(false, false, false, 0L);
+                        s = t.awaitDone(null, false, false, false, 0L);
                     if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null)
                         break;
                 }
             }
         }

@@ -807,11 +810,11 @@
                 break;
             }
             if (i == 0) {
                 int s;
                 if ((s = t.doExec()) >= 0)
-                    s = t.awaitJoin(true, false, false, 0L);
+                    s = t.awaitDone(null, true, false, false, 0L);
                 if ((s & ABNORMAL) != 0)
                     ex = t.getException(s);
                 break;
             }
             t.fork();

@@ -820,11 +823,11 @@
             for (int i = 1; i <= last; ++i) {
                 ForkJoinTask<?> t;
                 if ((t = ts.get(i)) != null) {
                     int s;
                     if ((s = t.status) >= 0)
-                        s = t.awaitJoin(false, false, false, 0L);
+                        s = t.awaitDone(null, false, false, false, 0L);
                     if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null)
                         break;
                 }
             }
         }

@@ -971,12 +974,12 @@
      * exception
      * @throws InterruptedException if the current thread is not a
      * member of a ForkJoinPool and was interrupted while waiting
      */
     public final V get() throws InterruptedException, ExecutionException {
-        int s;
-        if (((s = awaitJoin(false, true, false, 0L)) & ABNORMAL) != 0)
+        int s = awaitDone(null, false, true, false, 0L);
+        if ((s & ABNORMAL) != 0)
             reportExecutionException(s);
         return getRawResult();
     }
 
     /**

@@ -993,13 +996,13 @@
      * member of a ForkJoinPool and was interrupted while waiting
      * @throws TimeoutException if the wait timed out
      */
     public final V get(long timeout, TimeUnit unit)
         throws InterruptedException, ExecutionException, TimeoutException {
-        int s;
-        if ((s = awaitJoin(false, true, true, unit.toNanos(timeout))) >= 0 ||
-            (s & ABNORMAL) != 0)
+        long nanos = unit.toNanos(timeout);
+        int s = awaitDone(null, false, true, true, nanos);
+        if (s >= 0 || (s & ABNORMAL) != 0)
             reportExecutionException(s);
         return getRawResult();
     }
 
     /**

@@ -1008,21 +1011,52 @@
      * collections of tasks when some have been cancelled or otherwise
      * known to have aborted.
      */
     public final void quietlyJoin() {
         if (status >= 0)
-            awaitJoin(false, false, false, 0L);
+            awaitDone(null, false, false, false, 0L);
     }
 
+
     /**
      * Commences performing this task and awaits its completion if
      * necessary, without returning its result or throwing its
      * exception.
      */
     public final void quietlyInvoke() {
         if (doExec() >= 0)
-            awaitJoin(true, false, false, 0L);
+            awaitDone(null, true, false, false, 0L);
+    }
+
+    // Versions of join/get for pool.invoke* methods that use external,
+    // possibly-non-commonPool submits
+
+    final void awaitPoolInvoke(ForkJoinPool pool) {
+        awaitDone(pool, false, false, false, 0L);
+    }
+    final void awaitPoolInvoke(ForkJoinPool pool, long nanos) {
+        awaitDone(pool, false, true, true, nanos);
+    }
+    final V joinForPoolInvoke(ForkJoinPool pool) {
+        int s = awaitDone(pool, false, false, false, 0L);
+        if ((s & ABNORMAL) != 0)
+            reportException(s);
+        return getRawResult();
+    }
+    final V getForPoolInvoke(ForkJoinPool pool)
+        throws InterruptedException, ExecutionException {
+        int s = awaitDone(pool, false, true, false, 0L);
+        if ((s & ABNORMAL) != 0)
+            reportExecutionException(s);
+        return getRawResult();
+    }
+    final V getForPoolInvoke(ForkJoinPool pool, long nanos)
+        throws InterruptedException, ExecutionException, TimeoutException {
+        int s = awaitDone(pool, false, true, true, nanos);
+        if (s >= 0 || (s & ABNORMAL) != 0)
+            reportExecutionException(s);
+        return getRawResult();
     }
 
     /**
      * Possibly executes tasks until the pool hosting the current task
      * {@linkplain ForkJoinPool#isQuiescent is quiescent}.  This
< prev index next >