src/share/classes/java/util/concurrent/ForkJoinPool.java

Print this page

        

@@ -524,11 +524,11 @@
      * worker thread. (Bits 16-31 are unused.)
      */
     private volatile long eventWaiters;
 
     private static final int  EVENT_COUNT_SHIFT = 32;
-    private static final long WAITER_ID_MASK    = (1L << 16) - 1L;
+    private static final int WAITER_ID_MASK    = (1 << 16) - 1;
 
     /**
      * A counter for events that may wake up worker threads:
      *   - Submission of a new task to the pool
      *   - A worker pushing a task on an empty queue

@@ -613,22 +613,32 @@
 
     // Utilities for CASing fields. Note that most of these
     // are usually manually inlined by callers
 
     /**
-     * Increments running count part of workerCounts
+     * Increments running count part of workerCounts.
      */
     final void incrementRunningCount() {
         int c;
         do {} while (!UNSAFE.compareAndSwapInt(this, workerCountsOffset,
                                                c = workerCounts,
                                                c + ONE_RUNNING));
     }
 
     /**
-     * Tries to decrement running count unless already zero
+     * Tries to increment running count part of workerCounts.
      */
+    final boolean tryIncrementRunningCount() {
+        int c;
+        return UNSAFE.compareAndSwapInt(this, workerCountsOffset,
+                                        c = workerCounts,
+                                        c + ONE_RUNNING);
+    }
+
+    /**
+     * Tries to decrement running count unless already zero.
+     */
     final boolean tryDecrementRunningCount() {
         int wc = workerCounts;
         if ((wc & RUNNING_COUNT_MASK) == 0)
             return false;
         return UNSAFE.compareAndSwapInt(this, workerCountsOffset,

@@ -696,14 +706,15 @@
             int n = ws.length;
             if (k < 0 || k >= n || ws[k] != null) {
                 for (k = 0; k < n && ws[k] != null; ++k)
                     ;
                 if (k == n)
-                    ws = Arrays.copyOf(ws, n << 1);
+                    ws = workers = Arrays.copyOf(ws, n << 1);
             }
             ws[k] = w;
-            workers = ws; // volatile array write ensures slot visibility
+            int c = eventCount; // advance event count to ensure visibility
+            UNSAFE.compareAndSwapInt(this, eventCountOffset, c, c+1);
         } finally {
             lock.unlock();
         }
         return k;
     }

@@ -732,11 +743,11 @@
      *
      * @param w the worker
      */
     final void workerTerminated(ForkJoinWorkerThread w) {
         forgetWorker(w);
-        decrementWorkerCounts(w.isTrimmed()? 0 : ONE_RUNNING, ONE_TOTAL);
+        decrementWorkerCounts(w.isTrimmed() ? 0 : ONE_RUNNING, ONE_TOTAL);
         while (w.stealCount != 0) // collect final count
             tryAccumulateStealCount(w);
         tryTerminate(false);
     }
 

@@ -744,28 +755,27 @@
 
     /**
      * Releases workers blocked on a count not equal to current count.
      * Normally called after precheck that eventWaiters isn't zero to
      * avoid wasted array checks. Gives up upon a change in count or
-     * upon releasing two workers, letting others take over.
+     * upon releasing four workers, letting others take over.
      */
     private void releaseEventWaiters() {
         ForkJoinWorkerThread[] ws = workers;
         int n = ws.length;
         long h = eventWaiters;
         int ec = eventCount;
-        boolean releasedOne = false;
+        int releases = 4;
         ForkJoinWorkerThread w; int id;
-        while ((id = ((int)(h & WAITER_ID_MASK)) - 1) >= 0 &&
+        while ((id = (((int)h) & WAITER_ID_MASK) - 1) >= 0 &&
                (int)(h >>> EVENT_COUNT_SHIFT) != ec &&
                id < n && (w = ws[id]) != null) {
             if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
                                           h,  w.nextWaiter)) {
                 LockSupport.unpark(w);
-                if (releasedOne) // exit on second release
+                if (--releases == 0)
                     break;
-                releasedOne = true;
             }
             if (eventCount != ec)
                 break;
             h = eventWaiters;
         }

