1029 }
1030
1031 /**
1032 * Pops the given task for owner only if it is at the current top.
1033 */
1034 final boolean tryUnpush(ForkJoinTask<?> task) {
1035 int s = top, cap; ForkJoinTask<?>[] a;
1036 if ((a = array) != null && (cap = a.length) > 0 && base != s-- &&
1037 casSlotToNull(a, (cap - 1) & s, task)) {
1038 top = s;
1039 return true;
1040 }
1041 return false;
1042 }
1043
1044 /**
1045 * Locking version of tryUnpush.
1046 */
1047 final boolean externalTryUnpush(ForkJoinTask<?> task) {
1048 boolean taken = false;
1049 int s = top, cap, k; ForkJoinTask<?>[] a;
1050 if ((a = array) != null && (cap = a.length) > 0 &&
1051 a[k = (cap - 1) & (s - 1)] == task && tryLock()) {
1052 if (top == s && array == a &&
1053 (taken = casSlotToNull(a, k, task)))
1054 top = s - 1;
1055 source = 0; // release lock
1056 }
1057 return taken;
1058 }
1059
1060 /**
1061 * Deep form of tryUnpush: Traverses from top and removes task if
1062 * present, shifting others to fill gap.
1063 */
1064 final boolean tryRemove(ForkJoinTask<?> task, boolean owned) {
1065 boolean taken = false;
1066 int p = top, cap; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
1067 if ((a = array) != null && task != null && (cap = a.length) > 0) {
1068 int m = cap - 1, s = p - 1, d = p - base;
1069 for (int i = s, k; d > 0; --i, --d) {
1070 if ((t = a[k = i & m]) == task) {
1071 if (owned || tryLock()) {
1072 if ((owned || (array == a && top == p)) &&
1073 (taken = casSlotToNull(a, k, t))) {
1074 for (int j = i; j != s; ) // shift down
1075 a[j & m] = getAndClearSlot(a, ++j & m);
// Pops and runs tasks from the top of this queue for as long as the top
// task is a CountedCompleter whose completer chain reaches the given task.
//
// task:  the root task being helped; nothing is done if null
// owned: true to pop with a bare CAS; false to pop only under tryLock()
// limit: maximum number of tasks to run; 0 means no limit
// Returns the most recently read task.status (0 if task is null).
1177 final int helpComplete(ForkJoinTask<?> task, boolean owned, int limit) {
1178 int status = 0, cap, k, p, s; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
// Each iteration re-reads status, array and top; p is the observed top,
// s = p - 1 the index of the topmost element and k its slot in a.
1179 while (task != null && (status = task.status) >= 0 &&
1180 (a = array) != null && (cap = a.length) > 0 &&
1181 (t = a[k = (cap - 1) & (s = (p = top) - 1)])
1182 instanceof CountedCompleter) {
1183 CountedCompleter<?> f = (CountedCompleter<?>)t;
1184 boolean taken = false;
1185 for (;;) { // exec if root task is a completer of t
1186 if (f == task) {
1187 if (owned) {
1188 if ((taken = casSlotToNull(a, k, t)))
1189 top = s;
1190 }
1191 else if (tryLock()) {
// Recheck top and array under the lock before removing the task.
1192 if (top == p && array == a &&
1193 (taken = casSlotToNull(a, k, t)))
1194 top = s;
// Clearing source releases the lock taken by tryLock().
1195 source = 0;
1196 }
1197 break;
1198 }
// Walk up the completer chain; reaching null means t is not a subtask of task.
1199 else if ((f = f.completer) == null)
1200 break;
1201 }
// Give up if t was ineligible, the CAS failed, or the lock was unavailable.
1202 if (!taken)
1203 break;
1204 t.doExec();
1205 if (limit != 0 && --limit == 0)
1206 break;
1207 }
1208 return status;
1209 }
1210
1211 /**
1212 * Tries to poll and run AsynchronousCompletionTasks until
1213 * none found or blocker is released.
1214 *
1215 * @param blocker the blocker
1216 */
1217 final void helpAsyncBlocker(ManagedBlocker blocker) {
1218 int cap, b, d, k; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
1219 while (blocker != null && (d = top - (b = base)) > 0 &&
1220 (a = array) != null && (cap = a.length) > 0 &&
1221 (((t = getSlot(a, k = (cap - 1) & b)) == null && d > 1) ||
1222 t instanceof
1223 CompletableFuture.AsynchronousCompletionTask) &&
1224 !blocker.isReleasable()) {
1225 if (t != null && base == b++ && casSlotToNull(a, k, t)) {
1569 break; // terminating
1570 else {
1571 long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + RC_UNIT));
1572 Thread vt = v.owner;
1573 if (c == (c = compareAndExchangeCtl(c, nc))) {
1574 v.phase = sp;
1575 LockSupport.unpark(vt); // release idle worker
1576 break;
1577 }
1578 }
1579 }
1580 }
1581
1582 /**
1583 * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1584 * See above for explanation.
1585 *
1586 * @param w caller's WorkQueue (may be null on failed initialization)
1587 */
1588 final void runWorker(WorkQueue w) {
1589 if (w != null) { // skip on failed init
1590 w.config |= SRC; // mark as valid source
1591 int r = w.stackPred, src = 0; // use seed from registerWorker
1592 do {
1593 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
1594 } while ((src = scan(w, src, r)) >= 0 ||
1595 (src = awaitWork(w)) == 0);
1596 }
1597 }
1598
1599 /**
1600 * Scans for and if found executes top-level tasks: Tries to poll
1601 * each queue starting at a random index with random stride,
1602 * returning source id or retry indicator if contended or
1603 * inconsistent.
1604 *
1605 * @param w caller's WorkQueue
1606 * @param prevSrc the previous queue stolen from in current phase, or 0
1607 * @param r random seed
1608 * @return id of queue if taken, negative if none found, prevSrc for retry
1609 */
1694 Thread.interrupted();
1695 else if (deadline == 0L)
1696 LockSupport.park();
1697 else if (deadline - System.currentTimeMillis() > TIMEOUT_SLOP)
1698 LockSupport.parkUntil(deadline);
1699 else if (((int)c & SMASK) == (w.config & SMASK) &&
1700 compareAndSetCtl(c, ((UC_MASK & (c - TC_UNIT)) |
1701 (prevCtl & SP_MASK)))) {
1702 w.config |= QUIET; // sentinel for deregisterWorker
1703 return -1; // drop on timeout
1704 }
1705 else if ((deadline += keepAlive) == 0L)
1706 deadline = 1L; // not at head; restart timer
1707 }
1708 return 0;
1709 }
1710
1711 // Utilities used by ForkJoinTask
1712
1713 /**
1714 * Returns true if all workers are busy, possibly creating one if allowed
1715 */
1716 final boolean isSaturated() {
1717 int maxTotal = bounds >>> SWIDTH;
1718 for (long c;;) {
1719 if (((int)(c = ctl) & ~UNSIGNALLED) != 0)
1720 return false;
1721 if ((short)(c >>> TC_SHIFT) >= maxTotal)
1722 return true;
1723 long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
1724 if (compareAndSetCtl(c, nc))
1725 return !createWorker();
1726 }
1727 }
1728
1729 /**
1730 * Returns true if can start terminating if enabled, or already terminated
1731 */
1732 final boolean canStop() {
// Each pass folds ctl and the examined queue tops into a checksum. The
// method returns true once two consecutive passes give the same checksum
// with the same queues array, or at once if queues is null or STOP is set.
1733 outer: for (long oldSum = 0L;;) { // repeat until stable
1734 int md; WorkQueue[] qs; long c;
1735 if ((qs = queues) == null || ((md = mode) & STOP) != 0)
1736 return true;
// NOTE(review): (md & SMASK) appears to be the parallelism level and the
// RC field of ctl a signed count centered on it, so a positive sum would
// mean some worker is still active -- confirm.
1737 if ((md & SMASK) + (int)((c = ctl) >> RC_SHIFT) > 0)
1738 break;
1739 long checkSum = c;
// NOTE(review): this loop visits odd indices only. Elsewhere in this file
// commonQueue() maps external threads to even indices and tryTerminate()
// treats odd indices as workers -- confirm the "submitters" label below.
1740 for (int i = 1; i < qs.length; i += 2) { // scan submitters
1741 WorkQueue q; ForkJoinTask<?>[] a; int s = 0, cap;
// A queue blocks stopping if top != base, the slot at top is non-null,
// or its source field is nonzero.
1742 if ((q = qs[i]) != null && (a = q.array) != null &&
1743 (cap = a.length) > 0 &&
1744 ((s = q.top) != q.base || a[(cap - 1) & s] != null ||
1745 q.source != 0))
1746 break outer;
1747 checkSum += (((long)i) << 32) ^ s;
1748 }
// The embedded assignment stores this pass's sum for the next comparison.
1749 if (oldSum == (oldSum = checkSum) && queues == qs)
1750 return true;
1751 }
1752 return (mode & STOP) != 0; // recheck mode on false return
1753 }
1754
1755 /**
1756 * Tries to decrement counts (sometimes implicitly) and possibly
1757 * arrange for a compensating worker in preparation for
1758 * blocking. May fail due to interference, in which case -1 is
1759 * returned so caller may retry. A zero return value indicates
1760 * that the caller doesn't need to re-adjust counts when later
1761 * unblocked.
1762 *
1763 * @param c incoming ctl value
1764 * @return UNCOMPENSATE: block then adjust, 0: block, -1 : retry
1765 */
1766 private int tryCompensate(long c) {
1767 Predicate<? super ForkJoinPool> sat;
1768 int b = bounds; // counts are signed; centered at parallelism level == 0
1769 int minActive = (short)(b & SMASK),
1770 maxTotal = b >>> SWIDTH,
1771 active = (int)(c >> RC_SHIFT),
1772 total = (short)(c >>> TC_SHIFT),
1773 sp = (int)c & ~UNSIGNALLED;
1774 if (total >= 0) {
1775 if (sp != 0) { // activate idle worker
1776 WorkQueue[] qs; int n; WorkQueue v;
1777 if ((qs = queues) != null && (n = qs.length) > 0 &&
1778 (v = qs[sp & (n - 1)]) != null) {
1779 Thread vt = v.owner;
1780 long nc = ((long)v.stackPred & SP_MASK) | (UC_MASK & c);
1781 if (compareAndSetCtl(c, nc)) {
1782 v.phase = sp;
1783 LockSupport.unpark(vt);
1784 return UNCOMPENSATE;
1785 }
1786 }
1787 return -1; // retry
1788 }
1789 else if (active > minActive) { // reduce parallelism
1790 long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c));
1791 return compareAndSetCtl(c, nc) ? UNCOMPENSATE : -1;
1792 }
1793 }
1794 if (total < maxTotal) { // expand pool
1802 return 0;
1803 else
1804 throw new RejectedExecutionException(
1805 "Thread limit exceeded replacing blocked worker");
1806 }
1807
1808 /**
1809 * Readjusts RC count; called from ForkJoinTask after blocking.
1810 */
1811 final void uncompensate() {
1812 getAndAddCtl(RC_UNIT);
1813 }
1814
1815 /**
1816 * Helps if possible until the given task is done. Scans other
1817 * queues for a task produced by one of w's stealers; returning
1818 * compensated blocking sentinel if none are found.
1819 *
1820 * @param task the task
1821 * @param w caller's WorkQueue
1822 * @return task status on exit, or UNCOMPENSATE for compensated blocking
1823 */
1824 final int helpJoin(ForkJoinTask<?> task, WorkQueue w) {
1825 int s = 0;
1826 if (task != null && w != null) {
1827 int wsrc = w.source, wid = w.config & SMASK, r = wid + 2;
1828 boolean scan = true;
1829 long c = 0L; // track ctl stability
1830 outer: for (;;) {
1831 if ((s = task.status) < 0)
1832 break;
1833 else if (scan = !scan) { // previous scan was empty
1834 if (mode < 0)
1835 ForkJoinTask.cancelIgnoringExceptions(task);
1836 else if (c == (c = ctl) && (s = tryCompensate(c)) >= 0)
1837 break; // block
1838 }
1839 else { // scan for subtasks
1840 WorkQueue[] qs = queues;
1841 int n = (qs == null) ? 0 : qs.length, m = n - 1;
1842 for (int i = n; i > 0; i -= 2, r += 2) {
1843 int j; WorkQueue q, x, y; ForkJoinTask<?>[] a;
1844 if ((q = qs[j = r & m]) != null) {
1845 int sq = q.source & SMASK, cap, b;
1846 if ((a = q.array) != null && (cap = a.length) > 0) {
1847 int k = (cap - 1) & (b = q.base);
1848 int nextBase = b + 1, src = j | SRC, sx;
1849 ForkJoinTask<?> t = WorkQueue.getSlot(a, k);
1850 boolean eligible = sq == wid ||
1851 ((x = qs[sq & m]) != null && // indirect
1852 ((sx = (x.source & SMASK)) == wid ||
1853 ((y = qs[sx & m]) != null && // 2-indirect
1854 (y.source & SMASK) == wid)));
1855 if ((s = task.status) < 0)
1856 break outer;
1857 else if ((q.source & SMASK) != sq ||
1858 q.base != b)
1859 scan = true; // inconsistent
2178 q.push(task, this);
2179 else
2180 externalPush(task);
2181 return task;
2182 }
2183
2184 /**
2185 * Returns common pool queue for an external thread that has
2186 * possibly ever submitted a common pool task (nonzero probe), or
2187 * null if none.
2188 */
2189 static WorkQueue commonQueue() {
2190 ForkJoinPool p; WorkQueue[] qs;
2191 int r = ThreadLocalRandom.getProbe(), n;
2192 return ((p = common) != null && (qs = p.queues) != null &&
2193 (n = qs.length) > 0 && r != 0) ?
2194 qs[(n - 1) & (r << 1)] : null;
2195 }
2196
2197 /**
2198 * If the given executor is a ForkJoinPool, poll and execute
2199 * AsynchronousCompletionTasks from worker's queue until none are
2200 * available or blocker is released.
2201 */
2202 static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) {
2203 WorkQueue w = null; Thread t; ForkJoinWorkerThread wt;
2204 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
2205 if ((wt = (ForkJoinWorkerThread)t).pool == e)
2206 w = wt.workQueue;
2207 }
2208 else if (e == common)
2209 w = commonQueue();
2210 if (w != null)
2211 w.helpAsyncBlocker(blocker);
2212 }
2213
2214 /**
2215 * Returns a cheap heuristic guide for task partitioning when
2216 * programmers, frameworks, tools, or languages have little or no
2217 * idea about task granularity. In essence, by offering this
2218 * method, we ask users only about tradeoffs in overhead vs
2219 * expected throughput and its variance, rather than how finely to
2220 * partition tasks.
2221 *
2222 * In a steady state strict (tree-structured) computation, each
2223 * thread makes available for stealing enough tasks for other
2224 * threads to remain active. Inductively, if all threads play by
2225 * the same rules, each thread should make available only a
2226 * constant number of tasks.
2227 *
2228 * The minimum useful constant is just 1. But using a value of 1
2229 * would require immediate replenishment upon each steal to
2275 /**
2276 * Possibly initiates and/or completes termination.
2277 *
2278 * @param now if true, unconditionally terminate, else only
2279 * if no work and no active workers
2280 * @param enable if true, terminate when next possible
2281 * @return true if terminating or terminated
2282 */
2283 private boolean tryTerminate(boolean now, boolean enable) {
2284 int md; // try to set SHUTDOWN, then STOP, then help terminate
2285 if (((md = mode) & SHUTDOWN) == 0) {
2286 if (!enable)
2287 return false;
2288 md = getAndBitwiseOrMode(SHUTDOWN);
2289 }
2290 if ((md & STOP) == 0) {
2291 if (!now && !canStop())
2292 return false;
2293 md = getAndBitwiseOrMode(STOP);
2294 }
2295 for (int k = 0; k < 2; ++k) { // twice in case of lagging qs updates
2296 for (ForkJoinTask<?> t; (t = pollScan(false)) != null; )
2297 ForkJoinTask.cancelIgnoringExceptions(t); // help cancel
2298 WorkQueue[] qs; int n; WorkQueue q; Thread thread;
2299 if ((qs = queues) != null && (n = qs.length) > 0) {
2300 for (int j = 1; j < n; j += 2) { // unblock other workers
2301 if ((q = qs[j]) != null && (thread = q.owner) != null &&
2302 !thread.isInterrupted()) {
2303 try {
2304 thread.interrupt();
2305 } catch (Throwable ignore) {
2306 }
2307 }
2308 }
2309 }
2310 ReentrantLock lock; Condition cond; // signal when no workers
2311 if (((md = mode) & TERMINATED) == 0 &&
2312 (md & SMASK) + (short)(ctl >>> TC_SHIFT) <= 0 &&
2313 (getAndBitwiseOrMode(TERMINATED) & TERMINATED) == 0 &&
2314 (lock = registrationLock) != null) {
2315 lock.lock();
2316 if ((cond = termination) != null)
2317 cond.signalAll();
2318 lock.unlock();
2319 }
2320 }
2321 return true;
2322 }
2323
2324 // Exported methods
2325
2326 // Constructors
2327
2328 /**
2329 * Creates a {@code ForkJoinPool} with parallelism equal to {@link
2330 * java.lang.Runtime#availableProcessors}, using defaults for all
2331 * other parameters (see {@link #ForkJoinPool(int,
2332 * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean,
2333 * int, int, int, Predicate, long, TimeUnit)}).
2334 *
2335 * @throws SecurityException if a security manager exists and
2336 * the caller is not permitted to modify threads
2337 * because it does not hold {@link
2338 * java.lang.RuntimePermission}{@code ("modifyThread")}
2339 */
2523 /**
2524 * Constructor for common pool using parameters possibly
2525 * overridden by system properties
2526 */
2527 private ForkJoinPool(byte forCommonPoolOnly) {
2528 int parallelism = Runtime.getRuntime().availableProcessors() - 1;
2529 ForkJoinWorkerThreadFactory fac = null;
2530 UncaughtExceptionHandler handler = null;
2531 try { // ignore exceptions in accessing/parsing properties
2532 fac = (ForkJoinWorkerThreadFactory) newInstanceFromSystemProperty(
2533 "java.util.concurrent.ForkJoinPool.common.threadFactory");
2534 handler = (UncaughtExceptionHandler) newInstanceFromSystemProperty(
2535 "java.util.concurrent.ForkJoinPool.common.exceptionHandler");
2536 String pp = System.getProperty
2537 ("java.util.concurrent.ForkJoinPool.common.parallelism");
2538 if (pp != null)
2539 parallelism = Integer.parseInt(pp);
2540 } catch (Exception ignore) {
2541 }
2542 int p = this.mode = Math.min(Math.max(parallelism, 0), MAX_CAP);
2543 int size = 1 << (33 - Integer.numberOfLeadingZeros(p > 0 ? p - 1 : 1));
2544 this.factory = (fac != null) ? fac :
2545 new DefaultCommonPoolForkJoinWorkerThreadFactory();
2546 this.ueh = handler;
2547 this.keepAlive = DEFAULT_KEEPALIVE;
2548 this.saturate = null;
2549 this.workerNamePrefix = null;
2550 this.bounds = ((1 - p) & SMASK) | (COMMON_MAX_SPARES << SWIDTH);
2551 this.ctl = ((((long)(-p) << TC_SHIFT) & TC_MASK) |
2552 (((long)(-p) << RC_SHIFT) & RC_MASK));
2553 this.queues = new WorkQueue[size];
2554 this.registrationLock = new ReentrantLock();
2555 }
2556
2557 /**
2558 * Returns the common pool instance. This pool is statically
2559 * constructed; its run state is unaffected by attempts to {@link
2560 * #shutdown} or {@link #shutdownNow}. However this pool and any
2561 * ongoing processing are automatically terminated upon program
2562 * {@link System#exit}. Any program that relies on asynchronous
2563 * task processing to complete before program termination should
2564 * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
2565 * before exit.
2566 *
2567 * @return the common pool instance
2568 * @since 1.8
2569 */
2570 public static ForkJoinPool commonPool() {
2576
2577 /**
2578 * Performs the given task, returning its result upon completion.
2579 * If the computation encounters an unchecked Exception or Error,
2580 * it is rethrown as the outcome of this invocation. Rethrown
2581 * exceptions behave in the same way as regular exceptions, but,
2582 * when possible, contain stack traces (as displayed for example
2583 * using {@code ex.printStackTrace()}) of both the current thread
2584 * as well as the thread actually encountering the exception;
2585 * minimally only the latter.
2586 *
2587 * @param task the task
2588 * @param <T> the type of the task's result
2589 * @return the task's result
2590 * @throws NullPointerException if the task is null
2591 * @throws RejectedExecutionException if the task cannot be
2592 * scheduled for execution
2593 */
2594 public <T> T invoke(ForkJoinTask<T> task) {
2595 externalSubmit(task);
2596 return task.join();
2597 }
2598
2599 /**
2600 * Arranges for (asynchronous) execution of the given task.
2601 *
2602 * @param task the task
2603 * @throws NullPointerException if the task is null
2604 * @throws RejectedExecutionException if the task cannot be
2605 * scheduled for execution
2606 */
2607 public void execute(ForkJoinTask<?> task) {
2608 externalSubmit(task);
2609 }
2610
2611 // AbstractExecutorService methods
2612
2613 /**
2614 * @throws NullPointerException if the task is null
2615 * @throws RejectedExecutionException if the task cannot be
2616 * scheduled for execution
2668 return externalSubmit((task instanceof ForkJoinTask<?>)
2669 ? (ForkJoinTask<Void>) task // avoid re-wrap
2670 : new ForkJoinTask.AdaptedRunnableAction(task));
2671 }
2672
2673 /**
2674 * @throws NullPointerException {@inheritDoc}
2675 * @throws RejectedExecutionException {@inheritDoc}
2676 */
2677 @Override
2678 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
2679 ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
2680 try {
2681 for (Callable<T> t : tasks) {
2682 ForkJoinTask<T> f =
2683 new ForkJoinTask.AdaptedInterruptibleCallable<T>(t);
2684 futures.add(f);
2685 externalSubmit(f);
2686 }
2687 for (int i = futures.size() - 1; i >= 0; --i)
2688 ((ForkJoinTask<?>)futures.get(i)).quietlyJoin();
2689 return futures;
2690 } catch (Throwable t) {
2691 for (Future<T> e : futures)
2692 ForkJoinTask.cancelIgnoringExceptions(e);
2693 throw t;
2694 }
2695 }
2696
2697 @Override
2698 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
2699 long timeout, TimeUnit unit)
2700 throws InterruptedException {
2701 long nanos = unit.toNanos(timeout);
2702 ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
2703 try {
2704 for (Callable<T> t : tasks) {
2705 ForkJoinTask<T> f =
2706 new ForkJoinTask.AdaptedInterruptibleCallable<T>(t);
2707 futures.add(f);
2708 externalSubmit(f);
2709 }
2710 long startTime = System.nanoTime(), ns = nanos;
2711 boolean timedOut = (ns < 0L);
2712 for (int i = futures.size() - 1; i >= 0; --i) {
2713 Future<T> f = futures.get(i);
2714 if (!f.isDone()) {
2715 if (timedOut)
2716 ForkJoinTask.cancelIgnoringExceptions(f);
2717 else {
2718 try {
2719 f.get(ns, TimeUnit.NANOSECONDS);
2720 } catch (CancellationException | TimeoutException |
2721 ExecutionException ok) {
2722 }
2723 if ((ns = nanos - (System.nanoTime() - startTime)) < 0L)
2724 timedOut = true;
2725 }
2726 }
2727 }
2728 return futures;
2729 } catch (Throwable t) {
2730 for (Future<T> e : futures)
2731 ForkJoinTask.cancelIgnoringExceptions(e);
2732 throw t;
2733 }
2734 }
2735
2736 // Task to hold results from InvokeAnyTasks
2737 static final class InvokeAnyRoot<E> extends ForkJoinTask<E> {
2738 private static final long serialVersionUID = 2838392045355241008L;
2739 @SuppressWarnings("serial") // Conditionally serializable
2740 volatile E result;
2741 final AtomicInteger count; // in case all throw
2742 final ForkJoinPool pool; // to check shutdown while collecting
2743 InvokeAnyRoot(int n, ForkJoinPool p) {
2744 pool = p;
2745 count = new AtomicInteger(n);
2746 }
2747 final void tryComplete(Callable<E> c) { // called by InvokeAnyTasks
2748 Throwable ex = null;
2749 boolean failed = (c == null || isCancelled() ||
2750 (pool != null && pool.mode < 0));
2751 if (!failed && !isDone()) {
2752 try {
2753 complete(c.call());
2754 } catch (Throwable tx) {
2755 ex = tx;
2756 failed = true;
2757 }
2758 }
2759 if ((pool != null && pool.mode < 0) ||
2760 (failed && count.getAndDecrement() <= 1))
2761 trySetThrown(ex != null ? ex : new CancellationException());
2762 }
2763 public final boolean exec() { return false; } // never forked
2764 public final E getRawResult() { return result; }
2765 public final void setRawResult(E v) { result = v; }
2766 }
2767
2768 // Variant of AdaptedInterruptibleCallable with results in InvokeAnyRoot
2769 static final class InvokeAnyTask<E> extends ForkJoinTask<E> {
2770 private static final long serialVersionUID = 2838392045355241008L;
2771 final InvokeAnyRoot<E> root;
2772 @SuppressWarnings("serial") // Conditionally serializable
2773 final Callable<E> callable;
2800 }
2801
2802 @Override
2803 public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
2804 throws InterruptedException, ExecutionException {
2805 int n = tasks.size();
2806 if (n <= 0)
2807 throw new IllegalArgumentException();
2808 InvokeAnyRoot<T> root = new InvokeAnyRoot<T>(n, this);
2809 ArrayList<InvokeAnyTask<T>> fs = new ArrayList<>(n);
2810 try {
2811 for (Callable<T> c : tasks) {
2812 if (c == null)
2813 throw new NullPointerException();
2814 InvokeAnyTask<T> f = new InvokeAnyTask<T>(root, c);
2815 fs.add(f);
2816 externalSubmit(f);
2817 if (root.isDone())
2818 break;
2819 }
2820 return root.get();
2821 } finally {
2822 for (InvokeAnyTask<T> f : fs)
2823 ForkJoinTask.cancelIgnoringExceptions(f);
2824 }
2825 }
2826
2827 @Override
2828 public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
2829 long timeout, TimeUnit unit)
2830 throws InterruptedException, ExecutionException, TimeoutException {
2831 long nanos = unit.toNanos(timeout);
2832 int n = tasks.size();
2833 if (n <= 0)
2834 throw new IllegalArgumentException();
2835 InvokeAnyRoot<T> root = new InvokeAnyRoot<T>(n, this);
2836 ArrayList<InvokeAnyTask<T>> fs = new ArrayList<>(n);
2837 try {
2838 for (Callable<T> c : tasks) {
2839 if (c == null)
2840 throw new NullPointerException();
2841 InvokeAnyTask<T> f = new InvokeAnyTask<T>(root, c);
2842 fs.add(f);
2843 externalSubmit(f);
2844 if (root.isDone())
2845 break;
2846 }
2847 return root.get(nanos, TimeUnit.NANOSECONDS);
2848 } finally {
2849 for (InvokeAnyTask<T> f : fs)
2850 ForkJoinTask.cancelIgnoringExceptions(f);
2851 }
2852 }
2853
2854 /**
2855 * Returns the factory used for constructing new workers.
2856 *
2857 * @return the factory used for constructing new workers
2858 */
2859 public ForkJoinWorkerThreadFactory getFactory() {
2860 return factory;
2861 }
2862
2863 /**
2864 * Returns the handler for internal worker threads that terminate
2865 * due to unrecoverable errors encountered while executing tasks.
2866 *
2867 * @return the handler, or {@code null} if none
|
1029 }
1030
1031 /**
1032 * Pops the given task for owner only if it is at the current top.
1033 */
1034 final boolean tryUnpush(ForkJoinTask<?> task) {
1035 int s = top, cap; ForkJoinTask<?>[] a;
1036 if ((a = array) != null && (cap = a.length) > 0 && base != s-- &&
1037 casSlotToNull(a, (cap - 1) & s, task)) {
1038 top = s;
1039 return true;
1040 }
1041 return false;
1042 }
1043
1044 /**
1045 * Locking version of tryUnpush.
1046 */
1047 final boolean externalTryUnpush(ForkJoinTask<?> task) {
1048 boolean taken = false;
1049 for (;;) {
1050 int s = top, cap, k; ForkJoinTask<?>[] a;
1051 if ((a = array) == null || (cap = a.length) <= 0 ||
1052 a[k = (cap - 1) & (s - 1)] != task)
1053 break;
1054 if (tryLock()) {
1055 if (top == s && array == a) {
1056 if (taken = casSlotToNull(a, k, task)) {
1057 top = s - 1;
1058 source = 0;
1059 break;
1060 }
1061 }
1062 source = 0; // release lock for retry
1063 }
1064 Thread.yield(); // trylock failure
1065 }
1066 return taken;
1067 }
1068
1069 /**
1070 * Deep form of tryUnpush: Traverses from top and removes task if
1071 * present, shifting others to fill gap.
1072 */
1073 final boolean tryRemove(ForkJoinTask<?> task, boolean owned) {
1074 boolean taken = false;
1075 int p = top, cap; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
1076 if ((a = array) != null && task != null && (cap = a.length) > 0) {
1077 int m = cap - 1, s = p - 1, d = p - base;
1078 for (int i = s, k; d > 0; --i, --d) {
1079 if ((t = a[k = i & m]) == task) {
1080 if (owned || tryLock()) {
1081 if ((owned || (array == a && top == p)) &&
1082 (taken = casSlotToNull(a, k, t))) {
1083 for (int j = i; j != s; ) // shift down
1084 a[j & m] = getAndClearSlot(a, ++j & m);
1186 final int helpComplete(ForkJoinTask<?> task, boolean owned, int limit) {
1187 int status = 0, cap, k, p, s; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
1188 while (task != null && (status = task.status) >= 0 &&
1189 (a = array) != null && (cap = a.length) > 0 &&
1190 (t = a[k = (cap - 1) & (s = (p = top) - 1)])
1191 instanceof CountedCompleter) {
1192 CountedCompleter<?> f = (CountedCompleter<?>)t;
1193 boolean taken = false;
1194 for (;;) { // exec if root task is a completer of t
1195 if (f == task) {
1196 if (owned) {
1197 if ((taken = casSlotToNull(a, k, t)))
1198 top = s;
1199 }
1200 else if (tryLock()) {
1201 if (top == p && array == a &&
1202 (taken = casSlotToNull(a, k, t)))
1203 top = s;
1204 source = 0;
1205 }
1206 if (taken)
1207 t.doExec();
1208 else if (!owned)
1209 Thread.yield(); // tryLock failure
1210 break;
1211 }
1212 else if ((f = f.completer) == null)
1213 break;
1214 }
1215 if (taken && limit != 0 && --limit == 0)
1216 break;
1217 }
1218 return status;
1219 }
1220
1221 /**
1222 * Tries to poll and run AsynchronousCompletionTasks until
1223 * none found or blocker is released.
1224 *
1225 * @param blocker the blocker
1226 */
1227 final void helpAsyncBlocker(ManagedBlocker blocker) {
1228 int cap, b, d, k; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
1229 while (blocker != null && (d = top - (b = base)) > 0 &&
1230 (a = array) != null && (cap = a.length) > 0 &&
1231 (((t = getSlot(a, k = (cap - 1) & b)) == null && d > 1) ||
1232 t instanceof
1233 CompletableFuture.AsynchronousCompletionTask) &&
1234 !blocker.isReleasable()) {
1235 if (t != null && base == b++ && casSlotToNull(a, k, t)) {
1579 break; // terminating
1580 else {
1581 long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + RC_UNIT));
1582 Thread vt = v.owner;
1583 if (c == (c = compareAndExchangeCtl(c, nc))) {
1584 v.phase = sp;
1585 LockSupport.unpark(vt); // release idle worker
1586 break;
1587 }
1588 }
1589 }
1590 }
1591
1592 /**
1593 * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1594 * See above for explanation.
1595 *
1596 * @param w caller's WorkQueue (may be null on failed initialization)
1597 */
1598 final void runWorker(WorkQueue w) {
1599 if (mode >= 0 && w != null) { // skip on failed init
1600 w.config |= SRC; // mark as valid source
1601 int r = w.stackPred, src = 0; // use seed from registerWorker
1602 do {
1603 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
1604 } while ((src = scan(w, src, r)) >= 0 ||
1605 (src = awaitWork(w)) == 0);
1606 }
1607 }
1608
1609 /**
1610 * Scans for and if found executes top-level tasks: Tries to poll
1611 * each queue starting at a random index with random stride,
1612 * returning source id or retry indicator if contended or
1613 * inconsistent.
1614 *
1615 * @param w caller's WorkQueue
1616 * @param prevSrc the previous queue stolen from in current phase, or 0
1617 * @param r random seed
1618 * @return id of queue if taken, negative if none found, prevSrc for retry
1619 */
1704 Thread.interrupted();
1705 else if (deadline == 0L)
1706 LockSupport.park();
1707 else if (deadline - System.currentTimeMillis() > TIMEOUT_SLOP)
1708 LockSupport.parkUntil(deadline);
1709 else if (((int)c & SMASK) == (w.config & SMASK) &&
1710 compareAndSetCtl(c, ((UC_MASK & (c - TC_UNIT)) |
1711 (prevCtl & SP_MASK)))) {
1712 w.config |= QUIET; // sentinel for deregisterWorker
1713 return -1; // drop on timeout
1714 }
1715 else if ((deadline += keepAlive) == 0L)
1716 deadline = 1L; // not at head; restart timer
1717 }
1718 return 0;
1719 }
1720
1721 // Utilities used by ForkJoinTask
1722
1723 /**
1724 * Returns true if can start terminating if enabled, or already terminated
1725 */
1726 final boolean canStop() {
// Each pass folds ctl and the examined queue tops into a checksum. The
// method returns true once two consecutive passes give the same checksum
// with the same queues array, or at once if queues is null or STOP is set.
1727 outer: for (long oldSum = 0L;;) { // repeat until stable
1728 int md; WorkQueue[] qs; long c;
1729 if ((qs = queues) == null || ((md = mode) & STOP) != 0)
1730 return true;
// NOTE(review): (md & SMASK) appears to be the parallelism level and the
// RC field of ctl a signed count centered on it, so a positive sum would
// mean some worker is still active -- confirm.
1731 if ((md & SMASK) + (int)((c = ctl) >> RC_SHIFT) > 0)
1732 break;
1733 long checkSum = c;
// NOTE(review): this loop visits odd indices only, while commonQueue()
// in this file maps external threads to even indices -- confirm the
// "submitters" label below.
1734 for (int i = 1; i < qs.length; i += 2) { // scan submitters
1735 WorkQueue q; ForkJoinTask<?>[] a; int s = 0, cap;
// A queue blocks stopping if top != base, the slot at top is non-null,
// or its source field is nonzero.
1736 if ((q = qs[i]) != null && (a = q.array) != null &&
1737 (cap = a.length) > 0 &&
1738 ((s = q.top) != q.base || a[(cap - 1) & s] != null ||
1739 q.source != 0))
1740 break outer;
1741 checkSum += (((long)i) << 32) ^ s;
1742 }
// The embedded assignment stores this pass's sum for the next comparison.
1743 if (oldSum == (oldSum = checkSum) && queues == qs)
1744 return true;
1745 }
1746 return (mode & STOP) != 0; // recheck mode on false return
1747 }
1748
1749 /**
1750 * Tries to decrement counts (sometimes implicitly) and possibly
1751 * arrange for a compensating worker in preparation for
1752 * blocking. May fail due to interference, in which case -1 is
1753 * returned so caller may retry. A zero return value indicates
1754 * that the caller doesn't need to re-adjust counts when later
1755 * unblocked.
1756 *
1757 * @param c incoming ctl value
1758 * @return UNCOMPENSATE: block then adjust, 0: block, -1 : retry
1759 */
1760 private int tryCompensate(long c) {
1761 Predicate<? super ForkJoinPool> sat;
1762 int md = mode, b = bounds;
1763 // counts are signed; centered at parallelism level == 0
1764 int minActive = (short)(b & SMASK),
1765 maxTotal = b >>> SWIDTH,
1766 active = (int)(c >> RC_SHIFT),
1767 total = (short)(c >>> TC_SHIFT),
1768 sp = (int)c & ~UNSIGNALLED;
1769 if ((md & SMASK) == 0)
1770 return 0; // cannot compensate if parallelism zero
1771 else if (total >= 0) {
1772 if (sp != 0) { // activate idle worker
1773 WorkQueue[] qs; int n; WorkQueue v;
1774 if ((qs = queues) != null && (n = qs.length) > 0 &&
1775 (v = qs[sp & (n - 1)]) != null) {
1776 Thread vt = v.owner;
1777 long nc = ((long)v.stackPred & SP_MASK) | (UC_MASK & c);
1778 if (compareAndSetCtl(c, nc)) {
1779 v.phase = sp;
1780 LockSupport.unpark(vt);
1781 return UNCOMPENSATE;
1782 }
1783 }
1784 return -1; // retry
1785 }
1786 else if (active > minActive) { // reduce parallelism
1787 long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c));
1788 return compareAndSetCtl(c, nc) ? UNCOMPENSATE : -1;
1789 }
1790 }
1791 if (total < maxTotal) { // expand pool
1799 return 0;
1800 else
1801 throw new RejectedExecutionException(
1802 "Thread limit exceeded replacing blocked worker");
1803 }
1804
1805 /**
1806 * Readjusts RC count; called from ForkJoinTask after blocking.
1807 */
1808 final void uncompensate() {
1809 getAndAddCtl(RC_UNIT);
1810 }
1811
1812 /**
1813 * Helps if possible until the given task is done. Scans other
1814 * queues for a task produced by one of w's stealers; returning
1815 * compensated blocking sentinel if none are found.
1816 *
1817 * @param task the task
1818 * @param w caller's WorkQueue
1819 * @param canHelp if false, compensate only
1820 * @return task status on exit, or UNCOMPENSATE for compensated blocking
1821 */
1822 final int helpJoin(ForkJoinTask<?> task, WorkQueue w, boolean canHelp) {
1823 int s = 0;
1824 if (task != null && w != null) {
1825 int wsrc = w.source, wid = w.config & SMASK, r = wid + 2;
1826 boolean scan = true;
1827 long c = 0L; // track ctl stability
1828 outer: for (;;) {
1829 if ((s = task.status) < 0)
1830 break;
1831 else if (scan = !scan) { // previous scan was empty
1832 if (mode < 0)
1833 ForkJoinTask.cancelIgnoringExceptions(task);
1834 else if (c == (c = ctl) && (s = tryCompensate(c)) >= 0)
1835 break; // block
1836 }
1837 else if (canHelp) { // scan for subtasks
1838 WorkQueue[] qs = queues;
1839 int n = (qs == null) ? 0 : qs.length, m = n - 1;
1840 for (int i = n; i > 0; i -= 2, r += 2) {
1841 int j; WorkQueue q, x, y; ForkJoinTask<?>[] a;
1842 if ((q = qs[j = r & m]) != null) {
1843 int sq = q.source & SMASK, cap, b;
1844 if ((a = q.array) != null && (cap = a.length) > 0) {
1845 int k = (cap - 1) & (b = q.base);
1846 int nextBase = b + 1, src = j | SRC, sx;
1847 ForkJoinTask<?> t = WorkQueue.getSlot(a, k);
1848 boolean eligible = sq == wid ||
1849 ((x = qs[sq & m]) != null && // indirect
1850 ((sx = (x.source & SMASK)) == wid ||
1851 ((y = qs[sx & m]) != null && // 2-indirect
1852 (y.source & SMASK) == wid)));
1853 if ((s = task.status) < 0)
1854 break outer;
1855 else if ((q.source & SMASK) != sq ||
1856 q.base != b)
1857 scan = true; // inconsistent
2176 q.push(task, this);
2177 else
2178 externalPush(task);
2179 return task;
2180 }
2181
2182 /**
2183 * Returns common pool queue for an external thread that has
2184 * possibly ever submitted a common pool task (nonzero probe), or
2185 * null if none.
2186 */
2187 static WorkQueue commonQueue() {
2188 ForkJoinPool p; WorkQueue[] qs;
2189 int r = ThreadLocalRandom.getProbe(), n;
2190 return ((p = common) != null && (qs = p.queues) != null &&
2191 (n = qs.length) > 0 && r != 0) ?
2192 qs[(n - 1) & (r << 1)] : null;
2193 }
2194
2195 /**
2196 * Returns queue for an external thread, if one exists
2197 */
2198 final WorkQueue externalQueue() {
2199 WorkQueue[] qs;
2200 int r = ThreadLocalRandom.getProbe(), n;
2201 return ((qs = queues) != null && (n = qs.length) > 0 && r != 0) ?
2202 qs[(n - 1) & (r << 1)] : null;
2203 }
2204
2205 /**
2206 * If the given executor is a ForkJoinPool, poll and execute
2207 * AsynchronousCompletionTasks from worker's queue until none are
2208 * available or blocker is released.
2209 */
2210 static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) {
2211 WorkQueue w = null; Thread t; ForkJoinWorkerThread wt;
2212 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
2213 if ((wt = (ForkJoinWorkerThread)t).pool == e)
2214 w = wt.workQueue;
2215 }
2216 else if (e instanceof ForkJoinPool)
2217 w = ((ForkJoinPool)e).externalQueue();
2218 if (w != null)
2219 w.helpAsyncBlocker(blocker);
2220 }
2221
2222 /**
2223 * Returns a cheap heuristic guide for task partitioning when
2224 * programmers, frameworks, tools, or languages have little or no
2225 * idea about task granularity. In essence, by offering this
2226 * method, we ask users only about tradeoffs in overhead vs
2227 * expected throughput and its variance, rather than how finely to
2228 * partition tasks.
2229 *
2230 * In a steady state strict (tree-structured) computation, each
2231 * thread makes available for stealing enough tasks for other
2232 * threads to remain active. Inductively, if all threads play by
2233 * the same rules, each thread should make available only a
2234 * constant number of tasks.
2235 *
2236 * The minimum useful constant is just 1. But using a value of 1
2237 * would require immediate replenishment upon each steal to
2283 /**
2284 * Possibly initiates and/or completes termination.
2285 *
2286 * @param now if true, unconditionally terminate, else only
2287 * if no work and no active workers
2288 * @param enable if true, terminate when next possible
2289 * @return true if terminating or terminated
2290 */
2291 private boolean tryTerminate(boolean now, boolean enable) {
2292 int md; // try to set SHUTDOWN, then STOP, then help terminate
2293 if (((md = mode) & SHUTDOWN) == 0) {
2294 if (!enable)
2295 return false;
2296 md = getAndBitwiseOrMode(SHUTDOWN);
2297 }
2298 if ((md & STOP) == 0) {
2299 if (!now && !canStop())
2300 return false;
2301 md = getAndBitwiseOrMode(STOP);
2302 }
2303 for (boolean rescan = true;;) { // repeat until no changes
2304 boolean changed = false;
2305 for (ForkJoinTask<?> t; (t = pollScan(false)) != null; ) {
2306 changed = true;
2307 ForkJoinTask.cancelIgnoringExceptions(t); // help cancel
2308 }
2309 WorkQueue[] qs; int n; WorkQueue q; Thread thread;
2310 if ((qs = queues) != null && (n = qs.length) > 0) {
2311 for (int j = 1; j < n; j += 2) { // unblock other workers
2312 if ((q = qs[j]) != null && (thread = q.owner) != null &&
2313 !thread.isInterrupted()) {
2314 changed = true;
2315 try {
2316 thread.interrupt();
2317 } catch (Throwable ignore) {
2318 }
2319 }
2320 }
2321 }
2322 ReentrantLock lock; Condition cond; // signal when no workers
2323 if (((md = mode) & TERMINATED) == 0 &&
2324 (md & SMASK) + (short)(ctl >>> TC_SHIFT) <= 0 &&
2325 (getAndBitwiseOrMode(TERMINATED) & TERMINATED) == 0 &&
2326 (lock = registrationLock) != null) {
2327 lock.lock();
2328 if ((cond = termination) != null)
2329 cond.signalAll();
2330 lock.unlock();
2331 }
2332 if (changed)
2333 rescan = true;
2334 else if (rescan)
2335 rescan = false;
2336 else
2337 break;
2338 }
2339 return true;
2340 }
2341
2342 // Exported methods
2343
2344 // Constructors
2345
2346 /**
2347 * Creates a {@code ForkJoinPool} with parallelism equal to {@link
2348 * java.lang.Runtime#availableProcessors}, using defaults for all
2349 * other parameters (see {@link #ForkJoinPool(int,
2350 * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean,
2351 * int, int, int, Predicate, long, TimeUnit)}).
2352 *
2353 * @throws SecurityException if a security manager exists and
2354 * the caller is not permitted to modify threads
2355 * because it does not hold {@link
2356 * java.lang.RuntimePermission}{@code ("modifyThread")}
2357 */
2541 /**
2542 * Constructor for common pool using parameters possibly
2543 * overridden by system properties
2544 */
2545 private ForkJoinPool(byte forCommonPoolOnly) {
2546 int parallelism = Runtime.getRuntime().availableProcessors() - 1;
2547 ForkJoinWorkerThreadFactory fac = null;
2548 UncaughtExceptionHandler handler = null;
2549 try { // ignore exceptions in accessing/parsing properties
2550 fac = (ForkJoinWorkerThreadFactory) newInstanceFromSystemProperty(
2551 "java.util.concurrent.ForkJoinPool.common.threadFactory");
2552 handler = (UncaughtExceptionHandler) newInstanceFromSystemProperty(
2553 "java.util.concurrent.ForkJoinPool.common.exceptionHandler");
2554 String pp = System.getProperty
2555 ("java.util.concurrent.ForkJoinPool.common.parallelism");
2556 if (pp != null)
2557 parallelism = Integer.parseInt(pp);
2558 } catch (Exception ignore) {
2559 }
2560 int p = this.mode = Math.min(Math.max(parallelism, 0), MAX_CAP);
2561 int maxSpares = (p == 0) ? 0 : COMMON_MAX_SPARES;
2562 int bnds = ((1 - p) & SMASK) | (maxSpares << SWIDTH);
2563 int size = 1 << (33 - Integer.numberOfLeadingZeros(p > 0 ? p - 1 : 1));
2564 this.factory = (fac != null) ? fac :
2565 new DefaultCommonPoolForkJoinWorkerThreadFactory();
2566 this.ueh = handler;
2567 this.keepAlive = DEFAULT_KEEPALIVE;
2568 this.saturate = null;
2569 this.workerNamePrefix = null;
2570 this.bounds = bnds;
2571 this.ctl = ((((long)(-p) << TC_SHIFT) & TC_MASK) |
2572 (((long)(-p) << RC_SHIFT) & RC_MASK));
2573 this.queues = new WorkQueue[size];
2574 this.registrationLock = new ReentrantLock();
2575 }
2576
2577 /**
2578 * Returns the common pool instance. This pool is statically
2579 * constructed; its run state is unaffected by attempts to {@link
2580 * #shutdown} or {@link #shutdownNow}. However this pool and any
2581 * ongoing processing are automatically terminated upon program
2582 * {@link System#exit}. Any program that relies on asynchronous
2583 * task processing to complete before program termination should
2584 * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
2585 * before exit.
2586 *
2587 * @return the common pool instance
2588 * @since 1.8
2589 */
2590 public static ForkJoinPool commonPool() {
2596
2597 /**
2598 * Performs the given task, returning its result upon completion.
2599 * If the computation encounters an unchecked Exception or Error,
2600 * it is rethrown as the outcome of this invocation. Rethrown
2601 * exceptions behave in the same way as regular exceptions, but,
2602 * when possible, contain stack traces (as displayed for example
2603 * using {@code ex.printStackTrace()}) of both the current thread
2604 * as well as the thread actually encountering the exception;
2605 * minimally only the latter.
2606 *
2607 * @param task the task
2608 * @param <T> the type of the task's result
2609 * @return the task's result
2610 * @throws NullPointerException if the task is null
2611 * @throws RejectedExecutionException if the task cannot be
2612 * scheduled for execution
2613 */
2614 public <T> T invoke(ForkJoinTask<T> task) {
2615 externalSubmit(task);
2616 return task.joinForPoolInvoke(this);
2617 }
2618
2619 /**
2620 * Arranges for (asynchronous) execution of the given task.
2621 *
2622 * @param task the task
2623 * @throws NullPointerException if the task is null
2624 * @throws RejectedExecutionException if the task cannot be
2625 * scheduled for execution
2626 */
2627 public void execute(ForkJoinTask<?> task) {
2628 externalSubmit(task);
2629 }
2630
2631 // AbstractExecutorService methods
2632
2633 /**
2634 * @throws NullPointerException if the task is null
2635 * @throws RejectedExecutionException if the task cannot be
2636 * scheduled for execution
2688 return externalSubmit((task instanceof ForkJoinTask<?>)
2689 ? (ForkJoinTask<Void>) task // avoid re-wrap
2690 : new ForkJoinTask.AdaptedRunnableAction(task));
2691 }
2692
2693 /**
2694 * @throws NullPointerException {@inheritDoc}
2695 * @throws RejectedExecutionException {@inheritDoc}
2696 */
2697 @Override
2698 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
2699 ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
2700 try {
2701 for (Callable<T> t : tasks) {
2702 ForkJoinTask<T> f =
2703 new ForkJoinTask.AdaptedInterruptibleCallable<T>(t);
2704 futures.add(f);
2705 externalSubmit(f);
2706 }
2707 for (int i = futures.size() - 1; i >= 0; --i)
2708 ((ForkJoinTask<?>)futures.get(i)).awaitPoolInvoke(this);
2709 return futures;
2710 } catch (Throwable t) {
2711 for (Future<T> e : futures)
2712 ForkJoinTask.cancelIgnoringExceptions(e);
2713 throw t;
2714 }
2715 }
2716
2717 @Override
2718 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
2719 long timeout, TimeUnit unit)
2720 throws InterruptedException {
2721 long nanos = unit.toNanos(timeout);
2722 ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
2723 try {
2724 for (Callable<T> t : tasks) {
2725 ForkJoinTask<T> f =
2726 new ForkJoinTask.AdaptedInterruptibleCallable<T>(t);
2727 futures.add(f);
2728 externalSubmit(f);
2729 }
2730 long startTime = System.nanoTime(), ns = nanos;
2731 boolean timedOut = (ns < 0L);
2732 for (int i = futures.size() - 1; i >= 0; --i) {
2733 Future<T> f = futures.get(i);
2734 if (!f.isDone()) {
2735 if (timedOut)
2736 ForkJoinTask.cancelIgnoringExceptions(f);
2737 else {
2738 ((ForkJoinTask<T>)f).awaitPoolInvoke(this, ns);
2739 if ((ns = nanos - (System.nanoTime() - startTime)) < 0L)
2740 timedOut = true;
2741 }
2742 }
2743 }
2744 return futures;
2745 } catch (Throwable t) {
2746 for (Future<T> e : futures)
2747 ForkJoinTask.cancelIgnoringExceptions(e);
2748 throw t;
2749 }
2750 }
2751
2752 // Task to hold results from InvokeAnyTasks
2753 static final class InvokeAnyRoot<E> extends ForkJoinTask<E> {
2754 private static final long serialVersionUID = 2838392045355241008L;
2755 @SuppressWarnings("serial") // Conditionally serializable
2756 volatile E result;
2757 final AtomicInteger count; // in case all throw
2758 final ForkJoinPool pool; // to check shutdown while collecting
2759 InvokeAnyRoot(int n, ForkJoinPool p) {
2760 pool = p;
2761 count = new AtomicInteger(n);
2762 }
2763 final void tryComplete(Callable<E> c) { // called by InvokeAnyTasks
2764 Throwable ex = null;
2765 boolean failed;
2766 if (c == null || Thread.interrupted() ||
2767 (pool != null && pool.mode < 0))
2768 failed = true;
2769 else if (isDone())
2770 failed = false;
2771 else {
2772 try {
2773 complete(c.call());
2774 failed = false;
2775 } catch (Throwable tx) {
2776 ex = tx;
2777 failed = true;
2778 }
2779 }
2780 if ((pool != null && pool.mode < 0) ||
2781 (failed && count.getAndDecrement() <= 1))
2782 trySetThrown(ex != null ? ex : new CancellationException());
2783 }
2784 public final boolean exec() { return false; } // never forked
2785 public final E getRawResult() { return result; }
2786 public final void setRawResult(E v) { result = v; }
2787 }
2788
2789 // Variant of AdaptedInterruptibleCallable with results in InvokeAnyRoot
2790 static final class InvokeAnyTask<E> extends ForkJoinTask<E> {
2791 private static final long serialVersionUID = 2838392045355241008L;
2792 final InvokeAnyRoot<E> root;
2793 @SuppressWarnings("serial") // Conditionally serializable
2794 final Callable<E> callable;
2821 }
2822
2823 @Override
2824 public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
2825 throws InterruptedException, ExecutionException {
2826 int n = tasks.size();
2827 if (n <= 0)
2828 throw new IllegalArgumentException();
2829 InvokeAnyRoot<T> root = new InvokeAnyRoot<T>(n, this);
2830 ArrayList<InvokeAnyTask<T>> fs = new ArrayList<>(n);
2831 try {
2832 for (Callable<T> c : tasks) {
2833 if (c == null)
2834 throw new NullPointerException();
2835 InvokeAnyTask<T> f = new InvokeAnyTask<T>(root, c);
2836 fs.add(f);
2837 externalSubmit(f);
2838 if (root.isDone())
2839 break;
2840 }
2841 return root.getForPoolInvoke(this);
2842 } finally {
2843 for (InvokeAnyTask<T> f : fs)
2844 ForkJoinTask.cancelIgnoringExceptions(f);
2845 }
2846 }
2847
2848 @Override
2849 public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
2850 long timeout, TimeUnit unit)
2851 throws InterruptedException, ExecutionException, TimeoutException {
2852 long nanos = unit.toNanos(timeout);
2853 int n = tasks.size();
2854 if (n <= 0)
2855 throw new IllegalArgumentException();
2856 InvokeAnyRoot<T> root = new InvokeAnyRoot<T>(n, this);
2857 ArrayList<InvokeAnyTask<T>> fs = new ArrayList<>(n);
2858 try {
2859 for (Callable<T> c : tasks) {
2860 if (c == null)
2861 throw new NullPointerException();
2862 InvokeAnyTask<T> f = new InvokeAnyTask<T>(root, c);
2863 fs.add(f);
2864 externalSubmit(f);
2865 if (root.isDone())
2866 break;
2867 }
2868 return root.getForPoolInvoke(this, nanos);
2869 } finally {
2870 for (InvokeAnyTask<T> f : fs)
2871 ForkJoinTask.cancelIgnoringExceptions(f);
2872 }
2873 }
2874
2875 /**
2876 * Returns the factory used for constructing new workers.
2877 *
2878 * @return the factory used for constructing new workers
2879 */
2880 public ForkJoinWorkerThreadFactory getFactory() {
2881 return factory;
2882 }
2883
2884 /**
2885 * Returns the handler for internal worker threads that terminate
2886 * due to unrecoverable errors encountered while executing tasks.
2887 *
2888 * @return the handler, or {@code null} if none
|