< prev index next >

src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java

Print this page
8259800: timeout in tck test testForkJoin(ForkJoinPool8Test)
Reviewed-by: martin


1029         }
1030 
1031         /**
1032          * Pops the given task for owner only if it is at the current top.
1033          */
1034         final boolean tryUnpush(ForkJoinTask<?> task) {
                 // s starts as a snapshot of top; the post-decrement in the
                 // condition below turns it into the index of the top element.
1035             int s = top, cap; ForkJoinTask<?>[] a;
                 // Requires a non-empty array and base != top (as read here).
                 // casSlotToNull is expected to clear the top slot only if it
                 // still holds task, so a changed slot falls through to false.
1036             if ((a = array) != null && (cap = a.length) > 0 && base != s-- &&
1037                 casSlotToNull(a, (cap - 1) & s, task)) {
1038                 top = s; // slot cleared: lower top by one
1039                 return true;
1040             }
1041             return false; // nothing was removed
1042         }
1043 
1044         /**
1045          * Locking version of tryUnpush.
1046          */
1047         final boolean externalTryUnpush(ForkJoinTask<?> task) {
1048             boolean taken = false;
                 // Removes task from the top of this queue on behalf of a
                 // non-owner thread, returning true if it was taken.
                 //
                 // A failed tryLock() only means the queue lock is momentarily
                 // held by another thread; it says nothing about whether task is
                 // still poppable. Reporting failure in that case makes callers
                 // wait for a task they could have run themselves, so retry for
                 // as long as task is still observed in the top slot.
                 for (;;) {
                     int s = top, cap, k; ForkJoinTask<?>[] a;
                     // Stop once there is no array or task is no longer at top.
                     if ((a = array) == null || (cap = a.length) <= 0 ||
                         a[k = (cap - 1) & (s - 1)] != task)
                         break;
                     if (tryLock()) {
                         // Recheck under the lock that the snapshot is current
                         // before clearing the slot and lowering top.
                         if (top == s && array == a) {
                             if (taken = casSlotToNull(a, k, task)) {
                                 top = s - 1;
                                 source = 0; // release lock
                                 break;
                             }
                         }
                         source = 0; // release lock for retry
                     }
                     Thread.yield(); // lock busy or snapshot stale: back off, retry
                 }
1057             return taken;
1058         }
1059 
1060         /**
1061          * Deep form of tryUnpush: Traverses from top and removes task if
1062          * present, shifting others to fill gap.
1063          */
1064         final boolean tryRemove(ForkJoinTask<?> task, boolean owned) {
1065             boolean taken = false;
1066             int p = top, cap; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
1067             if ((a = array) != null && task != null && (cap = a.length) > 0) {
1068                 int m = cap - 1, s = p - 1, d = p - base;
1069                 for (int i = s, k; d > 0; --i, --d) {
1070                     if ((t = a[k = i & m]) == task) {
1071                         if (owned || tryLock()) {
1072                             if ((owned || (array == a && top == p)) &&
1073                                 (taken = casSlotToNull(a, k, t))) {
1074                                 for (int j = i; j != s; ) // shift down
1075                                     a[j & m] = getAndClearSlot(a, ++j & m);


1177         final int helpComplete(ForkJoinTask<?> task, boolean owned, int limit) {
                 // Repeatedly takes and runs the task at the top of this queue
                 // for as long as that task is a CountedCompleter whose completer
                 // chain reaches the given task. Returns the last value read
                 // from task.status (0 if task is null).
                 //   owned - true when the caller may pop without locking
                 //   limit - if nonzero, the maximum number of tasks to run
1178             int status = 0, cap, k, p, s; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
                 // Loop while task.status is non-negative (presumably: not yet
                 // done) and the slot just below top holds a CountedCompleter.
                 // p is the top snapshot, s = p - 1, k the masked slot index.
1179             while (task != null && (status = task.status) >= 0 &&
1180                    (a = array) != null && (cap = a.length) > 0 &&
1181                    (t = a[k = (cap - 1) & (s = (p = top) - 1)])
1182                    instanceof CountedCompleter) {
1183                 CountedCompleter<?> f = (CountedCompleter<?>)t;
1184                 boolean taken = false;
1185                 for (;;) {     // exec if root task is a completer of t
1186                     if (f == task) {
1187                         if (owned) {
                                 // owner path: clear the slot by CAS, no lock
1188                             if ((taken = casSlotToNull(a, k, t)))
1189                                 top = s;
1190                         }
1191                         else if (tryLock()) {
                                 // non-owner path: recheck the snapshot under the lock
1192                             if (top == p && array == a &&
1193                                 (taken = casSlotToNull(a, k, t)))
1194                                 top = s;
1195                             source = 0; // presumably releases the lock taken by tryLock
1196                         }
                             // NOTE(review): if tryLock() fails, taken stays false
                             // and the outer loop exits without retrying, even if
                             // t is still at the top of the queue.




1197                         break;
1198                     }
1199                     else if ((f = f.completer) == null)
1200                         break; // reached the end of the chain without finding task
1201                 }
1202                 if (!taken)
1203                     break;
1204                 t.doExec();
                     // a limit of 0 means "no limit": the counter is only
                     // decremented and tested when it started out nonzero
1205                 if (limit != 0 && --limit == 0)
1206                     break;
1207             }
1208             return status;
1209         }
1210 
1211         /**
1212          * Tries to poll and run AsynchronousCompletionTasks until
1213          * none found or blocker is released.
1214          *
1215          * @param blocker the blocker
1216          */
1217         final void helpAsyncBlocker(ManagedBlocker blocker) {
1218             int cap, b, d, k; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
1219             while (blocker != null && (d = top - (b = base)) > 0 &&
1220                    (a = array) != null && (cap = a.length) > 0 &&
1221                    (((t = getSlot(a, k = (cap - 1) & b)) == null && d > 1) ||
1222                     t instanceof
1223                     CompletableFuture.AsynchronousCompletionTask) &&
1224                    !blocker.isReleasable()) {
1225                 if (t != null && base == b++ && casSlotToNull(a, k, t)) {


1569                 break;                                // terminating
1570             else {
1571                 long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + RC_UNIT));
1572                 Thread vt = v.owner;
1573                 if (c == (c = compareAndExchangeCtl(c, nc))) {
1574                     v.phase = sp;
1575                     LockSupport.unpark(vt);           // release idle worker
1576                     break;
1577                 }
1578             }
1579         }
1580     }
1581 
1582     /**
1583      * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1584      * See above for explanation.
1585      *
1586      * @param w caller's WorkQueue (may be null on failed initialization)
1587      */
1588     final void runWorker(WorkQueue w) {
1589         if (w != null) {                        // skip on failed init
1590             w.config |= SRC;                    // mark as valid source
1591             int r = w.stackPred, src = 0;       // use seed from registerWorker
                 // Each pass advances the random value r and hands it to scan
                 // together with the previous result. The loop keeps going
                 // while scan returns a non-negative value; when it does not,
                 // awaitWork is called and the loop continues only if that
                 // returns 0. Any other awaitWork result ends the run loop.
1592             do {
1593                 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
1594             } while ((src = scan(w, src, r)) >= 0 ||
1595                      (src = awaitWork(w)) == 0);
1596         }
1597     }
1598 
1599     /**
1600      * Scans for and if found executes top-level tasks: Tries to poll
1601      * each queue starting at a random index with random stride,
1602      * returning source id or retry indicator if contended or
1603      * inconsistent.
1604      *
1605      * @param w caller's WorkQueue
1606      * @param prevSrc the previous queue stolen from in current phase, or 0
1607      * @param r random seed
1608      * @return id of queue if taken, negative if none found, prevSrc for retry
1609      */


1694                 Thread.interrupted();
1695             else if (deadline == 0L)
1696                 LockSupport.park();
1697             else if (deadline - System.currentTimeMillis() > TIMEOUT_SLOP)
1698                 LockSupport.parkUntil(deadline);
1699             else if (((int)c & SMASK) == (w.config & SMASK) &&
1700                      compareAndSetCtl(c, ((UC_MASK & (c - TC_UNIT)) |
1701                                           (prevCtl & SP_MASK)))) {
1702                 w.config |= QUIET;           // sentinel for deregisterWorker
1703                 return -1;                   // drop on timeout
1704             }
1705             else if ((deadline += keepAlive) == 0L)
1706                 deadline = 1L;               // not at head; restart timer
1707         }
1708         return 0;
1709     }
1710 
1711     // Utilities used by ForkJoinTask
1712 
1713     /**
1714      * Returns true if all workers are busy, possibly creating one if allowed
1715      */
1716     final boolean isSaturated() {
             // upper half of bounds; compared against the total count below
1717         int maxTotal = bounds >>> SWIDTH;
1718         for (long c;;) {
                 // Low 32 bits of ctl, ignoring the UNSIGNALLED bit, are
                 // nonzero: presumably an idle worker is recorded there
                 // (tryCompensate treats the same bits that way), so the
                 // pool is not saturated.
1719             if (((int)(c = ctl) & ~UNSIGNALLED) != 0)
1720                 return false;
                 // The signed 16-bit total-count field has reached the maximum:
                 // no further worker may be added.
1721             if ((short)(c >>> TC_SHIFT) >= maxTotal)
1722                 return true;
                 // Otherwise bump only the TC field (all other bits preserved)
                 // and, if the CAS wins, try to start a worker. A lost CAS
                 // re-reads ctl and re-evaluates from the top.
1723             long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
1724             if (compareAndSetCtl(c, nc))
1725                 return !createWorker();
1726         }
1727     }
1728 
1729     /**
1730      * Returns true if can start terminating if enabled, or already terminated
1731      */
1732     final boolean canStop() {
             // oldSum carries the checksum of the previous pass; true is
             // returned only after two consecutive passes agree.
1733         outer: for (long oldSum = 0L;;) { // repeat until stable
1734             int md; WorkQueue[] qs;  long c;
                 // no queue table, or STOP already set: nothing more to check
1735             if ((qs = queues) == null || ((md = mode) & STOP) != 0)
1736                 return true;
                 // low bits of mode plus the signed RC field of ctl is positive;
                 // presumably some workers are still active -- TODO confirm
1737             if ((md & SMASK) + (int)((c = ctl) >> RC_SHIFT) > 0)
1738                 break;
1739             long checkSum = c;
1740             for (int i = 1; i < qs.length; i += 2) { // scan submitters
1741                 WorkQueue q; ForkJoinTask<?>[] a; int s = 0, cap;
                     // a queue counts as busy if top != base, the slot at top
                     // is non-null, or its source field is nonzero
1742                 if ((q = qs[i]) != null && (a = q.array) != null &&
1743                     (cap = a.length) > 0 &&
1744                     ((s = q.top) != q.base || a[(cap - 1) & s] != null ||
1745                      q.source != 0))
1746                     break outer;
                     // fold the queue index and its top into the checksum
1747                 checkSum += (((long)i) << 32) ^ s;
1748             }
                 // stable: same checksum as the previous pass and same table
1749             if (oldSum == (oldSum = checkSum) && queues == qs)
1750                 return true;
1751         }
1752         return (mode & STOP) != 0; // recheck mode on false return
1753     }
1754 
1755     /**
1756      * Tries to decrement counts (sometimes implicitly) and possibly
1757      * arrange for a compensating worker in preparation for
1758      * blocking. May fail due to interference, in which case -1 is
1759      * returned so caller may retry. A zero return value indicates
1760      * that the caller doesn't need to re-adjust counts when later
1761      * unblocked.
1762      *
1763      * @param c incoming ctl value
1764      * @return UNCOMPENSATE: block then adjust, 0: block, -1 : retry
1765      */
1766     private int tryCompensate(long c) {
1767         Predicate<? super ForkJoinPool> sat;
1768         int b = bounds; // counts are signed; centered at parallelism level == 0

1769         int minActive = (short)(b & SMASK),
1770             maxTotal  = b >>> SWIDTH,
1771             active    = (int)(c >> RC_SHIFT),
1772             total     = (short)(c >>> TC_SHIFT),
1773             sp        = (int)c & ~UNSIGNALLED;
1774         if (total >= 0) {


1775             if (sp != 0) {                        // activate idle worker
1776                 WorkQueue[] qs; int n; WorkQueue v;
1777                 if ((qs = queues) != null && (n = qs.length) > 0 &&
1778                     (v = qs[sp & (n - 1)]) != null) {
1779                     Thread vt = v.owner;
1780                     long nc = ((long)v.stackPred & SP_MASK) | (UC_MASK & c);
1781                     if (compareAndSetCtl(c, nc)) {
1782                         v.phase = sp;
1783                         LockSupport.unpark(vt);
1784                         return UNCOMPENSATE;
1785                     }
1786                 }
1787                 return -1;                        // retry
1788             }
1789             else if (active > minActive) {        // reduce parallelism
1790                 long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c));
1791                 return compareAndSetCtl(c, nc) ? UNCOMPENSATE : -1;
1792             }
1793         }
1794         if (total < maxTotal) {                   // expand pool


1802             return 0;
1803         else
1804             throw new RejectedExecutionException(
1805                 "Thread limit exceeded replacing blocked worker");
1806     }
1807 
1808     /**
1809      * Readjusts RC count; called from ForkJoinTask after blocking.
1810      */
1811     final void uncompensate() {
             // adds one RC_UNIT to ctl; the value returned by getAndAddCtl
             // is not needed here
1812         getAndAddCtl(RC_UNIT);
1813     }
1814 
1815     /**
1816      * Helps if possible until the given task is done.  Scans other
1817      * queues for a task produced by one of w's stealers; returning
1818      * compensated blocking sentinel if none are found.
1819      *
1820      * @param task the task
1821      * @param w caller's WorkQueue

1822      * @return task status on exit, or UNCOMPENSATE for compensated blocking
1823      */
1824     final int helpJoin(ForkJoinTask<?> task, WorkQueue w) {
1825         int s = 0;
1826         if (task != null && w != null) {
1827             int wsrc = w.source, wid = w.config & SMASK, r = wid + 2;
1828             boolean scan = true;
1829             long c = 0L;                          // track ctl stability
1830             outer: for (;;) {
1831                 if ((s = task.status) < 0)
1832                     break;
1833                 else if (scan = !scan) {          // previous scan was empty
1834                     if (mode < 0)
1835                         ForkJoinTask.cancelIgnoringExceptions(task);
1836                     else if (c == (c = ctl) && (s = tryCompensate(c)) >= 0)
1837                         break;                    // block
1838                 }
1839                 else {                            // scan for subtasks
1840                     WorkQueue[] qs = queues;
1841                     int n = (qs == null) ? 0 : qs.length, m = n - 1;
1842                     for (int i = n; i > 0; i -= 2, r += 2) {
1843                         int j; WorkQueue q, x, y; ForkJoinTask<?>[] a;
1844                         if ((q = qs[j = r & m]) != null) {
1845                             int sq = q.source & SMASK, cap, b;
1846                             if ((a = q.array) != null && (cap = a.length) > 0) {
1847                                 int k = (cap - 1) & (b = q.base);
1848                                 int nextBase = b + 1, src = j | SRC, sx;
1849                                 ForkJoinTask<?> t = WorkQueue.getSlot(a, k);
1850                                 boolean eligible = sq == wid ||
1851                                     ((x = qs[sq & m]) != null &&   // indirect
1852                                      ((sx = (x.source & SMASK)) == wid ||
1853                                       ((y = qs[sx & m]) != null && // 2-indirect
1854                                        (y.source & SMASK) == wid)));
1855                                 if ((s = task.status) < 0)
1856                                     break outer;
1857                                 else if ((q.source & SMASK) != sq ||
1858                                          q.base != b)
1859                                     scan = true;          // inconsistent


2178             q.push(task, this);
2179         else
2180             externalPush(task);
2181         return task;
2182     }
2183 
2184     /**
2185      * Returns the common pool queue for an external thread that may
2186      * have previously submitted a common pool task (nonzero probe),
2187      * or null if none.
2188      */
2189     static WorkQueue commonQueue() {
             // Read the caller's probe first, then walk common -> queues,
             // bailing out with null as soon as any piece is missing.
             int probe = ThreadLocalRandom.getProbe();
             ForkJoinPool pool = common;
             if (pool == null)
                 return null;
             WorkQueue[] table = pool.queues;
             if (table == null)
                 return null;
             int len = table.length;
             if (len <= 0 || probe == 0)
                 return null;
             // the probe is shifted left by one before masking, so the
             // selected index is always even
             return table[(len - 1) & (probe << 1)];
2195     }
2196 
2197     /**










2198      * If the given executor is a ForkJoinPool, poll and execute
2199      * AsynchronousCompletionTasks from worker's queue until none are
2200      * available or blocker is released.
2201      */
2202     static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) {
             // Pick the queue to help from: a worker thread uses its own
             // queue, but only when it belongs to pool e; any other thread
             // uses its common-pool queue, but only when e is the common pool.
             Thread current = Thread.currentThread();
             WorkQueue queue;
             if (current instanceof ForkJoinWorkerThread) {
                 ForkJoinWorkerThread worker = (ForkJoinWorkerThread) current;
                 queue = (worker.pool == e) ? worker.workQueue : null;
             } else {
                 queue = (e == common) ? commonQueue() : null;
             }
             if (queue != null)
                 queue.helpAsyncBlocker(blocker);
2212     }
2213 
2214     /**
2215      * Returns a cheap heuristic guide for task partitioning when
2216      * programmers, frameworks, tools, or languages have little or no
2217      * idea about task granularity.  In essence, by offering this
2218      * method, we ask users only about tradeoffs in overhead vs
2219      * expected throughput and its variance, rather than how finely to
2220      * partition tasks.
2221      *
2222      * In a steady state strict (tree-structured) computation, each
2223      * thread makes available for stealing enough tasks for other
2224      * threads to remain active. Inductively, if all threads play by
2225      * the same rules, each thread should make available only a
2226      * constant number of tasks.
2227      *
2228      * The minimum useful constant is just 1. But using a value of 1
2229      * would require immediate replenishment upon each steal to


2275     /**
2276      * Possibly initiates and/or completes termination.
2277      *
2278      * @param now if true, unconditionally terminate, else only
2279      * if no work and no active workers
2280      * @param enable if true, terminate when next possible
2281      * @return true if terminating or terminated
2282      */
2283     private boolean tryTerminate(boolean now, boolean enable) {
2284         int md; // try to set SHUTDOWN, then STOP, then help terminate
             // Step 1: SHUTDOWN. If it is not yet set, set it only when the
             // caller passed enable; otherwise report "not terminating".
2285         if (((md = mode) & SHUTDOWN) == 0) {
2286             if (!enable)
2287                 return false;
2288             md = getAndBitwiseOrMode(SHUTDOWN);
2289         }
             // Step 2: STOP. Unless now is set, this requires canStop().
2290         if ((md & STOP) == 0) {
2291             if (!now && !canStop())
2292                 return false;
2293             md = getAndBitwiseOrMode(STOP);
2294         }
             // Step 3: help terminate. Cancel whatever pollScan still finds,
             // interrupt worker threads, and mark TERMINATED once the counts
             // say no workers remain.
2295         for (int k = 0; k < 2; ++k) { // twice in case of lagging qs updates
2296             for (ForkJoinTask<?> t; (t = pollScan(false)) != null; )


2297                 ForkJoinTask.cancelIgnoringExceptions(t); // help cancel

2298             WorkQueue[] qs; int n; WorkQueue q; Thread thread;
2299             if ((qs = queues) != null && (n = qs.length) > 0) {
                     // only odd indices are visited; threads that are already
                     // interrupted are skipped
2300                 for (int j = 1; j < n; j += 2) { // unblock other workers
2301                     if ((q = qs[j]) != null && (thread = q.owner) != null &&
2302                         !thread.isInterrupted()) {

2303                         try {
2304                             thread.interrupt();
2305                         } catch (Throwable ignore) {
                                 // deliberately ignored: a failure to interrupt
                                 // one thread must not stop the sweep
2306                         }
2307                     }
2308                 }
2309             }
2310             ReentrantLock lock; Condition cond; // signal when no workers
                 // Only the caller whose getAndBitwiseOrMode call actually
                 // flips TERMINATED (it was clear in the returned value)
                 // goes on to signal the termination condition.
2311             if (((md = mode) & TERMINATED) == 0 &&
2312                 (md & SMASK) + (short)(ctl >>> TC_SHIFT) <= 0 &&
2313                 (getAndBitwiseOrMode(TERMINATED) & TERMINATED) == 0 &&
2314                 (lock = registrationLock) != null) {
2315                 lock.lock();
2316                 if ((cond = termination) != null)
2317                     cond.signalAll();
2318                 lock.unlock();
2319             }






2320         }
2321         return true;
2322     }
2323 
2324     // Exported methods
2325 
2326     // Constructors
2327 
2328     /**
2329      * Creates a {@code ForkJoinPool} with parallelism equal to {@link
2330      * java.lang.Runtime#availableProcessors}, using defaults for all
2331      * other parameters (see {@link #ForkJoinPool(int,
2332      * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean,
2333      * int, int, int, Predicate, long, TimeUnit)}).
2334      *
2335      * @throws SecurityException if a security manager exists and
2336      *         the caller is not permitted to modify threads
2337      *         because it does not hold {@link
2338      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2339      */


2523     /**
2524      * Constructor for common pool using parameters possibly
2525      * overridden by system properties
2526      */
2527     private ForkJoinPool(byte forCommonPoolOnly) {
             // default parallelism: one less than the number of processors
2528         int parallelism = Runtime.getRuntime().availableProcessors() - 1;
2529         ForkJoinWorkerThreadFactory fac = null;
2530         UncaughtExceptionHandler handler = null;
             // Each property is optional. If reading or parsing any of them
             // throws, the values assigned so far are kept and the remaining
             // ones stay at their defaults.
2531         try {  // ignore exceptions in accessing/parsing properties
2532             fac = (ForkJoinWorkerThreadFactory) newInstanceFromSystemProperty(
2533                 "java.util.concurrent.ForkJoinPool.common.threadFactory");
2534             handler = (UncaughtExceptionHandler) newInstanceFromSystemProperty(
2535                 "java.util.concurrent.ForkJoinPool.common.exceptionHandler");
2536             String pp = System.getProperty
2537                 ("java.util.concurrent.ForkJoinPool.common.parallelism");
2538             if (pp != null)
2539                 parallelism = Integer.parseInt(pp);
2540         } catch (Exception ignore) {
2541         }
             // clamp parallelism to [0, MAX_CAP]; mode starts out as that value
2542         int p = this.mode = Math.min(Math.max(parallelism, 0), MAX_CAP);


             // power-of-two length for the queue table, derived from p
2543         int size = 1 << (33 - Integer.numberOfLeadingZeros(p > 0 ? p - 1 : 1));
             // fall back to the default common-pool factory when none was
             // supplied through the system property
2544         this.factory = (fac != null) ? fac :
2545             new DefaultCommonPoolForkJoinWorkerThreadFactory();
2546         this.ueh = handler;
2547         this.keepAlive = DEFAULT_KEEPALIVE;
2548         this.saturate = null;
2549         this.workerNamePrefix = null;
             // bounds packs (1 - p) in the low SMASK bits and
             // COMMON_MAX_SPARES above SWIDTH
2550         this.bounds = ((1 - p) & SMASK) | (COMMON_MAX_SPARES << SWIDTH);
             // both the TC and RC fields of ctl start at -p
2551         this.ctl = ((((long)(-p) << TC_SHIFT) & TC_MASK) |
2552                     (((long)(-p) << RC_SHIFT) & RC_MASK));
2553         this.queues = new WorkQueue[size];
2554         this.registrationLock = new ReentrantLock();
2555     }
2556 
2557     /**
2558      * Returns the common pool instance. This pool is statically
2559      * constructed; its run state is unaffected by attempts to {@link
2560      * #shutdown} or {@link #shutdownNow}. However this pool and any
2561      * ongoing processing are automatically terminated upon program
2562      * {@link System#exit}.  Any program that relies on asynchronous
2563      * task processing to complete before program termination should
2564      * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
2565      * before exit.
2566      *
2567      * @return the common pool instance
2568      * @since 1.8
2569      */
2570     public static ForkJoinPool commonPool() {


2576 
2577     /**
2578      * Performs the given task, returning its result upon completion.
2579      * If the computation encounters an unchecked Exception or Error,
2580      * it is rethrown as the outcome of this invocation.  Rethrown
2581      * exceptions behave in the same way as regular exceptions, but,
2582      * when possible, contain stack traces (as displayed for example
2583      * using {@code ex.printStackTrace()}) of both the current thread
2584      * as well as the thread actually encountering the exception;
2585      * minimally only the latter.
2586      *
2587      * @param task the task
2588      * @param <T> the type of the task's result
2589      * @return the task's result
2590      * @throws NullPointerException if the task is null
2591      * @throws RejectedExecutionException if the task cannot be
2592      *         scheduled for execution
2593      */
2594     public <T> T invoke(ForkJoinTask<T> task) {
             // submit first, then join: join() returns the task's result
             // to the caller
2595         externalSubmit(task);
2596         return task.join();
2597     }
2598 
2599     /**
2600      * Arranges for (asynchronous) execution of the given task.
2601      *
2602      * @param task the task
2603      * @throws NullPointerException if the task is null
2604      * @throws RejectedExecutionException if the task cannot be
2605      *         scheduled for execution
2606      */
2607     public void execute(ForkJoinTask<?> task) {
             // submit only; whatever externalSubmit returns is ignored and
             // the caller does not wait for the task
2608         externalSubmit(task);
2609     }
2610 
2611     // AbstractExecutorService methods
2612 
2613     /**
2614      * @throws NullPointerException if the task is null
2615      * @throws RejectedExecutionException if the task cannot be
2616      *         scheduled for execution


2668         return externalSubmit((task instanceof ForkJoinTask<?>)
2669             ? (ForkJoinTask<Void>) task // avoid re-wrap
2670             : new ForkJoinTask.AdaptedRunnableAction(task));
2671     }
2672 
2673     /**
2674      * @throws NullPointerException       {@inheritDoc}
2675      * @throws RejectedExecutionException {@inheritDoc}
2676      */
2677     @Override
2678     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
             // Wrap and submit every callable, then wait for each one from
             // last-submitted to first. Any throwable cancels everything
             // submitted so far before being rethrown.
             ArrayList<Future<T>> results = new ArrayList<>(tasks.size());
             try {
                 for (Callable<T> callable : tasks) {
                     ForkJoinTask<T> adapted =
                         new ForkJoinTask.AdaptedInterruptibleCallable<T>(callable);
                     results.add(adapted);
                     externalSubmit(adapted);
                 }
                 int idx = results.size();
                 while (--idx >= 0) {
                     ForkJoinTask<?> pending = (ForkJoinTask<?>) results.get(idx);
                     pending.quietlyJoin();
                 }
                 return results;
             } catch (Throwable failure) {
                 for (Future<T> submitted : results)
                     ForkJoinTask.cancelIgnoringExceptions(submitted);
                 throw failure;
             }
2695     }
2696 
2697     @Override
2698     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
2699                                          long timeout, TimeUnit unit)
2700         throws InterruptedException {
             // Same as the untimed form, except that the waits share a
             // single time budget. Once it is used up, every task that is
             // still unfinished is cancelled instead of awaited.
             long budget = unit.toNanos(timeout);
             ArrayList<Future<T>> results = new ArrayList<>(tasks.size());
             try {
                 for (Callable<T> callable : tasks) {
                     ForkJoinTask<T> adapted =
                         new ForkJoinTask.AdaptedInterruptibleCallable<T>(callable);
                     results.add(adapted);
                     externalSubmit(adapted);
                 }
                 long began = System.nanoTime();
                 long remaining = budget;
                 boolean expired = (remaining < 0L);
                 int idx = results.size();
                 while (--idx >= 0) {
                     Future<T> pending = results.get(idx);
                     if (pending.isDone())
                         continue;
                     if (expired) {
                         ForkJoinTask.cancelIgnoringExceptions(pending);
                         continue;
                     }
                     try {
                         pending.get(remaining, TimeUnit.NANOSECONDS);
                     } catch (CancellationException | TimeoutException |
                              ExecutionException ignored) {
                         // the outcome stays recorded in the future itself
                     }
                     remaining = budget - (System.nanoTime() - began);
                     if (remaining < 0L)
                         expired = true;
                 }
                 return results;
             } catch (Throwable failure) {
                 for (Future<T> submitted : results)
                     ForkJoinTask.cancelIgnoringExceptions(submitted);
                 throw failure;
             }
2734     }
2735 
2736     // Task to hold results from InvokeAnyTasks
2737     static final class InvokeAnyRoot<E> extends ForkJoinTask<E> {
2738         private static final long serialVersionUID = 2838392045355241008L;
2739         @SuppressWarnings("serial") // Conditionally serializable
2740         volatile E result;
2741         final AtomicInteger count;  // in case all throw
2742         final ForkJoinPool pool;    // to check shutdown while collecting
             // n is the initial value of count; tryComplete decrements it
             // once for each call that ends up failed
2743         InvokeAnyRoot(int n, ForkJoinPool p) {
2744             pool = p;
2745             count = new AtomicInteger(n);
2746         }
2747         final void tryComplete(Callable<E> c) { // called by InvokeAnyTasks
2748             Throwable ex = null;
                 // Treated as failed up front when there is no callable, this
                 // root is cancelled, or pool.mode is negative (presumably:
                 // the pool is stopping -- TODO confirm).
2749             boolean failed = (c == null || isCancelled() ||
2750                               (pool != null && pool.mode < 0));
                 // Run the callable only if this root has no outcome yet.
2751             if (!failed && !isDone()) {




2752                 try {
2753                     complete(c.call());

2754                 } catch (Throwable tx) {
2755                     ex = tx;
2756                     failed = true;
2757                 }
2758             }
                 // Record an exceptional outcome when pool.mode is negative,
                 // or when this call failed and the pre-decrement count was
                 // at most 1 (no other callable left that could still
                 // succeed). Without a captured throwable, a
                 // CancellationException is used.
2759             if ((pool != null && pool.mode < 0) ||
2760                 (failed && count.getAndDecrement() <= 1))
2761                 trySetThrown(ex != null ? ex : new CancellationException());
2762         }
2763         public final boolean exec()         { return false; } // never forked
2764         public final E getRawResult()       { return result; }
2765         public final void setRawResult(E v) { result = v; }
2766     }
2767 
2768     // Variant of AdaptedInterruptibleCallable with results in InvokeAnyRoot
2769     static final class InvokeAnyTask<E> extends ForkJoinTask<E> {
2770         private static final long serialVersionUID = 2838392045355241008L;
2771         final InvokeAnyRoot<E> root;
2772         @SuppressWarnings("serial") // Conditionally serializable
2773         final Callable<E> callable;


2800     }
2801 
2802     @Override
2803     public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
2804         throws InterruptedException, ExecutionException {
             // Submit one InvokeAnyTask per callable, all reporting to a
             // shared root, and stop submitting as soon as the root has an
             // outcome. Every submitted task is cancelled on the way out.
             int total = tasks.size();
             if (total <= 0)
                 throw new IllegalArgumentException();
             InvokeAnyRoot<T> shared = new InvokeAnyRoot<T>(total, this);
             ArrayList<InvokeAnyTask<T>> submitted = new ArrayList<>(total);
             try {
                 for (Callable<T> callable : tasks) {
                     if (callable == null)
                         throw new NullPointerException();
                     InvokeAnyTask<T> candidate =
                         new InvokeAnyTask<T>(shared, callable);
                     submitted.add(candidate);
                     externalSubmit(candidate);
                     if (shared.isDone())
                         break;
                 }
                 return shared.get();
             } finally {
                 for (InvokeAnyTask<T> candidate : submitted)
                     ForkJoinTask.cancelIgnoringExceptions(candidate);
             }
2825     }
2826 
2827     @Override
2828     public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
2829                            long timeout, TimeUnit unit)
2830         throws InterruptedException, ExecutionException, TimeoutException {
             // Timed form: the timeout is converted before any argument
             // check, and it bounds only the final wait on the shared root.
             // Every submitted task is cancelled on the way out.
             long budget = unit.toNanos(timeout);
             int total = tasks.size();
             if (total <= 0)
                 throw new IllegalArgumentException();
             InvokeAnyRoot<T> shared = new InvokeAnyRoot<T>(total, this);
             ArrayList<InvokeAnyTask<T>> submitted = new ArrayList<>(total);
             try {
                 for (Callable<T> callable : tasks) {
                     if (callable == null)
                         throw new NullPointerException();
                     InvokeAnyTask<T> candidate =
                         new InvokeAnyTask<T>(shared, callable);
                     submitted.add(candidate);
                     externalSubmit(candidate);
                     if (shared.isDone())
                         break;
                 }
                 return shared.get(budget, TimeUnit.NANOSECONDS);
             } finally {
                 for (InvokeAnyTask<T> candidate : submitted)
                     ForkJoinTask.cancelIgnoringExceptions(candidate);
             }
2852     }
2853 
2854     /**
2855      * Returns the factory used for constructing new workers.
2856      *
2857      * @return the factory used for constructing new workers
2858      */
2859     public ForkJoinWorkerThreadFactory getFactory() {
             // returns the factory field as-is, without copying or wrapping
2860         return factory;
2861     }
2862 
2863     /**
2864      * Returns the handler for internal worker threads that terminate
2865      * due to unrecoverable errors encountered while executing tasks.
2866      *
2867      * @return the handler, or {@code null} if none




1029         }
1030 
1031         /**
1032          * Pops the given task for owner only if it is at the current top.
1033          */
1034         final boolean tryUnpush(ForkJoinTask<?> task) {
                 // s begins as the current top; after the post-decrement in
                 // the test below it indexes the element at the top.
1035             int s = top, cap; ForkJoinTask<?>[] a;
                 // Needs a non-empty array and base != top as read here. The
                 // slot is cleared by CAS, which is expected to fail if the
                 // slot no longer holds task.
1036             if ((a = array) != null && (cap = a.length) > 0 && base != s-- &&
1037                 casSlotToNull(a, (cap - 1) & s, task)) {
1038                 top = s; // slot cleared: top moves down by one
1039                 return true;
1040             }
1041             return false; // nothing was removed
1042         }
1043 
1044         /**
1045          * Locking version of tryUnpush.
              *
              * Loops until either the task has been removed or the slot just
              * below top no longer holds the task (or there is no usable array).
              *
              * @param task the task to remove
              * @return true if the task was removed from the top slot
1046          */
1047         final boolean externalTryUnpush(ForkJoinTask<?> task) {
1048             boolean taken = false;
1049             for (;;) {
                     // Re-read top and array on every attempt; give up as soon as
                     // the task is not the element in the slot just below top.
1050                 int s = top, cap, k; ForkJoinTask<?>[] a;
1051                 if ((a = array) == null || (cap = a.length) <= 0 ||
1052                     a[k = (cap - 1) & (s - 1)] != task)
1053                     break;
1054                 if (tryLock()) {
                         // Recheck under the lock that nothing moved since the reads above.
1055                     if (top == s && array == a) {
1056                         if (taken = casSlotToNull(a, k, task)) {
1057                             top = s - 1;
1058                             source = 0; // release lock
1059                             break;
1060                         }
1061                     }
1062                     source = 0; // release lock for retry
1063                 }
                     // Reached when tryLock() failed and also when the lock was
                     // obtained but the recheck or the slot CAS did not succeed.
1064                 Thread.yield(); // trylock failure
1065             }
1066             return taken;
1067         }
1068 
1069         /**
1070          * Deep form of tryUnpush: Traverses from top and removes task if
1071          * present, shifting others to fill gap.
1072          */
1073         final boolean tryRemove(ForkJoinTask<?> task, boolean owned) {
1074             boolean taken = false;
1075             int p = top, cap; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
1076             if ((a = array) != null && task != null && (cap = a.length) > 0) {
1077                 int m = cap - 1, s = p - 1, d = p - base;
1078                 for (int i = s, k; d > 0; --i, --d) {
1079                     if ((t = a[k = i & m]) == task) {
1080                         if (owned || tryLock()) {
1081                             if ((owned || (array == a && top == p)) &&
1082                                 (taken = casSlotToNull(a, k, t))) {
1083                                 for (int j = i; j != s; ) // shift down
1084                                     a[j & m] = getAndClearSlot(a, ++j & m);


1186         final int helpComplete(ForkJoinTask<?> task, boolean owned, int limit) {
                 // Loops while task is non-null and unfinished (status >= 0) and the
                 // slot just below top holds a CountedCompleter. If that element is
                 // the given task, or has it on its completer chain, tries to take
                 // the element from the top slot and run it.
                 //   owned: if true, the slot is taken with a direct CAS; otherwise
                 //          only under tryLock(), after rechecking top and array.
                 //   limit: when nonzero, stop once this many tasks have been run;
                 //          zero never triggers the limit exit.
                 // Returns the last value read from task.status (0 if task is null).
1187             int status = 0, cap, k, p, s; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
1188             while (task != null && (status = task.status) >= 0 &&
1189                    (a = array) != null && (cap = a.length) > 0 &&
1190                    (t = a[k = (cap - 1) & (s = (p = top) - 1)])
1191                    instanceof CountedCompleter) {
1192                 CountedCompleter<?> f = (CountedCompleter<?>)t;
1193                 boolean taken = false;
1194                 for (;;) {     // exec if root task is a completer of t
1195                     if (f == task) {
1196                         if (owned) {
1197                             if ((taken = casSlotToNull(a, k, t)))
1198                                 top = s;
1199                         }
1200                         else if (tryLock()) {
1201                             if (top == p && array == a &&
1202                                 (taken = casSlotToNull(a, k, t)))
1203                                 top = s;
1204                             source = 0; // release lock
1205                         }
1206                         if (taken)
1207                             t.doExec();
1208                         else if (!owned)
1209                             Thread.yield(); // tryLock failure
1210                         break;
1211                     }
                         // Walk one step up the completer chain; if it ends without
                         // reaching task, leave the inner loop with taken == false
                         // and let the outer loop re-read top.
1212                     else if ((f = f.completer) == null)
1213                         break;
1214                 }
1215                 if (taken && limit != 0 && --limit == 0)



1216                     break;
1217             }
1218             return status;
1219         }
1220 
1221         /**
1222          * Tries to poll and run AsynchronousCompletionTasks until
1223          * none found or blocker is released.
1224          *
1225          * @param blocker the blocker
1226          */
1227         final void helpAsyncBlocker(ManagedBlocker blocker) {
1228             int cap, b, d, k; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
1229             while (blocker != null && (d = top - (b = base)) > 0 &&
1230                    (a = array) != null && (cap = a.length) > 0 &&
1231                    (((t = getSlot(a, k = (cap - 1) & b)) == null && d > 1) ||
1232                     t instanceof
1233                     CompletableFuture.AsynchronousCompletionTask) &&
1234                    !blocker.isReleasable()) {
1235                 if (t != null && base == b++ && casSlotToNull(a, k, t)) {


1579                 break;                                // terminating
1580             else {
1581                 long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + RC_UNIT));
1582                 Thread vt = v.owner;
1583                 if (c == (c = compareAndExchangeCtl(c, nc))) {
1584                     v.phase = sp;
1585                     LockSupport.unpark(vt);           // release idle worker
1586                     break;
1587                 }
1588             }
1589         }
1590     }
1591 
1592     /**
1593      * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1594      * See above for explanation.
1595      *
1596      * @param w caller's WorkQueue (may be null on failed initialization)
1597      */
1598     final void runWorker(WorkQueue w) {
1599         if (mode >= 0 && w != null) {           // skip on failed init
1600             w.config |= SRC;                    // mark as valid source
1601             int r = w.stackPred, src = 0;       // use seed from registerWorker
                 // Each iteration advances the random seed, then scans. The loop
                 // continues while scan() returns a non-negative value; when scan()
                 // returns a negative value it continues only if awaitWork() returns 0.
1602             do {
1603                 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
1604             } while ((src = scan(w, src, r)) >= 0 ||
1605                      (src = awaitWork(w)) == 0);
1606         }
1607     }
1608 
1609     /**
1610      * Scans for and if found executes top-level tasks: Tries to poll
1611      * each queue starting at a random index with random stride,
1612      * returning source id or retry indicator if contended or
1613      * inconsistent.
1614      *
1615      * @param w caller's WorkQueue
1616      * @param prevSrc the previous queue stolen from in current phase, or 0
1617      * @param r random seed
1618      * @return id of queue if taken, negative if none found, prevSrc for retry
1619      */


1704                 Thread.interrupted();
1705             else if (deadline == 0L)
1706                 LockSupport.park();
1707             else if (deadline - System.currentTimeMillis() > TIMEOUT_SLOP)
1708                 LockSupport.parkUntil(deadline);
1709             else if (((int)c & SMASK) == (w.config & SMASK) &&
1710                      compareAndSetCtl(c, ((UC_MASK & (c - TC_UNIT)) |
1711                                           (prevCtl & SP_MASK)))) {
1712                 w.config |= QUIET;           // sentinel for deregisterWorker
1713                 return -1;                   // drop on timeout
1714             }
1715             else if ((deadline += keepAlive) == 0L)
1716                 deadline = 1L;               // not at head; restart timer
1717         }
1718         return 0;
1719     }
1720 
1721     // Utilities used by ForkJoinTask
1722 
1723     /**
















1724      * Returns true if can start terminating if enabled, or already terminated
1725      */
1726     final boolean canStop() {
             // Repeats a scan of the queues until two consecutive passes produce
             // the same checksum with the same queues array (returns true), or a
             // pass finds evidence of activity (falls through to the final mode
             // recheck).
1727         outer: for (long oldSum = 0L;;) { // repeat until stable
1728             int md; WorkQueue[] qs;  long c;
                 // No queue table, or STOP already set: nothing prevents stopping.
1729             if ((qs = queues) == null || ((md = mode) & STOP) != 0)
1730                 return true;
                 // Low bits of mode plus the signed count in the upper part of ctl;
                 // presumably a positive sum means some workers are still active
                 // -- TODO confirm against the ctl field documentation.
1731             if ((md & SMASK) + (int)((c = ctl) >> RC_SHIFT) > 0)
1732                 break;
1733             long checkSum = c;
                 // NOTE(review): only odd indices are visited here; tryTerminate
                 // uses the same odd indices when interrupting queue owners.
1734             for (int i = 1; i < qs.length; i += 2) { // scan submitters
1735                 WorkQueue q; ForkJoinTask<?>[] a; int s = 0, cap;
                     // A queue counts as busy if top != base, the slot at top is
                     // non-null, or its source field is nonzero.
1736                 if ((q = qs[i]) != null && (a = q.array) != null &&
1737                     (cap = a.length) > 0 &&
1738                     ((s = q.top) != q.base || a[(cap - 1) & s] != null ||
1739                      q.source != 0))
1740                     break outer;
1741                 checkSum += (((long)i) << 32) ^ s;
1742             }
                 // Assigns the new checksum to oldSum while comparing it with the
                 // previous pass's value.
1743             if (oldSum == (oldSum = checkSum) && queues == qs)
1744                 return true;
1745         }
1746         return (mode & STOP) != 0; // recheck mode on false return
1747     }
1748 
1749     /**
1750      * Tries to decrement counts (sometimes implicitly) and possibly
1751      * arrange for a compensating worker in preparation for
1752      * blocking. May fail due to interference, in which case -1 is
1753      * returned so caller may retry. A zero return value indicates
1754      * that the caller doesn't need to re-adjust counts when later
1755      * unblocked.
1756      *
1757      * @param c incoming ctl value
1758      * @return UNCOMPENSATE: block then adjust, 0: block, -1 : retry
1759      */
1760     private int tryCompensate(long c) {
1761         Predicate<? super ForkJoinPool> sat;
1762         int md = mode, b = bounds;
1763         // counts are signed; centered at parallelism level == 0
1764         int minActive = (short)(b & SMASK),
1765             maxTotal  = b >>> SWIDTH,
1766             active    = (int)(c >> RC_SHIFT),
1767             total     = (short)(c >>> TC_SHIFT),
1768             sp        = (int)c & ~UNSIGNALLED;
1769         if ((md & SMASK) == 0)
1770             return 0;                  // cannot compensate if parallelism zero
1771         else if (total >= 0) {
1772             if (sp != 0) {                        // activate idle worker
1773                 WorkQueue[] qs; int n; WorkQueue v;
1774                 if ((qs = queues) != null && (n = qs.length) > 0 &&
1775                     (v = qs[sp & (n - 1)]) != null) {
1776                     Thread vt = v.owner;
1777                     long nc = ((long)v.stackPred & SP_MASK) | (UC_MASK & c);
1778                     if (compareAndSetCtl(c, nc)) {
1779                         v.phase = sp;
1780                         LockSupport.unpark(vt);
1781                         return UNCOMPENSATE;
1782                     }
1783                 }
1784                 return -1;                        // retry
1785             }
1786             else if (active > minActive) {        // reduce parallelism
1787                 long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c));
1788                 return compareAndSetCtl(c, nc) ? UNCOMPENSATE : -1;
1789             }
1790         }
1791         if (total < maxTotal) {                   // expand pool


1799             return 0;
1800         else
1801             throw new RejectedExecutionException(
1802                 "Thread limit exceeded replacing blocked worker");
1803     }
1804 
1805     /**
1806      * Readjusts RC count; called from ForkJoinTask after blocking.
1807      */
1808     final void uncompensate() {
             // Adds RC_UNIT to ctl; presumably pairs with the RC_UNIT that
             // tryCompensate subtracts before a caller blocks -- verify at call sites.
1809         getAndAddCtl(RC_UNIT);
1810     }
1811 
1812     /**
1813      * Helps if possible until the given task is done.  Scans other
1814      * queues for a task produced by one of w's stealers; returning
1815      * compensated blocking sentinel if none are found.
1816      *
1817      * @param task the task
1818      * @param w caller's WorkQueue
1819      * @param canHelp if false, compensate only
1820      * @return task status on exit, or UNCOMPENSATE for compensated blocking
1821      */
1822     final int helpJoin(ForkJoinTask<?> task, WorkQueue w, boolean canHelp) {
1823         int s = 0;
1824         if (task != null && w != null) {
1825             int wsrc = w.source, wid = w.config & SMASK, r = wid + 2;
1826             boolean scan = true;
1827             long c = 0L;                          // track ctl stability
1828             outer: for (;;) {
1829                 if ((s = task.status) < 0)
1830                     break;
1831                 else if (scan = !scan) {          // previous scan was empty
1832                     if (mode < 0)
1833                         ForkJoinTask.cancelIgnoringExceptions(task);
1834                     else if (c == (c = ctl) && (s = tryCompensate(c)) >= 0)
1835                         break;                    // block
1836                 }
1837                 else if (canHelp) {               // scan for subtasks
1838                     WorkQueue[] qs = queues;
1839                     int n = (qs == null) ? 0 : qs.length, m = n - 1;
1840                     for (int i = n; i > 0; i -= 2, r += 2) {
1841                         int j; WorkQueue q, x, y; ForkJoinTask<?>[] a;
1842                         if ((q = qs[j = r & m]) != null) {
1843                             int sq = q.source & SMASK, cap, b;
1844                             if ((a = q.array) != null && (cap = a.length) > 0) {
1845                                 int k = (cap - 1) & (b = q.base);
1846                                 int nextBase = b + 1, src = j | SRC, sx;
1847                                 ForkJoinTask<?> t = WorkQueue.getSlot(a, k);
1848                                 boolean eligible = sq == wid ||
1849                                     ((x = qs[sq & m]) != null &&   // indirect
1850                                      ((sx = (x.source & SMASK)) == wid ||
1851                                       ((y = qs[sx & m]) != null && // 2-indirect
1852                                        (y.source & SMASK) == wid)));
1853                                 if ((s = task.status) < 0)
1854                                     break outer;
1855                                 else if ((q.source & SMASK) != sq ||
1856                                          q.base != b)
1857                                     scan = true;          // inconsistent


2176             q.push(task, this);
2177         else
2178             externalPush(task);
2179         return task;
2180     }
2181 
2182     /**
2183      * Returns common pool queue for an external thread that has
2184      * possibly ever submitted a common pool task (nonzero probe), or
2185      * null if none.
2186      */
2187     static WorkQueue commonQueue() {
2188         ForkJoinPool p; WorkQueue[] qs;
2189         int r = ThreadLocalRandom.getProbe(), n;
             // Yields null when the common pool or its queue table is missing or
             // empty, or when the caller's probe is zero. Otherwise reads the slot
             // at (n - 1) & (r << 1); because r << 1 has a zero low bit, this is
             // always an even index. The slot itself may still hold null.
2190         return ((p = common) != null && (qs = p.queues) != null &&
2191                 (n = qs.length) > 0 && r != 0) ?
2192             qs[(n - 1) & (r << 1)] : null;
2193     }
2194 
2195     /**
2196      * Returns queue for an external thread, if one exists
2197      */
2198     final WorkQueue externalQueue() {
2199         WorkQueue[] qs;
2200         int r = ThreadLocalRandom.getProbe(), n;
             // Same lookup as commonQueue(), but against this pool's own queue
             // table: null if the table is missing/empty or the probe is zero,
             // otherwise the (possibly null) entry at even index (n - 1) & (r << 1).
2201         return ((qs = queues) != null && (n = qs.length) > 0 && r != 0) ?
2202             qs[(n - 1) & (r << 1)] : null;
2203     }
2204 
2205     /**
2206      * If the given executor is a ForkJoinPool, poll and execute
2207      * AsynchronousCompletionTasks from worker's queue until none are
2208      * available or blocker is released.
2209      */
2210     static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) {
2211         WorkQueue w = null; Thread t; ForkJoinWorkerThread wt;
             // Pick the queue to help from:
             //  - a ForkJoinWorkerThread uses its own workQueue, but only when its
             //    pool is the given executor (otherwise w stays null);
             //  - any other thread uses the pool's external queue, if e is a
             //    ForkJoinPool and such a queue exists.
2212         if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
2213             if ((wt = (ForkJoinWorkerThread)t).pool == e)
2214                 w = wt.workQueue;
2215         }
2216         else if (e instanceof ForkJoinPool)
2217             w = ((ForkJoinPool)e).externalQueue();
2218         if (w != null)
2219             w.helpAsyncBlocker(blocker); // delegate to the per-queue helper
2220     }
2221 
2222     /**
2223      * Returns a cheap heuristic guide for task partitioning when
2224      * programmers, frameworks, tools, or languages have little or no
2225      * idea about task granularity.  In essence, by offering this
2226      * method, we ask users only about tradeoffs in overhead vs
2227      * expected throughput and its variance, rather than how finely to
2228      * partition tasks.
2229      *
2230      * In a steady state strict (tree-structured) computation, each
2231      * thread makes available for stealing enough tasks for other
2232      * threads to remain active. Inductively, if all threads play by
2233      * the same rules, each thread should make available only a
2234      * constant number of tasks.
2235      *
2236      * The minimum useful constant is just 1. But using a value of 1
2237      * would require immediate replenishment upon each steal to


2283     /**
2284      * Possibly initiates and/or completes termination.
2285      *
2286      * @param now if true, unconditionally terminate, else only
2287      * if no work and no active workers
2288      * @param enable if true, terminate when next possible
2289      * @return true if terminating or terminated
2290      */
2291     private boolean tryTerminate(boolean now, boolean enable) {
2292         int md; // try to set SHUTDOWN, then STOP, then help terminate
             // Step 1: SHUTDOWN is set only when the caller enables it.
2293         if (((md = mode) & SHUTDOWN) == 0) {
2294             if (!enable)
2295                 return false;
2296             md = getAndBitwiseOrMode(SHUTDOWN);
2297         }
             // Step 2: STOP is set immediately when 'now' is true, otherwise only
             // once canStop() reports that stopping is possible.
2298         if ((md & STOP) == 0) {
2299             if (!now && !canStop())
2300                 return false;
2301             md = getAndBitwiseOrMode(STOP);
2302         }
             // Step 3: help terminate. 'rescan' makes the loop exit only after two
             // consecutive passes in which nothing changed.
2303         for (boolean rescan = true;;) { // repeat until no changes
2304             boolean changed = false;
                 // Drain whatever pollScan(false) returns and cancel each task.
2305             for (ForkJoinTask<?> t; (t = pollScan(false)) != null; ) {
2306                 changed = true;
2307                 ForkJoinTask.cancelIgnoringExceptions(t); // help cancel
2308             }
2309             WorkQueue[] qs; int n; WorkQueue q; Thread thread;
2310             if ((qs = queues) != null && (n = qs.length) > 0) {
2311                 for (int j = 1; j < n; j += 2) { // unblock other workers
                         // Interrupt each odd-indexed queue's owner thread that is
                         // not already interrupted.
2312                     if ((q = qs[j]) != null && (thread = q.owner) != null &&
2313                         !thread.isInterrupted()) {
2314                         changed = true;
2315                         try {
2316                             thread.interrupt();
2317                         } catch (Throwable ignore) {
                                 // deliberately ignored: failure to interrupt one
                                 // thread must not stop the termination pass
2318                         }
2319                     }
2320                 }
2321             }
2322             ReentrantLock lock; Condition cond; // signal when no workers
                 // Set TERMINATED once the low bits of mode plus the signed total
                 // count from ctl is <= 0. Only the caller whose
                 // getAndBitwiseOrMode result shows TERMINATED was previously clear
                 // goes on to signal the termination condition.
2323             if (((md = mode) & TERMINATED) == 0 &&
2324                 (md & SMASK) + (short)(ctl >>> TC_SHIFT) <= 0 &&
2325                 (getAndBitwiseOrMode(TERMINATED) & TERMINATED) == 0 &&
2326                 (lock = registrationLock) != null) {
2327                 lock.lock();
2328                 if ((cond = termination) != null)
2329                     cond.signalAll();
2330                 lock.unlock();
2331             }
2332             if (changed)
2333                 rescan = true;
2334             else if (rescan)
2335                 rescan = false;
2336             else
2337                 break;
2338         }
2339         return true;
2340     }
2341 
2342     // Exported methods
2343 
2344     // Constructors
2345 
2346     /**
2347      * Creates a {@code ForkJoinPool} with parallelism equal to {@link
2348      * java.lang.Runtime#availableProcessors}, using defaults for all
2349      * other parameters (see {@link #ForkJoinPool(int,
2350      * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean,
2351      * int, int, int, Predicate, long, TimeUnit)}).
2352      *
2353      * @throws SecurityException if a security manager exists and
2354      *         the caller is not permitted to modify threads
2355      *         because it does not hold {@link
2356      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2357      */


2541     /**
2542      * Constructor for common pool using parameters possibly
2543      * overridden by system properties
2544      */
2545     private ForkJoinPool(byte forCommonPoolOnly) {
             // Default parallelism is one less than the number of processors;
             // the byte parameter is not read in this body.
2546         int parallelism = Runtime.getRuntime().availableProcessors() - 1;
2547         ForkJoinWorkerThreadFactory fac = null;
2548         UncaughtExceptionHandler handler = null;
             // The three properties are read in order inside one try block, so an
             // exception while handling one of them leaves the later ones at
             // their defaults (values assigned before the failure are kept).
2549         try {  // ignore exceptions in accessing/parsing properties
2550             fac = (ForkJoinWorkerThreadFactory) newInstanceFromSystemProperty(
2551                 "java.util.concurrent.ForkJoinPool.common.threadFactory");
2552             handler = (UncaughtExceptionHandler) newInstanceFromSystemProperty(
2553                 "java.util.concurrent.ForkJoinPool.common.exceptionHandler");
2554             String pp = System.getProperty
2555                 ("java.util.concurrent.ForkJoinPool.common.parallelism");
2556             if (pp != null)
2557                 parallelism = Integer.parseInt(pp);
2558         } catch (Exception ignore) {
2559         }
             // Clamp parallelism to [0, MAX_CAP]; the clamped value is also the
             // initial value of mode.
2560         int p = this.mode = Math.min(Math.max(parallelism, 0), MAX_CAP);
             // No spare threads are allowed when parallelism is zero.
2561         int maxSpares = (p == 0) ? 0 : COMMON_MAX_SPARES;
2562         int bnds = ((1 - p) & SMASK) | (maxSpares << SWIDTH);
             // Queue table size is a power of two derived from p
             // (e.g. p == 0 -> 4, p == 1 -> 2, p == 4 -> 8).
2563         int size = 1 << (33 - Integer.numberOfLeadingZeros(p > 0 ? p - 1 : 1));
2564         this.factory = (fac != null) ? fac :
2565             new DefaultCommonPoolForkJoinWorkerThreadFactory();
2566         this.ueh = handler;
2567         this.keepAlive = DEFAULT_KEEPALIVE;
2568         this.saturate = null;
2569         this.workerNamePrefix = null;
2570         this.bounds = bnds;
             // Both count fields of ctl start at -p (masked into their positions).
2571         this.ctl = ((((long)(-p) << TC_SHIFT) & TC_MASK) |
2572                     (((long)(-p) << RC_SHIFT) & RC_MASK));
2573         this.queues = new WorkQueue[size];
2574         this.registrationLock = new ReentrantLock();
2575     }
2576 
2577     /**
2578      * Returns the common pool instance. This pool is statically
2579      * constructed; its run state is unaffected by attempts to {@link
2580      * #shutdown} or {@link #shutdownNow}. However this pool and any
2581      * ongoing processing are automatically terminated upon program
2582      * {@link System#exit}.  Any program that relies on asynchronous
2583      * task processing to complete before program termination should
2584      * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
2585      * before exit.
2586      *
2587      * @return the common pool instance
2588      * @since 1.8
2589      */
2590     public static ForkJoinPool commonPool() {


2596 
2597     /**
2598      * Performs the given task, returning its result upon completion.
2599      * If the computation encounters an unchecked Exception or Error,
2600      * it is rethrown as the outcome of this invocation.  Rethrown
2601      * exceptions behave in the same way as regular exceptions, but,
2602      * when possible, contain stack traces (as displayed for example
2603      * using {@code ex.printStackTrace()}) of both the current thread
2604      * as well as the thread actually encountering the exception;
2605      * minimally only the latter.
2606      *
2607      * @param task the task
2608      * @param <T> the type of the task's result
2609      * @return the task's result
2610      * @throws NullPointerException if the task is null
2611      * @throws RejectedExecutionException if the task cannot be
2612      *         scheduled for execution
2613      */
2614     public <T> T invoke(ForkJoinTask<T> task) {
             // Submit first, then return whatever joinForPoolInvoke yields for
             // this pool; presumably that call waits for completion and reports
             // the result or exception (defined in ForkJoinTask) -- TODO confirm.
2615         externalSubmit(task);
2616         return task.joinForPoolInvoke(this);
2617     }
2618 
2619     /**
2620      * Arranges for (asynchronous) execution of the given task.
2621      *
2622      * @param task the task
2623      * @throws NullPointerException if the task is null
2624      * @throws RejectedExecutionException if the task cannot be
2625      *         scheduled for execution
2626      */
2627     public void execute(ForkJoinTask<?> task) {
2628         externalSubmit(task); // return value of externalSubmit is not used here
2629     }
2630 
2631     // AbstractExecutorService methods
2632 
2633     /**
2634      * @throws NullPointerException if the task is null
2635      * @throws RejectedExecutionException if the task cannot be
2636      *         scheduled for execution


2688         return externalSubmit((task instanceof ForkJoinTask<?>)
2689             ? (ForkJoinTask<Void>) task // avoid re-wrap
2690             : new ForkJoinTask.AdaptedRunnableAction(task));
2691     }
2692 
2693     /**
2694      * @throws NullPointerException       {@inheritDoc}
2695      * @throws RejectedExecutionException {@inheritDoc}
2696      */
2697     @Override
2698     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
             // Wraps each callable, submits all of them, then waits on each one
             // before returning the list of futures (in iteration order of tasks).
2699         ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
2700         try {
2701             for (Callable<T> t : tasks) {
2702                 ForkJoinTask<T> f =
2703                     new ForkJoinTask.AdaptedInterruptibleCallable<T>(t);
                     // Recorded before submission so the catch block below can
                     // cancel it even if externalSubmit throws.
2704                 futures.add(f);
2705                 externalSubmit(f);
2706             }
                 // Wait in reverse submission order (last submitted first).
2707             for (int i = futures.size() - 1; i >= 0; --i)
2708                 ((ForkJoinTask<?>)futures.get(i)).awaitPoolInvoke(this);
2709             return futures;
2710         } catch (Throwable t) {
                 // On any failure cancel everything created so far and rethrow.
2711             for (Future<T> e : futures)
2712                 ForkJoinTask.cancelIgnoringExceptions(e);
2713             throw t;
2714         }
2715     }
2716 
2717     @Override
2718     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
2719                                          long timeout, TimeUnit unit)
2720         throws InterruptedException {
             // Timed variant: submits every task, then waits on them within the
             // overall time budget; tasks still unfinished once the budget is
             // spent are cancelled. The list of futures is returned either way.
2721         long nanos = unit.toNanos(timeout);
2722         ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
2723         try {
2724             for (Callable<T> t : tasks) {
2725                 ForkJoinTask<T> f =
2726                     new ForkJoinTask.AdaptedInterruptibleCallable<T>(t);
2727                 futures.add(f);
2728                 externalSubmit(f);
2729             }
                 // The clock starts only after all tasks have been submitted.
2730             long startTime = System.nanoTime(), ns = nanos;
                 // A negative timeout counts as already timed out.
2731             boolean timedOut = (ns < 0L);
                 // Visit futures in reverse submission order.
2732             for (int i = futures.size() - 1; i >= 0; --i) {
2733                 Future<T> f = futures.get(i);
2734                 if (!f.isDone()) {
2735                     if (timedOut)
2736                         ForkJoinTask.cancelIgnoringExceptions(f);
2737                     else {
2738                         ((ForkJoinTask<T>)f).awaitPoolInvoke(this, ns);




                             // Recompute the remaining budget from the fixed start
                             // time rather than accumulating per-wait durations.
2739                         if ((ns = nanos - (System.nanoTime() - startTime)) < 0L)
2740                             timedOut = true;
2741                     }
2742                 }
2743             }
2744             return futures;
2745         } catch (Throwable t) {
                 // On any failure cancel everything created so far and rethrow.
2746             for (Future<T> e : futures)
2747                 ForkJoinTask.cancelIgnoringExceptions(e);
2748             throw t;
2749         }
2750     }
2751 
2752     // Task to hold results from InvokeAnyTasks
2753     static final class InvokeAnyRoot<E> extends ForkJoinTask<E> {
2754         private static final long serialVersionUID = 2838392045355241008L;
2755         @SuppressWarnings("serial") // Conditionally serializable
2756         volatile E result;
2757         final AtomicInteger count;  // in case all throw
2758         final ForkJoinPool pool;    // to check shutdown while collecting
             // n: initial value of the failure countdown; p: pool whose mode is
             // consulted in tryComplete (may be null, which skips those checks).
2759         InvokeAnyRoot(int n, ForkJoinPool p) {
2760             pool = p;
2761             count = new AtomicInteger(n);
2762         }
             // Runs the callable unless this root is already done, and completes
             // the root with its result. An attempt counts as failed when the
             // callable is null, the calling thread was interrupted
             // (Thread.interrupted() also clears the flag), the pool's mode is
             // negative, or the call throws.
2763         final void tryComplete(Callable<E> c) { // called by InvokeAnyTasks
2764             Throwable ex = null;
2765             boolean failed;
2766             if (c == null || Thread.interrupted() ||
2767                 (pool != null && pool.mode < 0))
2768                 failed = true;
2769             else if (isDone())
2770                 failed = false; // someone else finished first; do not run c
2771             else {
2772                 try {
2773                     complete(c.call());
2774                     failed = false;
2775                 } catch (Throwable tx) {
2776                     ex = tx;
2777                     failed = true;
2778                 }
2779             }
                 // Record an exceptional outcome when the pool's mode is negative,
                 // or when this failure brings the countdown to its last entry
                 // (getAndDecrement() <= 1). The countdown is decremented only for
                 // failed attempts, and not at all when the first operand is true.
2780             if ((pool != null && pool.mode < 0) ||
2781                 (failed && count.getAndDecrement() <= 1))
2782                 trySetThrown(ex != null ? ex : new CancellationException());
2783         }
2784         public final boolean exec()         { return false; } // never forked
2785         public final E getRawResult()       { return result; }
2786         public final void setRawResult(E v) { result = v; }
2787     }
2788 
2789     // Variant of AdaptedInterruptibleCallable with results in InvokeAnyRoot
2790     static final class InvokeAnyTask<E> extends ForkJoinTask<E> {
2791         private static final long serialVersionUID = 2838392045355241008L;
2792         final InvokeAnyRoot<E> root;
2793         @SuppressWarnings("serial") // Conditionally serializable
2794         final Callable<E> callable;


2821     }
2822 
2823     @Override
2824     public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
2825         throws InterruptedException, ExecutionException {
             // Submits the callables one at a time as InvokeAnyTasks sharing one
             // InvokeAnyRoot, and returns the root's outcome.
             // Throws IllegalArgumentException if tasks is empty and
             // NullPointerException on encountering a null callable.
2826         int n = tasks.size();
2827         if (n <= 0)
2828             throw new IllegalArgumentException();
2829         InvokeAnyRoot<T> root = new InvokeAnyRoot<T>(n, this);
2830         ArrayList<InvokeAnyTask<T>> fs = new ArrayList<>(n);
2831         try {
2832             for (Callable<T> c : tasks) {
2833                 if (c == null)
2834                     throw new NullPointerException();
2835                 InvokeAnyTask<T> f = new InvokeAnyTask<T>(root, c);
2836                 fs.add(f);
2837                 externalSubmit(f);
                     // Stop submitting further tasks once the root is done.
2838                 if (root.isDone())
2839                     break;
2840             }
2841             return root.getForPoolInvoke(this);
2842         } finally {
                 // Runs on both normal and exceptional exit: cancel every task
                 // that was created, including ones added before a null callable
                 // was encountered.
2843             for (InvokeAnyTask<T> f : fs)
2844                 ForkJoinTask.cancelIgnoringExceptions(f);
2845         }
2846     }
2847 
2848     @Override
2849     public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
2850                            long timeout, TimeUnit unit)
2851         throws InterruptedException, ExecutionException, TimeoutException {
             // Timed variant of invokeAny: same submission loop, but the root's
             // outcome is obtained through the two-argument getForPoolInvoke,
             // passing the timeout converted to nanoseconds.
             // Throws IllegalArgumentException if tasks is empty and
             // NullPointerException on encountering a null callable.
2852         long nanos = unit.toNanos(timeout);
2853         int n = tasks.size();
2854         if (n <= 0)
2855             throw new IllegalArgumentException();
2856         InvokeAnyRoot<T> root = new InvokeAnyRoot<T>(n, this);
2857         ArrayList<InvokeAnyTask<T>> fs = new ArrayList<>(n);
2858         try {
2859             for (Callable<T> c : tasks) {
2860                 if (c == null)
2861                     throw new NullPointerException();
2862                 InvokeAnyTask<T> f = new InvokeAnyTask<T>(root, c);
2863                 fs.add(f);
2864                 externalSubmit(f);
                     // Stop submitting further tasks once the root is done.
2865                 if (root.isDone())
2866                     break;
2867             }
2868             return root.getForPoolInvoke(this, nanos);
2869         } finally {
                 // Runs on both normal and exceptional exit: cancel every task
                 // that was created.
2870             for (InvokeAnyTask<T> f : fs)
2871                 ForkJoinTask.cancelIgnoringExceptions(f);
2872         }
2873     }
2874 
2875     /**
2876      * Returns the factory used for constructing new workers.
2877      *
2878      * @return the factory used for constructing new workers
2879      */
2880     public ForkJoinWorkerThreadFactory getFactory() {
2881         return factory; // plain read of the pool's factory field; no copy or locking here
2882     }
2883 
2884     /**
2885      * Returns the handler for internal worker threads that terminate
2886      * due to unrecoverable errors encountered while executing tasks.
2887      *
2888      * @return the handler, or {@code null} if none


< prev index next >