@@ -791,11 +801,11 @@
      */
     private void eventSync(ForkJoinWorkerThread w, int ec) {
         long nh = (((long)ec) << EVENT_COUNT_SHIFT) | ((long)(w.poolIndex+1));
         long h;
         while ((runState < SHUTDOWN || !tryTerminate(false)) &&
-               (((int)((h = eventWaiters) & WAITER_ID_MASK)) == 0 ||
+               (((int)(h = eventWaiters) & WAITER_ID_MASK) == 0 ||
                 (int)(h >>> EVENT_COUNT_SHIFT) == ec) &&
                eventCount == ec) {
             if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
                                           w.nextWaiter = h, nh)) {
                 awaitEvent(w, ec);

@@ -818,13 +828,13 @@
     private void awaitEvent(ForkJoinWorkerThread w, int ec) {
         while (eventCount == ec) {
             if (tryAccumulateStealCount(w)) { // transfer while idle
                 boolean untimed = (w.nextWaiter != 0L ||
                                    (workerCounts & RUNNING_COUNT_MASK) <= 1);
-                long startTime = untimed? 0 : System.nanoTime();
+                long startTime = untimed ? 0 : System.nanoTime();
                 Thread.interrupted();         // clear/ignore interrupt
-                if (eventCount != ec || w.isTerminating())
+                if (w.isTerminating() || eventCount != ec)
                     break;                    // recheck after clear
                 if (untimed)
                     LockSupport.park(w);
                 else {
                     LockSupport.parkNanos(w, SHRINK_RATE_NANOS);

@@ -858,11 +868,12 @@
         int n = ws.length;
         ForkJoinWorkerThread w;
         if ((sw = spareWaiters) != 0 &&
             (id = (sw & SPARE_ID_MASK) - 1) >= 0 &&
             id < n && (w = ws[id]) != null &&
-            (workerCounts & RUNNING_COUNT_MASK) < parallelism &&
+            (runState >= TERMINATING ||
+             (workerCounts & RUNNING_COUNT_MASK) < parallelism) &&
             spareWaiters == sw &&
             UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
                                      sw, w.nextSpare)) {
             int c; // increment running count before resume
             do {} while (!UNSAFE.compareAndSwapInt

@@ -912,18 +923,14 @@
                           ForkJoinWorkerThread))
                         UNSAFE.throwException(fail);
                     break;
                 }
                 w.start(recordWorker(w), ueh);
-                if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc) {
-                    int c; // advance event count
-                    UNSAFE.compareAndSwapInt(this, eventCountOffset,
-                                             c = eventCount, c+1);
+                if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc)
                     break; // add at most one unless total below target
                 }
             }
-        }
         if (eventWaiters != 0L)
             releaseEventWaiters();
     }
 
     /**

@@ -953,11 +960,11 @@
                                              sw, w.nextSpare))
                     shutdown = true;
             }
             else if ((h = eventWaiters) != 0L) {
                 long nh;
-                int id = ((int)(h & WAITER_ID_MASK)) - 1;
+                int id = (((int)h) & WAITER_ID_MASK) - 1;
                 if (id >= 0 && id < n && (w = ws[id]) != null &&
                     (nh = w.nextWaiter) != 0L && // keep at least one worker
                     UNSAFE.compareAndSwapLong(this, eventWaitersOffset, h, nh))
                     shutdown = true;
             }

@@ -1006,23 +1013,30 @@
             if (rs >= TERMINATING) { // propagate shutdown
                 w.shutdown();
                 break;
             }
             if ((inactivate || (active && (rs & ACTIVE_COUNT_MASK) >= pc)) &&
-                UNSAFE.compareAndSwapInt(this, runStateOffset, rs, rs - 1))
+                UNSAFE.compareAndSwapInt(this, runStateOffset, rs, --rs)) {
                 inactivate = active = w.active = false;
-            int wc = workerCounts;
+                if (rs == SHUTDOWN) {          // all inactive and shut down
+                    tryTerminate(false);
+                    continue;
+                }
+            }
+            int wc = workerCounts;             // try to suspend as spare
             if ((wc & RUNNING_COUNT_MASK) > pc) {
                 if (!(inactivate |= active) && // must inactivate to suspend
-                    workerCounts == wc &&      // try to suspend as spare
+                    workerCounts == wc &&
                     UNSAFE.compareAndSwapInt(this, workerCountsOffset,
                                              wc, wc - ONE_RUNNING))
                     w.suspendAsSpare();
             }
             else if ((wc >>> TOTAL_COUNT_SHIFT) < pc)
                 helpMaintainParallelism();     // not enough workers
-            else if (!ran) {
+            else if (ran)
+                break;
+            else {
                 long h = eventWaiters;
                 int ec = eventCount;
                 if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != ec)
                     releaseEventWaiters();     // release others before waiting
                 else if (ec != wec) {

@@ -1030,62 +1044,93 @@
                     break;
                 }
                 else if (!(inactivate |= active))
                     eventSync(w, wec);         // must inactivate before sync
             }
-            else
-                break;
         }
     }
 
     /**
      * Helps and/or blocks awaiting join of the given task.
      * See above for explanation.
      *
      * @param joinMe the task to join
      * @param worker the current worker thread
+     * @param timed true if wait should time out
+     * @param nanos timeout value if timed
      */
