< prev index next >

src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java

Print this page
8259800: timeout in tck test testForkJoin(ForkJoinPool8Test)
Reviewed-by: martin

@@ -1044,17 +1044,26 @@
         /**
          * Locking version of tryUnpush.
          */
         final boolean externalTryUnpush(ForkJoinTask<?> task) {
             boolean taken = false;
+            for (;;) {
             int s = top, cap, k; ForkJoinTask<?>[] a;
-            if ((a = array) != null && (cap = a.length) > 0 &&
-                a[k = (cap - 1) & (s - 1)] == task && tryLock()) {
-                if (top == s && array == a &&
-                    (taken = casSlotToNull(a, k, task)))
+                if ((a = array) == null || (cap = a.length) <= 0 ||
+                    a[k = (cap - 1) & (s - 1)] != task)
+                    break;
+                if (tryLock()) {
+                    if (top == s && array == a) {
+                        if (taken = casSlotToNull(a, k, task)) {
                     top = s - 1;
-                source = 0; // release lock
+                            source = 0;
+                            break;
+                        }
+                    }
+                    source = 0; // release lock for retry
+                }
+                Thread.yield(); // trylock failure
             }
             return taken;
         }
 
         /**

@@ -1192,19 +1201,20 @@
                             if (top == p && array == a &&
                                 (taken = casSlotToNull(a, k, t)))
                                 top = s;
                             source = 0;
                         }
+                        if (taken)
+                            t.doExec();
+                        else if (!owned)
+                            Thread.yield(); // tryLock failure
                         break;
                     }
                     else if ((f = f.completer) == null)
                         break;
                 }
-                if (!taken)
-                    break;
-                t.doExec();
-                if (limit != 0 && --limit == 0)
+                if (taken && limit != 0 && --limit == 0)
                     break;
             }
             return status;
         }
 

@@ -1584,11 +1594,11 @@
      * See above for explanation.
      *
      * @param w caller's WorkQueue (may be null on failed initialization)
      */
     final void runWorker(WorkQueue w) {
-        if (w != null) {                        // skip on failed init
+        if (mode >= 0 && w != null) {           // skip on failed init
             w.config |= SRC;                    // mark as valid source
             int r = w.stackPred, src = 0;       // use seed from registerWorker
             do {
                 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
             } while ((src = scan(w, src, r)) >= 0 ||

@@ -1709,26 +1719,10 @@
     }
 
     // Utilities used by ForkJoinTask
 
     /**
-     * Returns true if all workers are busy, possibly creating one if allowed
-     */
-    final boolean isSaturated() {
-        int maxTotal = bounds >>> SWIDTH;
-        for (long c;;) {
-            if (((int)(c = ctl) & ~UNSIGNALLED) != 0)
-                return false;
-            if ((short)(c >>> TC_SHIFT) >= maxTotal)
-                return true;
-            long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
-            if (compareAndSetCtl(c, nc))
-                return !createWorker();
-        }
-    }
-
-    /**
      * Returns true if can start terminating if enabled, or already terminated
      */
     final boolean canStop() {
         outer: for (long oldSum = 0L;;) { // repeat until stable
             int md; WorkQueue[] qs;  long c;

@@ -1763,17 +1757,20 @@
      * @param c incoming ctl value
      * @return UNCOMPENSATE: block then adjust, 0: block, -1 : retry
      */
     private int tryCompensate(long c) {
         Predicate<? super ForkJoinPool> sat;
-        int b = bounds; // counts are signed; centered at parallelism level == 0
+        int md = mode, b = bounds;
+        // counts are signed; centered at parallelism level == 0
         int minActive = (short)(b & SMASK),
             maxTotal  = b >>> SWIDTH,
             active    = (int)(c >> RC_SHIFT),
             total     = (short)(c >>> TC_SHIFT),
             sp        = (int)c & ~UNSIGNALLED;
-        if (total >= 0) {
+        if ((md & SMASK) == 0)
+            return 0;                  // cannot compensate if parallelism zero
+        else if (total >= 0) {
             if (sp != 0) {                        // activate idle worker
                 WorkQueue[] qs; int n; WorkQueue v;
                 if ((qs = queues) != null && (n = qs.length) > 0 &&
                     (v = qs[sp & (n - 1)]) != null) {
                     Thread vt = v.owner;

@@ -1817,13 +1814,14 @@
      * queues for a task produced by one of w's stealers; returning
      * compensated blocking sentinel if none are found.
      *
      * @param task the task
      * @param w caller's WorkQueue
+     * @param canHelp if false, compensate only
      * @return task status on exit, or UNCOMPENSATE for compensated blocking
      */
-    final int helpJoin(ForkJoinTask<?> task, WorkQueue w) {
+    final int helpJoin(ForkJoinTask<?> task, WorkQueue w, boolean canHelp) {
         int s = 0;
         if (task != null && w != null) {
             int wsrc = w.source, wid = w.config & SMASK, r = wid + 2;
             boolean scan = true;
             long c = 0L;                          // track ctl stability

@@ -1834,11 +1832,11 @@
                     if (mode < 0)
                         ForkJoinTask.cancelIgnoringExceptions(task);
                     else if (c == (c = ctl) && (s = tryCompensate(c)) >= 0)
                         break;                    // block
                 }
-                else {                            // scan for subtasks
+                else if (canHelp) {               // scan for subtasks
                     WorkQueue[] qs = queues;
                     int n = (qs == null) ? 0 : qs.length, m = n - 1;
                     for (int i = n; i > 0; i -= 2, r += 2) {
                         int j; WorkQueue q, x, y; ForkJoinTask<?>[] a;
                         if ((q = qs[j = r & m]) != null) {

@@ -2193,22 +2191,32 @@
                 (n = qs.length) > 0 && r != 0) ?
             qs[(n - 1) & (r << 1)] : null;
     }
 
     /**
+     * Returns queue for an external thread, if one exists
+     */
+    final WorkQueue externalQueue() {
+        WorkQueue[] qs;
+        int r = ThreadLocalRandom.getProbe(), n;
+        return ((qs = queues) != null && (n = qs.length) > 0 && r != 0) ?
+            qs[(n - 1) & (r << 1)] : null;
+    }
+
+    /**
      * If the given executor is a ForkJoinPool, poll and execute
      * AsynchronousCompletionTasks from worker's queue until none are
      * available or blocker is released.
      */
     static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) {
         WorkQueue w = null; Thread t; ForkJoinWorkerThread wt;
         if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
             if ((wt = (ForkJoinWorkerThread)t).pool == e)
                 w = wt.workQueue;
         }
-        else if (e == common)
-            w = commonQueue();
+        else if (e instanceof ForkJoinPool)
+            w = ((ForkJoinPool)e).externalQueue();
         if (w != null)
             w.helpAsyncBlocker(blocker);
     }
 
     /**

@@ -2290,18 +2298,22 @@
         if ((md & STOP) == 0) {
             if (!now && !canStop())
                 return false;
             md = getAndBitwiseOrMode(STOP);
         }
-        for (int k = 0; k < 2; ++k) { // twice in case of lagging qs updates
-            for (ForkJoinTask<?> t; (t = pollScan(false)) != null; )
+        for (boolean rescan = true;;) { // repeat until no changes
+            boolean changed = false;
+            for (ForkJoinTask<?> t; (t = pollScan(false)) != null; ) {
+                changed = true;
                 ForkJoinTask.cancelIgnoringExceptions(t); // help cancel
+            }
             WorkQueue[] qs; int n; WorkQueue q; Thread thread;
             if ((qs = queues) != null && (n = qs.length) > 0) {
                 for (int j = 1; j < n; j += 2) { // unblock other workers
                     if ((q = qs[j]) != null && (thread = q.owner) != null &&
                         !thread.isInterrupted()) {
+                        changed = true;
                         try {
                             thread.interrupt();
                         } catch (Throwable ignore) {
                         }
                     }

@@ -2315,10 +2327,16 @@
                 lock.lock();
                 if ((cond = termination) != null)
                     cond.signalAll();
                 lock.unlock();
             }
+            if (changed)
+                rescan = true;
+            else if (rescan)
+                rescan = false;
+            else
+                break;
         }
         return true;
     }
 
     // Exported methods

@@ -2538,18 +2556,20 @@
             if (pp != null)
                 parallelism = Integer.parseInt(pp);
         } catch (Exception ignore) {
         }
         int p = this.mode = Math.min(Math.max(parallelism, 0), MAX_CAP);
+        int maxSpares = (p == 0) ? 0 : COMMON_MAX_SPARES;
+        int bnds = ((1 - p) & SMASK) | (maxSpares << SWIDTH);
         int size = 1 << (33 - Integer.numberOfLeadingZeros(p > 0 ? p - 1 : 1));
         this.factory = (fac != null) ? fac :
             new DefaultCommonPoolForkJoinWorkerThreadFactory();
         this.ueh = handler;
         this.keepAlive = DEFAULT_KEEPALIVE;
         this.saturate = null;
         this.workerNamePrefix = null;
-        this.bounds = ((1 - p) & SMASK) | (COMMON_MAX_SPARES << SWIDTH);
+        this.bounds = bnds;
         this.ctl = ((((long)(-p) << TC_SHIFT) & TC_MASK) |
                     (((long)(-p) << RC_SHIFT) & RC_MASK));
         this.queues = new WorkQueue[size];
         this.registrationLock = new ReentrantLock();
     }

@@ -2591,11 +2611,11 @@
      * @throws RejectedExecutionException if the task cannot be
      *         scheduled for execution
      */
     public <T> T invoke(ForkJoinTask<T> task) {
         externalSubmit(task);
-        return task.join();
+        return task.joinForPoolInvoke(this);
     }
 
     /**
      * Arranges for (asynchronous) execution of the given task.
      *

@@ -2683,11 +2703,11 @@
                     new ForkJoinTask.AdaptedInterruptibleCallable<T>(t);
                 futures.add(f);
                 externalSubmit(f);
             }
             for (int i = futures.size() - 1; i >= 0; --i)
-                ((ForkJoinTask<?>)futures.get(i)).quietlyJoin();
+                ((ForkJoinTask<?>)futures.get(i)).awaitPoolInvoke(this);
             return futures;
         } catch (Throwable t) {
             for (Future<T> e : futures)
                 ForkJoinTask.cancelIgnoringExceptions(e);
             throw t;

@@ -2713,15 +2733,11 @@
                 Future<T> f = futures.get(i);
                 if (!f.isDone()) {
                     if (timedOut)
                         ForkJoinTask.cancelIgnoringExceptions(f);
                     else {
-                        try {
-                            f.get(ns, TimeUnit.NANOSECONDS);
-                        } catch (CancellationException | TimeoutException |
-                                 ExecutionException ok) {
-                        }
+                        ((ForkJoinTask<T>)f).awaitPoolInvoke(this, ns);
                         if ((ns = nanos - (System.nanoTime() - startTime)) < 0L)
                             timedOut = true;
                     }
                 }
             }

@@ -2744,15 +2760,20 @@
             pool = p;
             count = new AtomicInteger(n);
         }
         final void tryComplete(Callable<E> c) { // called by InvokeAnyTasks
             Throwable ex = null;
-            boolean failed = (c == null || isCancelled() ||
-                              (pool != null && pool.mode < 0));
-            if (!failed && !isDone()) {
+            boolean failed;
+            if (c == null || Thread.interrupted() ||
+                (pool != null && pool.mode < 0))
+                failed = true;
+            else if (isDone())
+                failed = false;
+            else {
                 try {
                     complete(c.call());
+                    failed = false;
                 } catch (Throwable tx) {
                     ex = tx;
                     failed = true;
                 }
             }

@@ -2815,11 +2836,11 @@
                 fs.add(f);
                 externalSubmit(f);
                 if (root.isDone())
                     break;
             }
-            return root.get();
+            return root.getForPoolInvoke(this);
         } finally {
             for (InvokeAnyTask<T> f : fs)
                 ForkJoinTask.cancelIgnoringExceptions(f);
         }
     }

@@ -2842,11 +2863,11 @@
                 fs.add(f);
                 externalSubmit(f);
                 if (root.isDone())
                     break;
             }
-            return root.get(nanos, TimeUnit.NANOSECONDS);
+            return root.getForPoolInvoke(this, nanos);
         } finally {
             for (InvokeAnyTask<T> f : fs)
                 ForkJoinTask.cancelIgnoringExceptions(f);
         }
     }
< prev index next >