-    final void awaitJoin(ForkJoinTask<?> joinMe, ForkJoinWorkerThread worker) {
+    final void awaitJoin(ForkJoinTask<?> joinMe, ForkJoinWorkerThread worker,
+                         boolean timed, long nanos) {
+        long startTime = timed ? System.nanoTime() : 0L;
         int retries = 2 + (parallelism >> 2); // #helpJoins before blocking
+        boolean running = true;               // false when count decremented
         while (joinMe.status >= 0) {
-            int wc;
-            worker.helpJoinTask(joinMe);
+            if (runState >= TERMINATING) {
+                joinMe.cancelIgnoringExceptions();
+                break;
+            }
+            running = worker.helpJoinTask(joinMe, running);
             if (joinMe.status < 0)
                 break;
-            else if (retries > 0)
+            if (retries > 0) {
                 --retries;
-            else if (((wc = workerCounts) & RUNNING_COUNT_MASK) != 0 &&
-                     UNSAFE.compareAndSwapInt(this, workerCountsOffset,
-                                              wc, wc - ONE_RUNNING)) {
-                int stat, c; long h;
-                while ((stat = joinMe.status) >= 0 &&
-                       (h = eventWaiters) != 0L && // help release others
-                       (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
+                continue;
+            }
+            int wc = workerCounts;
+            if ((wc & RUNNING_COUNT_MASK) != 0) {
+                if (running) {
+                    if (!UNSAFE.compareAndSwapInt(this, workerCountsOffset,
+                                                  wc, wc - ONE_RUNNING))
+                        continue;
+                    running = false;
+                }
+                long h = eventWaiters;
+                if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
                     releaseEventWaiters();
-                if (stat >= 0 &&
-                    ((workerCounts & RUNNING_COUNT_MASK) == 0 ||
-                     (stat =
-                      joinMe.internalAwaitDone(JOIN_TIMEOUT_MILLIS)) >= 0))
-                    helpMaintainParallelism(); // timeout or no running workers
+                if ((workerCounts & RUNNING_COUNT_MASK) != 0) {
+                    long ms; int ns;
+                    if (!timed) {
+                        ms = JOIN_TIMEOUT_MILLIS;
+                        ns = 0;
+                    }
+                    else { // at most JOIN_TIMEOUT_MILLIS per wait
+                        long nt = nanos - (System.nanoTime() - startTime);
+                        if (nt <= 0L)
+                            break;
+                        ms = nt / 1000000;
+                        if (ms > JOIN_TIMEOUT_MILLIS) {
+                            ms = JOIN_TIMEOUT_MILLIS;
+                            ns = 0;
+                        }
+                        else
+                            ns = (int) (nt % 1000000);
+                    }
+                    joinMe.internalAwaitDone(ms, ns);
+                }
+                if (joinMe.status < 0)
+                    break;
+            }
+            helpMaintainParallelism();
+        }
+        if (!running) {
+            int c;
                 do {} while (!UNSAFE.compareAndSwapInt
                              (this, workerCountsOffset,
                               c = workerCounts, c + ONE_RUNNING));
-                if (stat < 0)
-                    break;   // else restart
             }
         }
-    }
 
     /**
      * Same idea as awaitJoin, but no helping, retries, or timeouts.
      */
     final void awaitBlocker(ManagedBlocker blocker)
         throws InterruptedException {
         while (!blocker.isReleasable()) {
             int wc = workerCounts;
-            if ((wc & RUNNING_COUNT_MASK) != 0 &&
-                UNSAFE.compareAndSwapInt(this, workerCountsOffset,
+            if ((wc & RUNNING_COUNT_MASK) == 0)
+                helpMaintainParallelism();
+            else if (UNSAFE.compareAndSwapInt(this, workerCountsOffset,
                                          wc, wc - ONE_RUNNING)) {
                 try {
                     while (!blocker.isReleasable()) {
                         long h = eventWaiters;
                         if (h != 0L &&

@@ -1127,16 +1172,15 @@
             startTerminating();
 
         // Finish now if all threads terminated; else in some subsequent call
         if ((workerCounts >>> TOTAL_COUNT_SHIFT) == 0) {
             advanceRunLevel(TERMINATED);
-            termination.arrive();
+            termination.forceTermination();
         }
         return true;
     }
 
-
     /**
      * Actions on transition to TERMINATING
      *
      * Runs up to four passes through workers: (0) shutting down each
      * (without waking up if parked) to quickly spread notifications

@@ -1323,21 +1367,17 @@
     }
 
     // Execution methods
 
     /**
-     * Common code for execute, invoke and submit
+     * Submits task and creates, starts, or resumes some workers if necessary
      */
     private <T> void doSubmit(ForkJoinTask<T> task) {
-        if (task == null)
-            throw new NullPointerException();
-        if (runState >= SHUTDOWN)
-            throw new RejectedExecutionException();
         submissionQueue.offer(task);
         int c; // try to increment event count -- CAS failure OK
         UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1);
-        helpMaintainParallelism(); // create, start, or resume some workers
+        helpMaintainParallelism();
     }
 
     /**
      * Performs the given task, returning its result upon completion.
      *

@@ -1346,24 +1386,51 @@
      * @throws NullPointerException if the task is null
      * @throws RejectedExecutionException if the task cannot be
      *         scheduled for execution
      */
     public <T> T invoke(ForkJoinTask<T> task) {
+        if (task == null)
+            throw new NullPointerException();
+        if (runState >= SHUTDOWN)
+            throw new RejectedExecutionException();
+        Thread t = Thread.currentThread();
+        if ((t instanceof ForkJoinWorkerThread) &&
+            ((ForkJoinWorkerThread)t).pool == this)
+            return task.invoke();  // bypass submit if in same pool
+        else {
         doSubmit(task);
         return task.join();
     }
+    }
 
     /**
+     * Unless terminating, forks task if within an ongoing FJ
+     * computation in the current pool, else submits as external task.
+     */
+    private <T> void forkOrSubmit(ForkJoinTask<T> task) {
+        if (runState >= SHUTDOWN)
+            throw new RejectedExecutionException();
+        Thread t = Thread.currentThread();
+        if ((t instanceof ForkJoinWorkerThread) &&
+            ((ForkJoinWorkerThread)t).pool == this)
+            task.fork();
+        else
+            doSubmit(task);
+    }
+
+    /**
      * Arranges for (asynchronous) execution of the given task.
      *
      * @param task the task
      * @throws NullPointerException if the task is null
      * @throws RejectedExecutionException if the task cannot be
      *         scheduled for execution
      */
     public void execute(ForkJoinTask<?> task) {
-        doSubmit(task);
+        if (task == null)
+            throw new NullPointerException();
+        forkOrSubmit(task);
     }
 
     // AbstractExecutorService methods
 
     /**

@@ -1370,16 +1437,18 @@
      * @throws NullPointerException if the task is null
      * @throws RejectedExecutionException if the task cannot be
      *         scheduled for execution
      */
     public void execute(Runnable task) {
+        if (task == null)
+            throw new NullPointerException();
         ForkJoinTask<?> job;
         if (task instanceof ForkJoinTask<?>) // avoid re-wrap
             job = (ForkJoinTask<?>) task;
         else
             job = ForkJoinTask.adapt(task, null);
-        doSubmit(job);
+        forkOrSubmit(job);
     }
 
     /**
      * Submits a ForkJoinTask for execution.
      *

@@ -1388,48 +1457,56 @@
      * @throws NullPointerException if the task is null
      * @throws RejectedExecutionException if the task cannot be
      *         scheduled for execution
      */
     public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
-        doSubmit(task);
+        if (task == null)
+            throw new NullPointerException();
+        forkOrSubmit(task);
         return task;
     }
 
     /**
      * @throws NullPointerException if the task is null
      * @throws RejectedExecutionException if the task cannot be
      *         scheduled for execution
      */
     public <T> ForkJoinTask<T> submit(Callable<T> task) {
+        if (task == null)
+            throw new NullPointerException();
         ForkJoinTask<T> job = ForkJoinTask.adapt(task);
-        doSubmit(job);
+        forkOrSubmit(job);
         return job;
     }
 
     /**
      * @throws NullPointerException if the task is null
      * @throws RejectedExecutionException if the task cannot be
      *         scheduled for execution
      */
     public <T> ForkJoinTask<T> submit(Runnable task, T result) {
+        if (task == null)
+            throw new NullPointerException();
         ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
-        doSubmit(job);
+        forkOrSubmit(job);
         return job;
     }
 
     /**
      * @throws NullPointerException if the task is null
      * @throws RejectedExecutionException if the task cannot be
      *         scheduled for execution
      */
     public ForkJoinTask<?> submit(Runnable task) {
+        if (task == null)
+            throw new NullPointerException();
         ForkJoinTask<?> job;
         if (task instanceof ForkJoinTask<?>) // avoid re-wrap
             job = (ForkJoinTask<?>) task;
         else
             job = ForkJoinTask.adapt(task, null);
-        doSubmit(job);
+        forkOrSubmit(job);
         return job;
     }
 
     /**
      * @throws NullPointerException       {@inheritDoc}

@@ -1723,12 +1800,15 @@
     /**
      * Returns {@code true} if the process of termination has
      * commenced but not yet completed.  This method may be useful for
      * debugging. A return of {@code true} reported a sufficient
      * period after shutdown may indicate that submitted tasks have
-     * ignored or suppressed interruption, causing this executor not
-     * to properly terminate.
+     * ignored or suppressed interruption, or are waiting for IO,
+     * causing this executor not to properly terminate. (See the
+     * advisory notes for class {@link ForkJoinTask} stating that
+     * tasks should not normally entail blocking operations.  But if
+     * they do, they must abort them on interrupt.)
      *
      * @return {@code true} if terminating but not yet terminated
      */
     public boolean isTerminating() {
         return (runState & (TERMINATING|TERMINATED)) == TERMINATING;

@@ -1762,14 +1842,15 @@
      * @throws InterruptedException if interrupted while waiting
      */
     public boolean awaitTermination(long timeout, TimeUnit unit)
         throws InterruptedException {
         try {
-            return termination.awaitAdvanceInterruptibly(0, timeout, unit) > 0;
+            termination.awaitAdvanceInterruptibly(0, timeout, unit);
         } catch (TimeoutException ex) {
             return false;
         }
+        return true;
     }
 
     /**
      * Interface for extending managed parallelism for tasks running
      * in {@link ForkJoinPool}s.