--- old/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java 2021-01-09 11:34:56.613404361 -0800 +++ new/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java 2021-01-09 11:34:56.289406911 -0800 @@ -49,7 +49,10 @@ import java.util.Collections; import java.util.List; import java.util.function.Predicate; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.LockSupport; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.Condition; /** * An {@link ExecutorService} for running {@link ForkJoinTask}s. @@ -163,7 +166,7 @@ * using a factory that may return {@code null}. However doing so may * cause unjoined tasks to never be executed. * - *

Implementation notes: This implementation restricts the + *

Implementation notes: This implementation restricts the * maximum number of running threads to 32767. Attempts to create * pools with greater than the maximum number result in * {@code IllegalArgumentException}. @@ -230,75 +233,92 @@ * in a circular buffer: * q.array[q.top++ % length] = task; * - * (The actual code needs to null-check and size-check the array, + * The actual code needs to null-check and size-check the array, * uses masking, not mod, for indexing a power-of-two-sized array, - * adds a release fence for publication, and possibly signals - * waiting workers to start scanning -- see below.) Both a - * successful pop and poll mainly entail a CAS of a slot from - * non-null to null. - * - * The pop operation (always performed by owner) is: - * if ((the task at top slot is not null) and - * (CAS slot to null)) - * decrement top and return task; - * - * And the poll operation (usually by a stealer) is - * if ((the task at base slot is not null) and - * (CAS slot to null)) - * increment base and return task; - * - * There are several variants of each of these. Most uses occur - * within operations that also interleave contention or emptiness - * tracking or inspection of elements before extracting them, so - * must interleave these with the above code. When performed by - * owner, getAndSet is used instead of CAS (see for example method - * nextLocalTask) which is usually more efficient, and possible - * because the top index cannot independently change during the - * operation. + * enforces memory ordering, supports resizing, and possibly + * signals waiting workers to start scanning -- see below. + * + * The pop operation (always performed by owner) is of the form: + * if ((task = getAndSet(q.array, (q.top-1) % length, null)) != null) + * decrement top and return task; + * If this fails, the queue is empty. + * + * The poll operation by another stealer thread is, basically: + * if (CAS nonnull task at q.array[q.base % length] to null) + * increment base and return task; + * + * This may fail due to contention, and may be retried. + * Implementations must ensure a consistent snapshot of the base + * index and the task (by looping or trying elsewhere) before + * trying CAS. There isn't actually a method of this form, + * because failure due to inconsistency or contention is handled + * in different ways in different contexts, normally by first + * trying other queues. (For the most straightforward example, see + * method pollScan.) There are further variants for cases + * requiring inspection of elements before extracting them, so + * must interleave these with variants of this code. Also, a more + * efficient version (nextLocalTask) is used for polls by owners. + * It avoids some overhead because the queue cannot be growing + * during call. * * Memory ordering. See "Correct and Efficient Work-Stealing for * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013 * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an * analysis of memory ordering requirements in work-stealing - * algorithms similar to (but different than) the one used here. - * Extracting tasks in array slots via (fully fenced) CAS provides - * primary synchronization. The base and top indices imprecisely - * guide where to extract from. We do not usually require strict - * orderings of array and index updates. Many index accesses use - * plain mode, with ordering constrained by surrounding context - * (usually with respect to element CASes or the two WorkQueue - * volatile fields source and phase). When not otherwise already - * constrained, reads of "base" by queue owners use acquire-mode, - * and some externally callable methods preface accesses with - * acquire fences. Additionally, to ensure that index update - * writes are not coalesced or postponed in loops etc, "opaque" - * mode is used in a few cases where timely writes are not - * otherwise ensured. The "locked" versions of push- and pop- - * based methods for shared queues differ from owned versions - * because locking already forces some of the ordering. + * algorithms similar to the one used here. Inserting and + * extracting tasks in array slots via volatile or atomic accesses + * or explicit fences provides primary synchronization. + * + * Operations on deque elements require reads and writes of both + * indices and slots. When possible, we allow these to occur in + * any order. Because the base and top indices (along with other + * pool or array fields accessed in many methods) only imprecisely + * guide where to extract from, we let accesses other than the + * element getAndSet/CAS/setVolatile appear in any order, using + * plain mode. But we must still preface some methods (mainly + * those that may be accessed externally) with an acquireFence to + * avoid unbounded staleness. This is equivalent to acting as if + * callers use an acquiring read of the reference to the pool or + * queue when invoking the method, even when they do not. We use + * explicit acquiring reads (getSlot) rather than plain array + * access when acquire mode is required but not otherwise ensured + * by context. To reduce stalls by other stealers, we encourage + * timely writes to the base index by immediately following + * updates with a write of a volatile field that must be updated + * anyway, or an Opaque-mode write if there is no such + * opportunity. * * Because indices and slot contents cannot always be consistent, - * a check that base == top indicates (momentary) emptiness, but - * otherwise may err on the side of possibly making the queue - * appear nonempty when a push, pop, or poll have not fully - * committed, or making it appear empty when an update of top has - * not yet been visibly written. (Method isEmpty() checks the - * case of a partially completed removal of the last element.) - * Because of this, the poll operation, considered individually, - * is not wait-free. One thief cannot successfully continue until - * another in-progress one (or, if previously empty, a push) - * visibly completes. This can stall threads when required to - * consume from a given queue (see method poll()). However, in - * the aggregate, we ensure at least probabilistic - * non-blockingness. If an attempted steal fails, a scanning - * thief chooses a different random victim target to try next. So, - * in order for one thief to progress, it suffices for any - * in-progress poll or new push on any empty queue to complete. + * the emptiness check base == top is only quiescently accurate + * (and so used where this suffices). Otherwise, it may err on the + * side of possibly making the queue appear nonempty when a push, + * pop, or poll have not fully committed, or making it appear + * empty when an update of top or base has not yet been seen. + * Similarly, the check in push for the queue array being full may + * trigger when not completely full, causing a resize earlier than + * required. + * + * Mainly because of these potential inconsistencies among slots + * vs indices, the poll operation, considered individually, is not + * wait-free. One thief cannot successfully continue until another + * in-progress one (or, if previously empty, a push) visibly + * completes. This can stall threads when required to consume + * from a given queue (which may spin). However, in the + * aggregate, we ensure probabilistic non-blockingness at least + * until checking quiescence (which is intrinsically blocking): + * If an attempted steal fails, a scanning thief chooses a + * different victim target to try next. So, in order for one thief + * to progress, it suffices for any in-progress poll or new push + * on any empty queue to complete. The worst cases occur when many + * threads are looking for tasks being produced by a stalled + * producer. * * This approach also enables support of a user mode in which * local task processing is in FIFO, not LIFO order, simply by * using poll rather than pop. This can be useful in - * message-passing frameworks in which tasks are never joined. + * message-passing frameworks in which tasks are never joined, + * although with increased contention among task producers and + * consumers. * * WorkQueues are also used in a similar way for tasks submitted * to the pool. We cannot mix these tasks in the same queues used @@ -308,13 +328,12 @@ * choosing existing queues, and may be randomly repositioned upon * contention with other submitters. In essence, submitters act * like workers except that they are restricted to executing local - * tasks that they submitted. Insertion of tasks in shared mode - * requires a lock but we use only a simple spinlock (using field - * phase), because submitters encountering a busy queue move to a - * different position to use or create other queues -- they block - * only when creating and registering new queues. Because it is - * used only as a spinlock, unlocking requires only a "releasing" - * store (using setRelease) unless otherwise signalling. + * tasks that they submitted (or when known, subtasks thereof). + * Insertion of tasks in shared mode requires a lock. We use only + * a simple spinlock (using field "source"), because submitters + * encountering a busy queue move to a different position to use + * or create other queues. They block only when registering new + * queues. * * Management * ========== @@ -322,16 +341,17 @@ * The main throughput advantages of work-stealing stem from * decentralized control -- workers mostly take tasks from * themselves or each other, at rates that can exceed a billion - * per second. The pool itself creates, activates (enables - * scanning for and running tasks), deactivates, blocks, and - * terminates threads, all with minimal central information. - * There are only a few properties that we can globally track or - * maintain, so we pack them into a small number of variables, - * often maintaining atomicity without blocking or locking. - * Nearly all essentially atomic control state is held in a few - * volatile variables that are by far most often read (not - * written) as status and consistency checks. We pack as much - * information into them as we can. + * per second. Most non-atomic control is performed by some form + * of scanning across or within queues. The pool itself creates, + * activates (enables scanning for and running tasks), + * deactivates, blocks, and terminates threads, all with minimal + * central information. There are only a few properties that we + * can globally track or maintain, so we pack them into a small + * number of variables, often maintaining atomicity without + * blocking or locking. Nearly all essentially atomic control + * state is held in a few volatile variables that are by far most + * often read (not written) as status and consistency checks. We + * pack as much information into them as we can. * * Field "ctl" contains 64 bits holding information needed to * atomically decide to add, enqueue (on an event queue), and @@ -343,30 +363,28 @@ * * Field "mode" holds configuration parameters as well as lifetime * status, atomically and monotonically setting SHUTDOWN, STOP, - * and finally TERMINATED bits. + * and finally TERMINATED bits. It is updated only via bitwise + * atomics (getAndBitwiseOr). * - * Field "workQueues" holds references to WorkQueues. It is - * updated (only during worker creation and termination) under - * lock (using field workerNamePrefix as lock), but is otherwise - * concurrently readable, and accessed directly. We also ensure - * that uses of the array reference itself never become too stale - * in case of resizing, by arranging that (re-)reads are separated - * by at least one acquiring read access. To simplify index-based - * operations, the array size is always a power of two, and all - * readers must tolerate null slots. Worker queues are at odd - * indices. Shared (submission) queues are at even indices, up to - * a maximum of 64 slots, to limit growth even if the array needs - * to expand to add more workers. Grouping them together in this - * way simplifies and speeds up task scanning. + * Array "queues" holds references to WorkQueues. It is updated + * (only during worker creation and termination) under the + * registrationLock, but is otherwise concurrently readable, and + * accessed directly (although always prefaced by acquireFences or + * other acquiring reads). To simplify index-based operations, the + * array size is always a power of two, and all readers must + * tolerate null slots. Worker queues are at odd indices. Worker + * ids masked with SMASK match their index. Shared (submission) + * queues are at even indices. Grouping them together in this way + * simplifies and speeds up task scanning. * * All worker thread creation is on-demand, triggered by task * submissions, replacement of terminated workers, and/or * compensation for blocked workers. However, all other support * code is set up to work with other policies. To ensure that we - * do not hold on to worker references that would prevent GC, all - * accesses to workQueues are via indices into the workQueues - * array (which is one source of some of the messy code - * constructions here). In essence, the workQueues array serves as + * do not hold on to worker or task references that would prevent + * GC, all accesses to workQueues are via indices into the + * queues array (which is one source of some of the messy code + * constructions here). In essence, the queues array serves as * a weak reference mechanism. Thus for example the stack top * subfield of ctl stores indices, not references. * @@ -375,31 +393,26 @@ * none can be found immediately, and we cannot start/resume * workers unless there appear to be tasks available. On the * other hand, we must quickly prod them into action when new - * tasks are submitted or generated. In many usages, ramp-up time + * tasks are submitted or generated. These latencies are mainly a + * function of JVM park/unpark (and underlying OS) performance, + * which can be slow and variable. In many usages, ramp-up time * is the main limiting factor in overall performance, which is * compounded at program start-up by JIT compilation and - * allocation. So we streamline this as much as possible. + * allocation. On the other hand, throughput degrades when too + * many threads poll for too few tasks. * - * The "ctl" field atomically maintains total worker and - * "released" worker counts, plus the head of the available worker - * queue (actually stack, represented by the lower 32bit subfield - * of ctl). Released workers are those known to be scanning for + * The "ctl" field atomically maintains total and "released" + * worker counts, plus the head of the available worker queue + * (actually stack, represented by the lower 32bit subfield of + * ctl). Released workers are those known to be scanning for * and/or running tasks. Unreleased ("available") workers are * recorded in the ctl stack. These workers are made available for - * signalling by enqueuing in ctl (see method runWorker). The + * signalling by enqueuing in ctl (see method awaitWork). The * "queue" is a form of Treiber stack. This is ideal for * activating threads in most-recently used order, and improves * performance and locality, outweighing the disadvantages of * being prone to contention and inability to release a worker - * unless it is topmost on stack. To avoid missed signal problems - * inherent in any wait/signal design, available workers rescan - * for (and if found run) tasks after enqueuing. Normally their - * release status will be updated while doing so, but the released - * worker ctl count may underestimate the number of active - * threads. (However, it is still possible to determine quiescence - * via a validation traversal -- see isQuiescent). After an - * unsuccessful rescan, available workers are blocked until - * signalled (see signalWork). The top stack state holds the + * unless it is topmost on stack. The top stack state holds the * value of the "phase" field of the worker: its index and status, * plus a version counter that, in addition to the count subfields * (also serving as version stamps) provide protection against @@ -407,109 +420,115 @@ * * Creating workers. To create a worker, we pre-increment counts * (serving as a reservation), and attempt to construct a - * ForkJoinWorkerThread via its factory. Upon construction, the - * new thread invokes registerWorker, where it constructs a - * WorkQueue and is assigned an index in the workQueues array - * (expanding the array if necessary). The thread is then started. - * Upon any exception across these steps, or null return from - * factory, deregisterWorker adjusts counts and records - * accordingly. If a null return, the pool continues running with - * fewer than the target number workers. If exceptional, the - * exception is propagated, generally to some external caller. - * Worker index assignment avoids the bias in scanning that would - * occur if entries were sequentially packed starting at the front - * of the workQueues array. We treat the array as a simple - * power-of-two hash table, expanding as needed. The seedIndex - * increment ensures no collisions until a resize is needed or a - * worker is deregistered and replaced, and thereafter keeps - * probability of collision low. We cannot use - * ThreadLocalRandom.getProbe() for similar purposes here because - * the thread has not started yet, but do so for creating - * submission queues for existing external threads (see - * externalPush). + * ForkJoinWorkerThread via its factory. On starting, the new + * thread first invokes registerWorker, where it constructs a + * WorkQueue and is assigned an index in the queues array + * (expanding the array if necessary). Upon any exception across + * these steps, or null return from factory, deregisterWorker + * adjusts counts and records accordingly. If a null return, the + * pool continues running with fewer than the target number + * workers. If exceptional, the exception is propagated, generally + * to some external caller. * * WorkQueue field "phase" is used by both workers and the pool to * manage and track whether a worker is UNSIGNALLED (possibly * blocked waiting for a signal). When a worker is enqueued its - * phase field is set. Note that phase field updates lag queue CAS - * releases so usage requires care -- seeing a negative phase does - * not guarantee that the worker is available. When queued, the - * lower 16 bits of scanState must hold its pool index. So we - * place the index there upon initialization and otherwise keep it - * there or restore it when necessary. + * phase field is set negative. Note that phase field updates lag + * queue CAS releases; seeing a negative phase does not guarantee + * that the worker is available. When queued, the lower 16 bits of + * its phase must hold its pool index. So we place the index there + * upon initialization and never modify these bits. * * The ctl field also serves as the basis for memory * synchronization surrounding activation. This uses a more * efficient version of a Dekker-like rule that task producers and * consumers sync with each other by both writing/CASing ctl (even - * if to its current value). This would be extremely costly. So - * we relax it in several ways: (1) Producers only signal when - * their queue is possibly empty at some point during a push - * operation. (2) Other workers propagate this signal - * when they find tasks in a queue with size greater than one. (3) - * Workers only enqueue after scanning (see below) and not finding - * any tasks. (4) Rather than CASing ctl to its current value in - * the common case where no action is required, we reduce write - * contention by equivalently prefacing signalWork when called by - * an external task producer using a memory access with - * full-volatile semantics or a "fullFence". - * - * Almost always, too many signals are issued, in part because a - * task producer cannot tell if some existing worker is in the - * midst of finishing one task (or already scanning) and ready to - * take another without being signalled. So the producer might - * instead activate a different worker that does not find any - * work, and then inactivates. This scarcely matters in - * steady-state computations involving all workers, but can create - * contention and bookkeeping bottlenecks during ramp-up, + * if to its current value). However, rather than CASing ctl to + * its current value in the common case where no action is + * required, we reduce write contention by ensuring that + * signalWork invocations are prefaced with a full-volatile memory + * access (which is usually needed anyway). + * + * Signalling. Signals (in signalWork) cause new or reactivated + * workers to scan for tasks. Method signalWork and its callers + * try to approximate the unattainable goal of having the right + * number of workers activated for the tasks at hand, but must err + * on the side of too many workers vs too few to avoid stalls. If + * computations are purely tree structured, it suffices for every + * worker to activate another when it pushes a task into an empty + * queue, resulting in O(log(#threads)) steps to full activation. + * If instead, tasks come in serially from only a single producer, + * each worker taking its first (since the last quiescence) task + * from a queue should signal another if there are more tasks in + * that queue. This is equivalent to, but generally faster than, + * arranging the stealer take two tasks, re-pushing one on its own + * queue, and signalling (because its queue is empty), also + * resulting in logarithmic full activation time. Because we don't + * know about usage patterns (or most commonly, mixtures), we use + * both approaches. We approximate the second rule by arranging + * that workers in scan() do not repeat signals when repeatedly + * taking tasks from any given queue, by remembering the previous + * one. There are narrow windows in which both rules may apply, + * leading to duplicate or unnecessary signals. Despite such + * limitations, these rules usually avoid slowdowns that otherwise + * occur when too many workers contend to take too few tasks, or + * when producers waste most of their time resignalling. However, + * contention and overhead effects may still occur during ramp-up, * ramp-down, and small computations involving only a few workers. * - * Scanning. Method scan (from runWorker) performs top-level - * scanning for tasks. (Similar scans appear in helpQuiesce and - * pollScan.) Each scan traverses and tries to poll from each - * queue starting at a random index. Scans are not performed in - * ideal random permutation order, to reduce cacheline - * contention. The pseudorandom generator need not have - * high-quality statistical properties in the long term, but just - * within computations; We use Marsaglia XorShifts (often via - * ThreadLocalRandom.nextSecondarySeed), which are cheap and - * suffice. Scanning also includes contention reduction: When - * scanning workers fail to extract an apparently existing task, - * they soon restart at a different pseudorandom index. This form - * of backoff improves throughput when many threads are trying to - * take tasks from few queues, which can be common in some usages. - * Scans do not otherwise explicitly take into account core - * affinities, loads, cache localities, etc, However, they do + * Scanning. Method scan performs top-level scanning for (and + * execution of) tasks. Scans by different workers and/or at + * different times are unlikely to poll queues in the same + * order. Each scan traverses and tries to poll from each queue in + * a pseudorandom permutation order by starting at a random index, + * and using a constant cyclically exhaustive stride; restarting + * upon contention. (Non-top-level scans; for example in + * helpJoin, use simpler linear probes because they do not + * systematically contend with top-level scans.) The pseudorandom + * generator need not have high-quality statistical properties in + * the long term. We use Marsaglia XorShifts, seeded with the Weyl + * sequence from ThreadLocalRandom probes, which are cheap and + * suffice. Scans do not otherwise explicitly take into account + * core affinities, loads, cache localities, etc, However, they do * exploit temporal locality (which usually approximates these) by * preferring to re-poll from the same queue after a successful - * poll before trying others (see method topLevelExec). However - * this preference is bounded (see TOP_BOUND_SHIFT) as a safeguard - * against infinitely unfair looping under unbounded user task - * recursion, and also to reduce long-term contention when many - * threads poll few queues holding many small tasks. The bound is - * high enough to avoid much impact on locality and scheduling - * overhead. + * poll before trying others (see method topLevelExec). This + * reduces fairness, which is partially counteracted by using a + * one-shot form of poll (tryPoll) that may lose to other workers. + * + * Deactivation. Method scan returns a sentinel when no tasks are + * found, leading to deactivation (see awaitWork). The count + * fields in ctl allow accurate discovery of quiescent states + * (i.e., when all workers are idle) after deactivation. However, + * this may also race with new (external) submissions, so a + * recheck is also needed to determine quiescence. Upon apparently + * triggering quiescence, awaitWork re-scans and self-signals if + * it may have missed a signal. In other cases, a missed signal + * may transiently lower parallelism because deactivation does not + * necessarily mean that there is no more work, only that that + * there were no tasks not taken by other workers. But more + * signals are generated (see above) to eventually reactivate if + * needed. * * Trimming workers. To release resources after periods of lack of * use, a worker starting to wait when the pool is quiescent will - * time out and terminate (see method runWorker) if the pool has - * remained quiescent for period given by field keepAlive. + * time out and terminate if the pool has remained quiescent for + * period given by field keepAlive. * * Shutdown and Termination. A call to shutdownNow invokes - * tryTerminate to atomically set a runState bit. The calling - * thread, as well as every other worker thereafter terminating, - * helps terminate others by cancelling their unprocessed tasks, - * and waking them up, doing so repeatedly until stable. Calls to - * non-abrupt shutdown() preface this by checking whether - * termination should commence by sweeping through queues (until - * stable) to ensure lack of in-flight submissions and workers - * about to process them before triggering the "STOP" phase of + * tryTerminate to atomically set a mode bit. The calling thread, + * as well as every other worker thereafter terminating, helps + * terminate others by cancelling their unprocessed tasks, and + * waking them up. Calls to non-abrupt shutdown() preface this by + * checking isQuiescent before triggering the "STOP" phase of * termination. * * Joining Tasks * ============= * - * Any of several actions may be taken when one worker is waiting + * Normally, the first option when joining a task that is not done + * is to try to unfork it from local queue and run it. Otherwise, + * any of several actions may be taken when one worker is waiting * to join a task stolen (or always held) by another. Because we * are multiplexing many tasks on to a pool of workers, we can't * always just let them block (as in Thread.join). We also cannot @@ -520,40 +539,62 @@ * Instead we combine two tactics: * * Helping: Arranging for the joiner to execute some task that it - * would be running if the steal had not occurred. + * could be running if the steal had not occurred. * * Compensating: Unless there are already enough live threads, * method tryCompensate() may create or re-activate a spare * thread to compensate for blocked joiners until they unblock. * - * A third form (implemented in tryRemoveAndExec) amounts to - * helping a hypothetical compensator: If we can readily tell that - * a possible action of a compensator is to steal and execute the + * A third form (implemented via tryRemove) amounts to helping a + * hypothetical compensator: If we can readily tell that a + * possible action of a compensator is to steal and execute the * task being joined, the joining thread can do so directly, - * without the need for a compensation thread. + * without the need for a compensation thread; although with a + * (rare) possibility of reduced parallelism because of a + * transient gap in the queue array. + * + * Other intermediate forms available for specific task types (for + * example helpAsyncBlocker) often avoid or postpone the need for + * blocking or compensation. * * The ManagedBlocker extension API can't use helping so relies * only on compensation in method awaitBlocker. * - * The algorithm in awaitJoin entails a form of "linear helping". - * Each worker records (in field source) the id of the queue from - * which it last stole a task. The scan in method awaitJoin uses - * these markers to try to find a worker to help (i.e., steal back - * a task from and execute it) that could hasten completion of the - * actively joined task. Thus, the joiner executes a task that - * would be on its own local deque if the to-be-joined task had - * not been stolen. This is a conservative variant of the approach - * described in Wagner & Calder "Leapfrogging: a portable + * The algorithm in helpJoin entails a form of "linear helping". + * Each worker records (in field "source") the id of the queue + * from which it last stole a task. The scan in method helpJoin + * uses these markers to try to find a worker to help (i.e., steal + * back a task from and execute it) that could hasten completion + * of the actively joined task. Thus, the joiner executes a task + * that would be on its own local deque if the to-be-joined task + * had not been stolen. This is a conservative variant of the + * approach described in Wagner & Calder "Leapfrogging: a portable * technique for implementing efficient futures" SIGPLAN Notices, * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs * mainly in that we only record queue ids, not full dependency - * links. This requires a linear scan of the workQueues array to + * links. This requires a linear scan of the queues array to * locate stealers, but isolates cost to when it is needed, rather - * than adding to per-task overhead. Searches can fail to locate - * stealers GC stalls and the like delay recording sources. - * Further, even when accurately identified, stealers might not - * ever produce a task that the joiner can in turn help with. So, - * compensation is tried upon failure to find tasks to run. + * than adding to per-task overhead. Also, searches are limited to + * direct and at most two levels of indirect stealers, after which + * there are rapidly diminishing returns on increased overhead. + * Searches can fail to locate stealers when stalls delay + * recording sources. Further, even when accurately identified, + * stealers might not ever produce a task that the joiner can in + * turn help with. So, compensation is tried upon failure to find + * tasks to run. + * + * Joining CountedCompleters (see helpComplete) differs from (and + * is generally more efficient than) other cases because task + * eligibility is determined by checking completion chains rather + * than tracking stealers. + * + * Joining under timeouts (ForkJoinTask timed get) uses a + * constrained mixture of helping and compensating in part because + * pools (actually, only the common pool) may not have any + * available threads: If the pool is saturated (all available + * workers are busy), the caller tries to remove and otherwise + * help; else it blocks under compensation so that it may time out + * independently of any tasks. * * Compensation does not by default aim to keep exactly the target * parallelism number of unblocked threads running at any given @@ -578,8 +619,8 @@ * footprint to the setup of about a dozen fields. * * When external threads submit to the common pool, they can - * perform subtask processing (see externalHelpComplete and - * related methods) upon joins. This caller-helps policy makes it + * perform subtask processing (see helpComplete and related + * methods) upon joins. This caller-helps policy makes it * sensible to set common pool parallelism level to one (or more) * less than the total number of available cores, or even zero for * pure caller-runs. We do not need to record whether external @@ -595,39 +636,61 @@ * InnocuousForkJoinWorkerThread when there is a SecurityManager * present. These workers have no permissions set, do not belong * to any user-defined ThreadGroup, and erase all ThreadLocals - * after executing any top-level task (see - * WorkQueue.afterTopLevelExec). The associated mechanics (mainly - * in ForkJoinWorkerThread) may be JVM-dependent and must access - * particular Thread class fields to achieve this effect. + * after executing any top-level task. The associated mechanics + * may be JVM-dependent and must access particular Thread class + * fields to achieve this effect. + * + * Interrupt handling + * ================== + * + * The framework is designed to manage task cancellation + * (ForkJoinTask.cancel) independently from the interrupt status + * of threads running tasks. (See the public ForkJoinTask + * documentation for rationale.) Interrupts are issued only in + * tryTerminate, when workers should be terminating and tasks + * should be cancelled anyway. Interrupts are cleared only when + * necessary to ensure that calls to LockSupport.park do not loop + * indefinitely (park returns immediately if the current thread is + * interrupted). If so, interruption is reinstated after blocking + * if status could be visible during the scope of any task. For + * cases in which task bodies are specified or desired to + * interrupt upon cancellation, ForkJoinTask.cancel can be + * overridden to do so (as is done for invoke{Any,All}). * * Memory placement * ================ * * Performance can be very sensitive to placement of instances of * ForkJoinPool and WorkQueues and their queue arrays. To reduce - * false-sharing impact, the @Contended annotation isolates - * adjacent WorkQueue instances, as well as the ForkJoinPool.ctl - * field. WorkQueue arrays are allocated (by their threads) with - * larger initial sizes than most ever need, mostly to reduce - * false sharing with current garbage collectors that use cardmark - * tables. + * false-sharing impact, the @Contended annotation isolates the + * ForkJoinPool.ctl field as well as the most heavily written + * WorkQueue fields. These mainly reduce cache traffic by scanners. + * WorkQueue arrays are presized large enough to avoid resizing + * (which transiently reduces throughput) in most tree-like + * computations, although not in some streaming usages. Initial + * sizes are not large enough to avoid secondary contention + * effects (especially for GC cardmarks) when queues are placed + * near each other in memory. This is common, but has different + * impact in different collectors and remains incompletely + * addressed. * * Style notes * =========== * - * Memory ordering relies mainly on VarHandles. This can be + * Memory ordering relies mainly on atomic operations (CAS, + * getAndSet, getAndAdd) along with explicit fences. This can be * awkward and ugly, but also reflects the need to control * outcomes across the unusual cases that arise in very racy code * with very few invariants. All fields are read into locals - * before use, and null-checked if they are references. Array - * accesses using masked indices include checks (that are always - * true) that the array length is non-zero to avoid compilers - * inserting more expensive traps. This is usually done in a - * "C"-like style of listing declarations at the heads of methods - * or blocks, and using inline assignments on first encounter. - * Nearly all explicit checks lead to bypass/return, not exception - * throws, because they may legitimately arise due to - * cancellation/revocation during shutdown. + * before use, and null-checked if they are references, even if + * they can never be null under current usages. Array accesses + * using masked indices include checks (that are always true) that + * the array length is non-zero to avoid compilers inserting more + * expensive traps. This is usually done in a "C"-like style of + * listing declarations at the heads of methods or blocks, and + * using inline assignments on first encounter. Nearly all + * explicit checks lead to bypass/return, not exception throws, + * because they may legitimately arise during shutdown. * * There is a lot of representation-level coupling among classes * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The @@ -652,6 +715,22 @@ * (6) Callbacks and other support for ForkJoinTask methods * (7) Exported methods * (8) Static block initializing statics in minimally dependent order + * + * Revision notes + * ============== + * + * The main sources of differences of January 2020 ForkJoin + * classes from previous version are: + * + * * ForkJoinTask now uses field "aux" to support blocking joins + * and/or record exceptions, replacing reliance on builtin + * monitors and side tables. + * * Scans probe slots (vs compare indices), along with related + * changes that reduce performance differences across most + * garbage collectors, and reduce contention. + * * Refactoring for better integration of special task types and + * other capabilities that had been incrementally tacked on. Plus + * many minor reworkings to improve consistency. */ // Static utilities @@ -666,6 +745,14 @@ security.checkPermission(modifyThreadPermission); } + static AccessControlContext contextWithPermissions(Permission ... perms) { + Permissions permissions = new Permissions(); + for (Permission perm : perms) + permissions.add(perm); + return new AccessControlContext( + new ProtectionDomain[] { new ProtectionDomain(null, permissions) }); + } + // Nested classes /** @@ -693,379 +780,389 @@ public ForkJoinWorkerThread newThread(ForkJoinPool pool); } - static AccessControlContext contextWithPermissions(Permission ... perms) { - Permissions permissions = new Permissions(); - for (Permission perm : perms) - permissions.add(perm); - return new AccessControlContext( - new ProtectionDomain[] { new ProtectionDomain(null, permissions) }); - } - /** * Default ForkJoinWorkerThreadFactory implementation; creates a * new ForkJoinWorkerThread using the system class loader as the * thread context class loader. */ - private static final class DefaultForkJoinWorkerThreadFactory + static final class DefaultForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory { + // ACC for access to the factory private static final AccessControlContext ACC = contextWithPermissions( new RuntimePermission("getClassLoader"), new RuntimePermission("setContextClassLoader")); - public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { return AccessController.doPrivileged( new PrivilegedAction<>() { public ForkJoinWorkerThread run() { - return new ForkJoinWorkerThread( - pool, ClassLoader.getSystemClassLoader()); }}, + return new ForkJoinWorkerThread(null, pool, true, false); + }}, ACC); } } + /** + * Factory for CommonPool unless overridden by System property. + * Creates InnocuousForkJoinWorkerThreads if a security manager is + * present at time of invocation. Support requires that we break + * quite a lot of encapsulation (some via helper methods in + * ThreadLocalRandom) to access and set Thread fields. + */ + static final class DefaultCommonPoolForkJoinWorkerThreadFactory + implements ForkJoinWorkerThreadFactory { + private static final AccessControlContext ACC = contextWithPermissions( + modifyThreadPermission, + new RuntimePermission("enableContextClassLoaderOverride"), + new RuntimePermission("modifyThreadGroup"), + new RuntimePermission("getClassLoader"), + new RuntimePermission("setContextClassLoader")); + + public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { + return AccessController.doPrivileged( + new PrivilegedAction<>() { + public ForkJoinWorkerThread run() { + return System.getSecurityManager() == null ? + new ForkJoinWorkerThread(null, pool, true, true): + new ForkJoinWorkerThread. + InnocuousForkJoinWorkerThread(pool); }}, + ACC); + } + } + // Constants shared across ForkJoinPool and WorkQueue // Bounds static final int SWIDTH = 16; // width of short static final int SMASK = 0xffff; // short bits == max index static final int MAX_CAP = 0x7fff; // max #workers - 1 - static final int SQMASK = 0x007e; // max 64 (even) slots // Masks and units for WorkQueue.phase and ctl sp subfield static final int UNSIGNALLED = 1 << 31; // must be negative static final int SS_SEQ = 1 << 16; // version count - static final int QLOCK = 1; // must be 1 - // Mode bits and sentinels, some also used in WorkQueue id and.source fields - static final int OWNED = 1; // queue has owner thread + // Mode bits and sentinels, some also used in WorkQueue fields static final int FIFO = 1 << 16; // fifo queue or access mode - static final int SHUTDOWN = 1 << 18; - static final int TERMINATED = 1 << 19; + static final int SRC = 1 << 17; // set for valid queue ids + static final int INNOCUOUS = 1 << 18; // set for Innocuous workers + static final int QUIET = 1 << 19; // quiescing phase or source + static final int SHUTDOWN = 1 << 24; + static final int TERMINATED = 1 << 25; static final int STOP = 1 << 31; // must be negative - static final int QUIET = 1 << 30; // not scanning or working - static final int DORMANT = QUIET | UNSIGNALLED; - - /** - * Initial capacity of work-stealing queue array. - * Must be a power of two, at least 2. - */ - static final int INITIAL_QUEUE_CAPACITY = 1 << 13; + static final int UNCOMPENSATE = 1 << 16; // tryCompensate return /** - * Maximum capacity for queue arrays. Must be a power of two less - * than or equal to 1 << (31 - width of array entry) to ensure - * lack of wraparound of index calculations, but defined to a - * value a bit less than this to help users trap runaway programs - * before saturating systems. + * Initial capacity of work-stealing queue array. Must be a power + * of two, at least 2. See above. */ - static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M - - /** - * The maximum number of top-level polls per worker before - * checking other queues, expressed as a bit shift. See above for - * rationale. - */ - static final int TOP_BOUND_SHIFT = 10; + static final int INITIAL_QUEUE_CAPACITY = 1 << 8; /** * Queues supporting work-stealing as well as external task * submission. See above for descriptions and algorithms. */ - @jdk.internal.vm.annotation.Contended static final class WorkQueue { - volatile int source; // source queue id, or sentinel - int id; // pool index, mode, tag - int base; // index of next slot for poll - int top; // index of next slot for push - volatile int phase; // versioned, negative: queued, 1: locked + volatile int phase; // versioned, negative if inactive int stackPred; // pool stack (ctl) predecessor link - int nsteals; // number of steals + int config; // index, mode, ORed with SRC after init + int base; // index of next slot for poll ForkJoinTask[] array; // the queued tasks; power of 2 size - final ForkJoinPool pool; // the containing pool (may be null) final ForkJoinWorkerThread owner; // owning thread or null if shared - WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) { - this.pool = pool; - this.owner = owner; - // Place indices in the center of array (that is not yet allocated) - base = top = INITIAL_QUEUE_CAPACITY >>> 1; + // segregate fields frequently updated but not read by scans or steals + @jdk.internal.vm.annotation.Contended("w") + int top; // index of next slot for push + @jdk.internal.vm.annotation.Contended("w") + volatile int source; // source queue id, lock, or sentinel + @jdk.internal.vm.annotation.Contended("w") + int nsteals; // number of steals from other queues + + // Support for atomic operations + private static final VarHandle QA; // for array slots + private static final VarHandle SOURCE; + private static final VarHandle BASE; + static final ForkJoinTask getSlot(ForkJoinTask[] a, int i) { + return (ForkJoinTask)QA.getAcquire(a, i); + } + static final ForkJoinTask getAndClearSlot(ForkJoinTask[] a, + int i) { + return (ForkJoinTask)QA.getAndSet(a, i, null); + } + static final void setSlotVolatile(ForkJoinTask[] a, int i, + ForkJoinTask v) { + QA.setVolatile(a, i, v); + } + static final boolean casSlotToNull(ForkJoinTask[] a, int i, + ForkJoinTask c) { + return QA.weakCompareAndSet(a, i, c, null); + } + final boolean tryLock() { + return SOURCE.compareAndSet(this, 0, 1); + } + final void setBaseOpaque(int b) { + BASE.setOpaque(this, b); } /** - * Tries to lock shared queue by CASing phase field. + * Constructor used by ForkJoinWorkerThreads. Most fields + * are initialized upon thread start, in pool.registerWorker. */ - final boolean tryLockPhase() { - return PHASE.compareAndSet(this, 0, 1); + WorkQueue(ForkJoinWorkerThread owner, boolean isInnocuous) { + this.config = (isInnocuous) ? INNOCUOUS : 0; + this.owner = owner; } - final void releasePhaseLock() { - PHASE.setRelease(this, 0); + /** + * Constructor used for external queues. + */ + WorkQueue(int config) { + array = new ForkJoinTask[INITIAL_QUEUE_CAPACITY]; + this.config = config; + owner = null; + phase = -1; } /** * Returns an exportable index (used by ForkJoinWorkerThread). */ final int getPoolIndex() { - return (id & 0xffff) >>> 1; // ignore odd/even tag bit + return (config & 0xffff) >>> 1; // ignore odd/even tag bit } /** * Returns the approximate number of tasks in the queue. */ final int queueSize() { - int n = (int)BASE.getAcquire(this) - top; - return (n >= 0) ? 0 : -n; // ignore transient negative + VarHandle.acquireFence(); // ensure fresh reads by external callers + int n = top - base; + return (n < 0) ? 0 : n; // ignore transient negative } /** - * Provides a more accurate estimate of whether this queue has - * any tasks than does queueSize, by checking whether a - * near-empty queue has at least one unclaimed task. + * Provides a more conservative estimate of whether this queue + * has any tasks than does queueSize. */ final boolean isEmpty() { - ForkJoinTask[] a; int n, cap, b; - VarHandle.acquireFence(); // needed by external callers - return ((n = (b = base) - top) >= 0 || // possibly one task - (n == -1 && ((a = array) == null || - (cap = a.length) == 0 || - a[(cap - 1) & b] == null))); + return !((source != 0 && owner == null) || top - base > 0); } /** * Pushes a task. Call only by owner in unshared queues. * * @param task the task. Caller must ensure non-null. + * @param pool (no-op if null) * @throws RejectedExecutionException if array cannot be resized */ - final void push(ForkJoinTask task) { - ForkJoinTask[] a; - int s = top, d = s - base, cap, m; - ForkJoinPool p = pool; - if ((a = array) != null && (cap = a.length) > 0) { - QA.setRelease(a, (m = cap - 1) & s, task); - top = s + 1; + final void push(ForkJoinTask task, ForkJoinPool pool) { + ForkJoinTask[] a = array; + int s = top++, d = s - base, cap, m; // skip insert if disabled + if (a != null && pool != null && (cap = a.length) > 0) { + setSlotVolatile(a, (m = cap - 1) & s, task); if (d == m) - growArray(false); - else if (QA.getAcquire(a, m & (s - 1)) == null && p != null) { - VarHandle.fullFence(); // was empty - p.signalWork(null); - } + growArray(); + if (d == m || a[m & (s - 1)] == null) + pool.signalWork(); // signal if was empty or resized } } /** - * Version of push for shared queues. Call only with phase lock held. - * @return true if should signal work + * Pushes task to a shared queue with lock already held, and unlocks. + * + * @return true if caller should signal work */ final boolean lockedPush(ForkJoinTask task) { - ForkJoinTask[] a; - boolean signal = false; - int s = top, d = s - base, cap, m; - if ((a = array) != null && (cap = a.length) > 0) { - a[(m = (cap - 1)) & s] = task; - top = s + 1; + ForkJoinTask[] a = array; + int s = top++, d = s - base, cap, m; + if (a != null && (cap = a.length) > 0) { + a[(m = cap - 1) & s] = task; if (d == m) - growArray(true); - else { - phase = 0; // full volatile unlock - if (((s - base) & ~1) == 0) // size 0 or 1 - signal = true; - } + growArray(); + source = 0; // unlock + if (d == m || a[m & (s - 1)] == null) + return true; } - return signal; + return false; } /** - * Doubles the capacity of array. Call either by owner or with - * lock held -- it is OK for base, but not top, to move while - * resizings are in progress. - */ - final void growArray(boolean locked) { - ForkJoinTask[] newA = null; - try { - ForkJoinTask[] oldA; int oldSize, newSize; - if ((oldA = array) != null && (oldSize = oldA.length) > 0 && - (newSize = oldSize << 1) <= MAXIMUM_QUEUE_CAPACITY && - newSize > 0) { - try { - newA = new ForkJoinTask[newSize]; - } catch (OutOfMemoryError ex) { - } - if (newA != null) { // poll from old array, push to new - int oldMask = oldSize - 1, newMask = newSize - 1; - for (int s = top - 1, k = oldMask; k >= 0; --k) { - ForkJoinTask x = (ForkJoinTask) - QA.getAndSet(oldA, s & oldMask, null); - if (x != null) - newA[s-- & newMask] = x; - else - break; - } - array = newA; - VarHandle.releaseFence(); - } + * Doubles the capacity of array. Called by owner or with lock + * held after pre-incrementing top, which is reverted on + * allocation failure. + */ + final void growArray() { + ForkJoinTask[] oldArray = array, newArray; + int s = top - 1, oldCap, newCap; + if (oldArray != null && (oldCap = oldArray.length) > 0 && + (newCap = oldCap << 1) > 0) { // skip if disabled + try { + newArray = new ForkJoinTask[newCap]; + } catch (Throwable ex) { + top = s; + if (owner == null) + source = 0; // unlock + throw new RejectedExecutionException( + "Queue capacity exceeded"); } - } finally { - if (locked) - phase = 0; - } - if (newA == null) - throw new RejectedExecutionException("Queue capacity exceeded"); - } - - /** - * Takes next task, if one exists, in FIFO order. - */ - final ForkJoinTask poll() { - int b, k, cap; ForkJoinTask[] a; - while ((a = array) != null && (cap = a.length) > 0 && - top - (b = base) > 0) { - ForkJoinTask t = (ForkJoinTask) - QA.getAcquire(a, k = (cap - 1) & b); - if (base == b++) { - if (t == null) - Thread.yield(); // await index advance - else if (QA.compareAndSet(a, k, t, null)) { - BASE.setOpaque(this, b); - return t; - } + int newMask = newCap - 1, oldMask = oldCap - 1; + for (int k = oldCap; k > 0; --k, --s) { + ForkJoinTask x; // poll old, push to new + if ((x = getAndClearSlot(oldArray, s & oldMask)) == null) + break; // others already taken + newArray[s & newMask] = x; } + VarHandle.releaseFence(); // fill before publish + array = newArray; } - return null; } + // Variants of pop + /** - * Takes next task, if one exists, in order specified by mode. + * Pops and returns task, or null if empty. Called only by owner. */ - final ForkJoinTask nextLocalTask() { + private ForkJoinTask pop() { ForkJoinTask t = null; - int md = id, b, s, d, cap; ForkJoinTask[] a; - if ((a = array) != null && (cap = a.length) > 0 && - (d = (s = top) - (b = base)) > 0) { - if ((md & FIFO) == 0 || d == 1) { - if ((t = (ForkJoinTask) - QA.getAndSet(a, (cap - 1) & --s, null)) != null) - TOP.setOpaque(this, s); - } - else if ((t = (ForkJoinTask) - QA.getAndSet(a, (cap - 1) & b++, null)) != null) { - BASE.setOpaque(this, b); - } - else // on contention in FIFO mode, use regular poll - t = poll(); - } + int s = top, cap; ForkJoinTask[] a; + if ((a = array) != null && (cap = a.length) > 0 && base != s-- && + (t = getAndClearSlot(a, (cap - 1) & s)) != null) + top = s; return t; } /** - * Returns next task, if one exists, in order specified by mode. + * Pops the given task for owner only if it is at the current top. */ - final ForkJoinTask peek() { - int cap; ForkJoinTask[] a; - return ((a = array) != null && (cap = a.length) > 0) ? - a[(cap - 1) & ((id & FIFO) != 0 ? base : top - 1)] : null; + final boolean tryUnpush(ForkJoinTask task) { + int s = top, cap; ForkJoinTask[] a; + if ((a = array) != null && (cap = a.length) > 0 && base != s-- && + casSlotToNull(a, (cap - 1) & s, task)) { + top = s; + return true; + } + return false; } /** - * Pops the given task only if it is at the current top. + * Locking version of tryUnpush. */ - final boolean tryUnpush(ForkJoinTask task) { - boolean popped = false; - int s, cap; ForkJoinTask[] a; + final boolean externalTryUnpush(ForkJoinTask task) { + boolean taken = false; + int s = top, cap, k; ForkJoinTask[] a; if ((a = array) != null && (cap = a.length) > 0 && - (s = top) != base && - (popped = QA.compareAndSet(a, (cap - 1) & --s, task, null))) - TOP.setOpaque(this, s); - return popped; + a[k = (cap - 1) & (s - 1)] == task && tryLock()) { + if (top == s && array == a && + (taken = casSlotToNull(a, k, task))) + top = s - 1; + source = 0; // release lock + } + return taken; } /** - * Shared version of tryUnpush. - */ - final boolean tryLockedUnpush(ForkJoinTask task) { - boolean popped = false; - int s = top - 1, k, cap; ForkJoinTask[] a; - if ((a = array) != null && (cap = a.length) > 0 && - a[k = (cap - 1) & s] == task && tryLockPhase()) { - if (top == s + 1 && array == a && - (popped = QA.compareAndSet(a, k, task, null))) - top = s; - releasePhaseLock(); + * Deep form of tryUnpush: Traverses from top and removes task if + * present, shifting others to fill gap. + */ + final boolean tryRemove(ForkJoinTask task, boolean owned) { + boolean taken = false; + int p = top, cap; ForkJoinTask[] a; ForkJoinTask t; + if ((a = array) != null && task != null && (cap = a.length) > 0) { + int m = cap - 1, s = p - 1, d = p - base; + for (int i = s, k; d > 0; --i, --d) { + if ((t = a[k = i & m]) == task) { + if (owned || tryLock()) { + if ((owned || (array == a && top == p)) && + (taken = casSlotToNull(a, k, t))) { + for (int j = i; j != s; ) // shift down + a[j & m] = getAndClearSlot(a, ++j & m); + top = s; + } + if (!owned) + source = 0; + } + break; + } + } } - return popped; + return taken; } + // variants of poll + /** - * Removes and cancels all known tasks, ignoring any exceptions. + * Tries once to poll next task in FIFO order, failing on + * inconsistency or contention. */ - final void cancelAll() { - for (ForkJoinTask t; (t = poll()) != null; ) - ForkJoinTask.cancelIgnoringExceptions(t); + final ForkJoinTask tryPoll() { + int cap, b, k; ForkJoinTask[] a; + if ((a = array) != null && (cap = a.length) > 0) { + ForkJoinTask t = getSlot(a, k = (cap - 1) & (b = base)); + if (base == b++ && t != null && casSlotToNull(a, k, t)) { + setBaseOpaque(b); + return t; + } + } + return null; } - // Specialized execution methods - /** - * Runs the given (stolen) task if nonnull, as well as - * remaining local tasks and others available from the given - * queue, up to bound n (to avoid infinite unfairness). + * Takes next task, if one exists, in order specified by mode. */ - final void topLevelExec(ForkJoinTask t, WorkQueue q, int n) { - int nstolen = 1; - for (int j = 0;;) { - if (t != null) - t.doExec(); - if (j++ <= n) - t = nextLocalTask(); - else { - j = 0; - t = null; - } - if (t == null) { - if (q != null && (t = q.poll()) != null) { - ++nstolen; - j = 0; + final ForkJoinTask nextLocalTask(int cfg) { + ForkJoinTask t = null; + int s = top, cap; ForkJoinTask[] a; + if ((a = array) != null && (cap = a.length) > 0) { + for (int b, d;;) { + if ((d = s - (b = base)) <= 0) + break; + if (d == 1 || (cfg & FIFO) == 0) { + if ((t = getAndClearSlot(a, --s & (cap - 1))) != null) + top = s; + break; } - else if (j != 0) + if ((t = getAndClearSlot(a, b++ & (cap - 1))) != null) { + setBaseOpaque(b); break; + } } } - ForkJoinWorkerThread thread = owner; - nsteals += nstolen; - source = 0; - if (thread != null) - thread.afterTopLevelExec(); + return t; } /** - * If present, removes task from queue and executes it. + * Takes next task, if one exists, using configured mode. */ - final void tryRemoveAndExec(ForkJoinTask task) { - ForkJoinTask[] a; int s, cap; - if ((a = array) != null && (cap = a.length) > 0 && - (s = top) - base > 0) { // traverse from top - for (int m = cap - 1, ns = s - 1, i = ns; ; --i) { - int index = i & m; - ForkJoinTask t = (ForkJoinTask)QA.get(a, index); - if (t == null) - break; - else if (t == task) { - if (QA.compareAndSet(a, index, t, null)) { - top = ns; // safely shift down - for (int j = i; j != ns; ++j) { - ForkJoinTask f; - int pindex = (j + 1) & m; - f = (ForkJoinTask)QA.get(a, pindex); - QA.setVolatile(a, pindex, null); - int jindex = j & m; - QA.setRelease(a, jindex, f); - } - VarHandle.releaseFence(); - t.doExec(); - } - break; - } - } + final ForkJoinTask nextLocalTask() { + return nextLocalTask(config); + } + + /** + * Returns next task, if one exists, in order specified by mode. + */ + final ForkJoinTask peek() { + VarHandle.acquireFence(); + int cap; ForkJoinTask[] a; + return ((a = array) != null && (cap = a.length) > 0) ? + a[(cap - 1) & ((config & FIFO) != 0 ? base : top - 1)] : null; + } + + // specialized execution methods + + /** + * Runs the given (stolen) task if nonnull, as well as + * remaining local tasks and/or others available from the + * given queue. + */ + final void topLevelExec(ForkJoinTask task, WorkQueue q) { + int cfg = config, nstolen = 1; + while (task != null) { + task.doExec(); + if ((task = nextLocalTask(cfg)) == null && + q != null && (task = q.tryPoll()) != null) + ++nstolen; } + nsteals += nstolen; + source = 0; + if ((cfg & INNOCUOUS) != 0) + ThreadLocalRandom.eraseThreadLocals(Thread.currentThread()); } /** @@ -1073,84 +1170,85 @@ * until done, not found, or limit exceeded. * * @param task root of CountedCompleter computation + * @param owned true if owned by a ForkJoinWorkerThread * @param limit max runs, or zero for no limit - * @param shared true if must lock to extract task * @return task status on exit */ - final int helpCC(CountedCompleter task, int limit, boolean shared) { - int status = 0; - if (task != null && (status = task.status) >= 0) { - int s, k, cap; ForkJoinTask[] a; - while ((a = array) != null && (cap = a.length) > 0 && - (s = top) - base > 0) { - CountedCompleter v = null; - ForkJoinTask o = a[k = (cap - 1) & (s - 1)]; - if (o instanceof CountedCompleter) { - CountedCompleter t = (CountedCompleter)o; - for (CountedCompleter f = t;;) { - if (f != task) { - if ((f = f.completer) == null) - break; - } - else if (shared) { - if (tryLockPhase()) { - if (top == s && array == a && - QA.compareAndSet(a, k, t, null)) { - top = s - 1; - v = t; - } - releasePhaseLock(); - } - break; - } - else { - if (QA.compareAndSet(a, k, t, null)) { - top = s - 1; - v = t; - } - break; - } + final int helpComplete(ForkJoinTask task, boolean owned, int limit) { + int status = 0, cap, k, p, s; ForkJoinTask[] a; ForkJoinTask t; + while (task != null && (status = task.status) >= 0 && + (a = array) != null && (cap = a.length) > 0 && + (t = a[k = (cap - 1) & (s = (p = top) - 1)]) + instanceof CountedCompleter) { + CountedCompleter f = (CountedCompleter)t; + boolean taken = false; + for (;;) { // exec if root task is a completer of t + if (f == task) { + if (owned) { + if ((taken = casSlotToNull(a, k, t))) + top = s; } + else if (tryLock()) { + if (top == p && array == a && + (taken = casSlotToNull(a, k, t))) + top = s; + source = 0; + } + break; } - if (v != null) - v.doExec(); - if ((status = task.status) < 0 || v == null || - (limit != 0 && --limit == 0)) + else if ((f = f.completer) == null) break; } + if (!taken) + break; + t.doExec(); + if (limit != 0 && --limit == 0) + break; } return status; } /** * Tries to poll and run AsynchronousCompletionTasks until - * none found or blocker is released + * none found or blocker is released. * * @param blocker the blocker */ final void helpAsyncBlocker(ManagedBlocker blocker) { - if (blocker != null) { - int b, k, cap; ForkJoinTask[] a; ForkJoinTask t; - while ((a = array) != null && (cap = a.length) > 0 && - top - (b = base) > 0) { - t = (ForkJoinTask)QA.getAcquire(a, k = (cap - 1) & b); - if (blocker.isReleasable()) - break; - else if (base == b++ && t != null) { - if (!(t instanceof CompletableFuture. - AsynchronousCompletionTask)) - break; - else if (QA.compareAndSet(a, k, t, null)) { - BASE.setOpaque(this, b); - t.doExec(); - } - } + int cap, b, d, k; ForkJoinTask[] a; ForkJoinTask t; + while (blocker != null && (d = top - (b = base)) > 0 && + (a = array) != null && (cap = a.length) > 0 && + (((t = getSlot(a, k = (cap - 1) & b)) == null && d > 1) || + t instanceof + CompletableFuture.AsynchronousCompletionTask) && + !blocker.isReleasable()) { + if (t != null && base == b++ && casSlotToNull(a, k, t)) { + setBaseOpaque(b); + t.doExec(); } } } + // misc + + /** AccessControlContext for innocuous workers, created on 1st use. */ + private static AccessControlContext INNOCUOUS_ACC; + + /** + * Initializes (upon registration) InnocuousForkJoinWorkerThreads. + */ + final void initializeInnocuousWorker() { + AccessControlContext acc; // racy construction OK + if ((acc = INNOCUOUS_ACC) == null) + INNOCUOUS_ACC = acc = new AccessControlContext( + new ProtectionDomain[] { new ProtectionDomain(null, null) }); + Thread t = Thread.currentThread(); + ThreadLocalRandom.setInheritedAccessControlContext(t, acc); + ThreadLocalRandom.eraseThreadLocals(t); + } + /** - * Returns true if owned and not known to be blocked. + * Returns true if owned by a worker thread and not known to be blocked. */ final boolean isApparentlyUnblocked() { Thread wt; Thread.State s; @@ -1160,16 +1258,12 @@ s != Thread.State.TIMED_WAITING); } - // VarHandle mechanics. - static final VarHandle PHASE; - static final VarHandle BASE; - static final VarHandle TOP; static { try { + QA = MethodHandles.arrayElementVarHandle(ForkJoinTask[].class); MethodHandles.Lookup l = MethodHandles.lookup(); - PHASE = l.findVarHandle(WorkQueue.class, "phase", int.class); + SOURCE = l.findVarHandle(WorkQueue.class, "source", int.class); BASE = l.findVarHandle(WorkQueue.class, "base", int.class); - TOP = l.findVarHandle(WorkQueue.class, "top", int.class); } catch (ReflectiveOperationException e) { throw new ExceptionInInitializerError(e); } @@ -1213,17 +1307,9 @@ private static final int COMMON_MAX_SPARES; /** - * Sequence number for creating workerNamePrefix. + * Sequence number for creating worker names */ - private static int poolNumberSequence; - - /** - * Returns the next sequence number. We don't expect this to - * ever contend, so use simple builtin sync. - */ - private static final synchronized int nextPoolId() { - return ++poolNumberSequence; - } + private static volatile int poolIds; // static configuration constants @@ -1248,12 +1334,6 @@ */ private static final int DEFAULT_COMMON_MAX_SPARES = 256; - /** - * Increment for seed generators. See class ThreadLocal for - * explanation. - */ - private static final int SEED_INCREMENT = 0x9e3779b9; - /* * Bits and masks for field ctl, packed with 4 16 bit subfields: * RC: Number of released (unqueued) workers minus target parallelism @@ -1271,10 +1351,10 @@ * deal with possibly negative fields, we use casts in and out of * "short" and/or signed shifts to maintain signedness. * - * Because it occupies uppermost bits, we can add one release count - * using getAndAddLong of RC_UNIT, rather than CAS, when returning - * from a blocked join. Other updates entail multiple subfields - * and masking, requiring CAS. + * Because it occupies uppermost bits, we can add one release + * count using getAndAdd of RC_UNIT, rather than CAS, when + * returning from a blocked join. Other updates entail multiple + * subfields and masking, requiring CAS. * * The limits packed in field "bounds" are also offset by the * parallelism level to make them comparable to the ctl rc and tc @@ -1298,13 +1378,16 @@ // Instance fields - volatile long stealCount; // collects worker nsteals final long keepAlive; // milliseconds before dropping if idle - int indexSeed; // next worker index + volatile long stealCount; // collects worker nsteals + int scanRover; // advances across pollScan calls + volatile int threadIds; // for worker thread names final int bounds; // min, max threads packed as shorts volatile int mode; // parallelism, runstate, queue mode - WorkQueue[] workQueues; // main registry - final String workerNamePrefix; // for worker thread string; sync lock + WorkQueue[] queues; // main registry + final ReentrantLock registrationLock; + Condition termination; // lazily constructed + final String workerNamePrefix; // null for common pool final ForkJoinWorkerThreadFactory factory; final UncaughtExceptionHandler ueh; // per-worker UEH final Predicate saturate; @@ -1312,6 +1395,30 @@ @jdk.internal.vm.annotation.Contended("fjpctl") // segregate volatile long ctl; // main pool control + // Support for atomic operations + private static final VarHandle CTL; + private static final VarHandle MODE; + private static final VarHandle THREADIDS; + private static final VarHandle POOLIDS; + private boolean compareAndSetCtl(long c, long v) { + return CTL.compareAndSet(this, c, v); + } + private long compareAndExchangeCtl(long c, long v) { + return (long)CTL.compareAndExchange(this, c, v); + } + private long getAndAddCtl(long v) { + return (long)CTL.getAndAdd(this, v); + } + private int getAndBitwiseOrMode(int v) { + return (int)MODE.getAndBitwiseOr(this, v); + } + private int getAndAddThreadIds(int x) { + return (int)THREADIDS.getAndAdd(this, x); + } + private static int getAndAddPoolIds(int x) { + return (int)POOLIDS.getAndAdd(x); + } + // Creating, registering and deregistering workers /** @@ -1338,83 +1445,63 @@ } /** - * Tries to add one worker, incrementing ctl counts before doing - * so, relying on createWorker to back out on failure. - * - * @param c incoming ctl value, with total count negative and no - * idle workers. On CAS failure, c is refreshed and retried if - * this holds (otherwise, a new worker is not needed). + * Provides a name for ForkJoinWorkerThread constructor. */ - private void tryAddWorker(long c) { - do { - long nc = ((RC_MASK & (c + RC_UNIT)) | - (TC_MASK & (c + TC_UNIT))); - if (ctl == c && CTL.compareAndSet(this, c, nc)) { - createWorker(); - break; - } - } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0); + final String nextWorkerThreadName() { + String prefix = workerNamePrefix; + int tid = getAndAddThreadIds(1) + 1; + if (prefix == null) // commonPool has no prefix + prefix = "ForkJoinPool.commonPool-worker-"; + return prefix.concat(Integer.toString(tid)); } /** - * Callback from ForkJoinWorkerThread constructor to establish and - * record its WorkQueue. - * - * @param wt the worker thread - * @return the worker's queue - */ - final WorkQueue registerWorker(ForkJoinWorkerThread wt) { - UncaughtExceptionHandler handler; - wt.setDaemon(true); // configure thread - if ((handler = ueh) != null) - wt.setUncaughtExceptionHandler(handler); - int tid = 0; // for thread name - int idbits = mode & FIFO; - String prefix = workerNamePrefix; - WorkQueue w = new WorkQueue(this, wt); - if (prefix != null) { - synchronized (prefix) { - WorkQueue[] ws = workQueues; int n; - int s = indexSeed += SEED_INCREMENT; - idbits |= (s & ~(SMASK | FIFO | DORMANT)); - if (ws != null && (n = ws.length) > 1) { - int m = n - 1; - tid = m & ((s << 1) | 1); // odd-numbered indices - for (int probes = n >>> 1;;) { // find empty slot - WorkQueue q; - if ((q = ws[tid]) == null || q.phase == QUIET) - break; - else if (--probes == 0) { - tid = n | 1; // resize below - break; - } - else - tid = (tid + 2) & m; - } - w.phase = w.id = tid | idbits; // now publishable + * Finishes initializing and records owned queue. + * + * @param w caller's WorkQueue + */ + final void registerWorker(WorkQueue w) { + ReentrantLock lock = registrationLock; + ThreadLocalRandom.localInit(); + int seed = ThreadLocalRandom.getProbe(); + if (w != null && lock != null) { + int modebits = (mode & FIFO) | w.config; + w.array = new ForkJoinTask[INITIAL_QUEUE_CAPACITY]; + w.stackPred = seed; // stash for runWorker + if ((modebits & INNOCUOUS) != 0) + w.initializeInnocuousWorker(); + int id = (seed << 1) | 1; // initial index guess + lock.lock(); + try { + WorkQueue[] qs; int n; // find queue index + if ((qs = queues) != null && (n = qs.length) > 0) { + int k = n, m = n - 1; + for (; qs[id &= m] != null && k > 0; id -= 2, k -= 2); + if (k == 0) + id = n | 1; // resize below + w.phase = w.config = id | modebits; // now publishable - if (tid < n) - ws[tid] = w; + if (id < n) + qs[id] = w; else { // expand array - int an = n << 1; + int an = n << 1, am = an - 1; WorkQueue[] as = new WorkQueue[an]; - as[tid] = w; - int am = an - 1; - for (int j = 0; j < n; ++j) { - WorkQueue v; // copy external queue - if ((v = ws[j]) != null) // position may change - as[v.id & am & SQMASK] = v; - if (++j >= n) - break; - as[j] = ws[j]; // copy worker + as[id & am] = w; + for (int j = 1; j < n; j += 2) + as[j] = qs[j]; + for (int j = 0; j < n; j += 2) { + WorkQueue q; + if ((q = qs[j]) != null) // shared queues may move + as[q.config & am] = q; } - workQueues = as; + VarHandle.releaseFence(); // fill before publish + queues = as; } } + } finally { + lock.unlock(); } - wt.setName(prefix.concat(Integer.toString(tid))); } - return w; } /** @@ -1427,319 +1514,431 @@ * @param ex the exception causing failure, or null if none */ final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) { + ReentrantLock lock = registrationLock; WorkQueue w = null; - int phase = 0; - if (wt != null && (w = wt.workQueue) != null) { - Object lock = workerNamePrefix; - int wid = w.id; - long ns = (long)w.nsteals & 0xffffffffL; - if (lock != null) { - synchronized (lock) { - WorkQueue[] ws; int n, i; // remove index from array - if ((ws = workQueues) != null && (n = ws.length) > 0 && - ws[i = wid & (n - 1)] == w) - ws[i] = null; - stealCount += ns; - } - } - phase = w.phase; - } - if (phase != QUIET) { // else pre-adjusted - long c; // decrement counts - do {} while (!CTL.weakCompareAndSet - (this, c = ctl, ((RC_MASK & (c - RC_UNIT)) | - (TC_MASK & (c - TC_UNIT)) | - (SP_MASK & c)))); + int cfg = 0; + if (wt != null && (w = wt.workQueue) != null && lock != null) { + WorkQueue[] qs; int n, i; + cfg = w.config; + long ns = w.nsteals & 0xffffffffL; + lock.lock(); // remove index from array + if ((qs = queues) != null && (n = qs.length) > 0 && + qs[i = cfg & (n - 1)] == w) + qs[i] = null; + stealCount += ns; // accumulate steals + lock.unlock(); + long c = ctl; + if ((cfg & QUIET) == 0) // unless self-signalled, decrement counts + do {} while (c != (c = compareAndExchangeCtl( + c, ((RC_MASK & (c - RC_UNIT)) | + (TC_MASK & (c - TC_UNIT)) | + (SP_MASK & c))))); + else if ((int)c == 0) // was dropped on timeout + cfg = 0; // suppress signal if last + for (ForkJoinTask t; (t = w.pop()) != null; ) + ForkJoinTask.cancelIgnoringExceptions(t); // cancel tasks } - if (w != null) - w.cancelAll(); // cancel remaining tasks - if (!tryTerminate(false, false) && // possibly replace worker - w != null && w.array != null) // avoid repeated failures - signalWork(null); - - if (ex == null) // help clean on way out - ForkJoinTask.helpExpungeStaleExceptions(); - else // rethrow + if (!tryTerminate(false, false) && w != null && (cfg & SRC) != 0) + signalWork(); // possibly replace worker + if (ex != null) ForkJoinTask.rethrow(ex); } - /** + /* * Tries to create or release a worker if too few are running. - * @param q if non-null recheck if empty on CAS failure */ - final void signalWork(WorkQueue q) { - for (;;) { - long c; int sp; WorkQueue[] ws; int i; WorkQueue v; - if ((c = ctl) >= 0L) // enough workers - break; - else if ((sp = (int)c) == 0) { // no idle workers - if ((c & ADD_WORKER) != 0L) // too few workers - tryAddWorker(c); - break; + final void signalWork() { + for (long c = ctl; c < 0L;) { + int sp, i; WorkQueue[] qs; WorkQueue v; + if ((sp = (int)c & ~UNSIGNALLED) == 0) { // no idle workers + if ((c & ADD_WORKER) == 0L) // enough total workers + break; + if (c == (c = compareAndExchangeCtl( + c, ((RC_MASK & (c + RC_UNIT)) | + (TC_MASK & (c + TC_UNIT)))))) { + createWorker(); + break; + } } - else if ((ws = workQueues) == null) + else if ((qs = queues) == null) break; // unstarted/terminated - else if (ws.length <= (i = sp & SMASK)) + else if (qs.length <= (i = sp & SMASK)) break; // terminated - else if ((v = ws[i]) == null) + else if ((v = qs[i]) == null) break; // terminating else { - int np = sp & ~UNSIGNALLED; - int vp = v.phase; long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + RC_UNIT)); Thread vt = v.owner; - if (sp == vp && CTL.compareAndSet(this, c, nc)) { - v.phase = np; - if (vt != null && v.source < 0) - LockSupport.unpark(vt); + if (c == (c = compareAndExchangeCtl(c, nc))) { + v.phase = sp; + LockSupport.unpark(vt); // release idle worker break; } - else if (q != null && q.isEmpty()) // no need to retry - break; } } } /** - * Tries to decrement counts (sometimes implicitly) and possibly - * arrange for a compensating worker in preparation for blocking: - * If not all core workers yet exist, creates one, else if any are - * unreleased (possibly including caller) releases one, else if - * fewer than the minimum allowed number of workers running, - * checks to see that they are all active, and if so creates an - * extra worker unless over maximum limit and policy is to - * saturate. Most of these steps can fail due to interference, in - * which case 0 is returned so caller will retry. A negative - * return value indicates that the caller doesn't need to - * re-adjust counts when later unblocked. + * Top-level runloop for workers, called by ForkJoinWorkerThread.run. + * See above for explanation. * - * @return 1: block then adjust, -1: block without adjust, 0 : retry + * @param w caller's WorkQueue (may be null on failed initialization) */ - private int tryCompensate(WorkQueue w) { - int t, n, sp; - long c = ctl; - WorkQueue[] ws = workQueues; - if ((t = (short)(c >>> TC_SHIFT)) >= 0) { - if (ws == null || (n = ws.length) <= 0 || w == null) - return 0; // disabled - else if ((sp = (int)c) != 0) { // replace or release - WorkQueue v = ws[sp & (n - 1)]; - int wp = w.phase; - long uc = UC_MASK & ((wp < 0) ? c + RC_UNIT : c); - int np = sp & ~UNSIGNALLED; - if (v != null) { - int vp = v.phase; - Thread vt = v.owner; - long nc = ((long)v.stackPred & SP_MASK) | uc; - if (vp == sp && CTL.compareAndSet(this, c, nc)) { - v.phase = np; - if (vt != null && v.source < 0) - LockSupport.unpark(vt); - return (wp < 0) ? -1 : 1; - } - } - return 0; - } - else if ((int)(c >> RC_SHIFT) - // reduce parallelism - (short)(bounds & SMASK) > 0) { - long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c)); - return CTL.compareAndSet(this, c, nc) ? 1 : 0; - } - else { // validate - int md = mode, pc = md & SMASK, tc = pc + t, bc = 0; - boolean unstable = false; - for (int i = 1; i < n; i += 2) { - WorkQueue q; Thread wt; Thread.State ts; - if ((q = ws[i]) != null) { - if (q.source == 0) { - unstable = true; - break; - } - else { - --tc; - if ((wt = q.owner) != null && - ((ts = wt.getState()) == Thread.State.BLOCKED || - ts == Thread.State.WAITING)) - ++bc; // worker is blocking - } - } + final void runWorker(WorkQueue w) { + if (w != null) { // skip on failed init + w.config |= SRC; // mark as valid source + int r = w.stackPred, src = 0; // use seed from registerWorker + do { + r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift + } while ((src = scan(w, src, r)) >= 0 || + (src = awaitWork(w)) == 0); + } + } + + /** + * Scans for and if found executes top-level tasks: Tries to poll + * each queue starting at a random index with random stride, + * returning source id or retry indicator if contended or + * inconsistent. + * + * @param w caller's WorkQueue + * @param prevSrc the previous queue stolen from in current phase, or 0 + * @param r random seed + * @return id of queue if taken, negative if none found, prevSrc for retry + */ + private int scan(WorkQueue w, int prevSrc, int r) { + WorkQueue[] qs = queues; + int n = (w == null || qs == null) ? 0 : qs.length; + for (int step = (r >>> 16) | 1, i = n; i > 0; --i, r += step) { + int j, cap, b; WorkQueue q; ForkJoinTask[] a; + if ((q = qs[j = r & (n - 1)]) != null && // poll at qs[j].array[k] + (a = q.array) != null && (cap = a.length) > 0) { + int k = (cap - 1) & (b = q.base), nextBase = b + 1; + int nextIndex = (cap - 1) & nextBase, src = j | SRC; + ForkJoinTask t = WorkQueue.getSlot(a, k); + if (q.base != b) // inconsistent + return prevSrc; + else if (t != null && WorkQueue.casSlotToNull(a, k, t)) { + q.base = nextBase; + ForkJoinTask next = a[nextIndex]; + if ((w.source = src) != prevSrc && next != null) + signalWork(); // propagate + w.topLevelExec(t, q); + return src; + } + else if (a[nextIndex] != null) // revisit + return prevSrc; + } + } + return (queues != qs) ? prevSrc: -1; // possibly resized + } + + /** + * Advances worker phase, pushes onto ctl stack, and awaits signal + * or reports termination. + * + * @return negative if terminated, else 0 + */ + private int awaitWork(WorkQueue w) { + if (w == null) + return -1; // already terminated + int phase = (w.phase + SS_SEQ) & ~UNSIGNALLED; + w.phase = phase | UNSIGNALLED; // advance phase + long prevCtl = ctl, c; // enqueue + do { + w.stackPred = (int)prevCtl; + c = ((prevCtl - RC_UNIT) & UC_MASK) | (phase & SP_MASK); + } while (prevCtl != (prevCtl = compareAndExchangeCtl(prevCtl, c))); + + Thread.interrupted(); // clear status + LockSupport.setCurrentBlocker(this); // prepare to block (exit also OK) + long deadline = 0L; // nonzero if possibly quiescent + int ac = (int)(c >> RC_SHIFT), md; + if ((md = mode) < 0) // pool is terminating + return -1; + else if ((md & SMASK) + ac <= 0) { + boolean checkTermination = (md & SHUTDOWN) != 0; + if ((deadline = System.currentTimeMillis() + keepAlive) == 0L) + deadline = 1L; // avoid zero + WorkQueue[] qs = queues; // check for racing submission + int n = (qs == null) ? 0 : qs.length; + for (int i = 0; i < n; i += 2) { + WorkQueue q; ForkJoinTask[] a; int cap, b; + if (ctl != c) { // already signalled + checkTermination = false; + break; } - if (unstable || tc != 0 || ctl != c) - return 0; // inconsistent - else if (t + pc >= MAX_CAP || t >= (bounds >>> SWIDTH)) { - Predicate sat; - if ((sat = saturate) != null && sat.test(this)) - return -1; - else if (bc < pc) { // lagging - Thread.yield(); // for retry spins - return 0; - } - else - throw new RejectedExecutionException( - "Thread limit exceeded replacing blocked worker"); + else if ((q = qs[i]) != null && + (a = q.array) != null && (cap = a.length) > 0 && + ((b = q.base) != q.top || a[(cap - 1) & b] != null || + q.source != 0)) { + if (compareAndSetCtl(c, prevCtl)) + w.phase = phase; // self-signal + checkTermination = false; + break; } } + if (checkTermination && tryTerminate(false, false)) + return -1; // trigger quiescent termination } - long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); // expand pool - return CTL.compareAndSet(this, c, nc) && createWorker() ? 1 : 0; + for (boolean alt = false;;) { // await activation or termination + if (w.phase >= 0) + break; + else if (mode < 0) + return -1; + else if ((c = ctl) == prevCtl) + Thread.onSpinWait(); // signal in progress + else if (!(alt = !alt)) // check between park calls + Thread.interrupted(); + else if (deadline == 0L) + LockSupport.park(); + else if (deadline - System.currentTimeMillis() > TIMEOUT_SLOP) + LockSupport.parkUntil(deadline); + else if (((int)c & SMASK) == (w.config & SMASK) && + compareAndSetCtl(c, ((UC_MASK & (c - TC_UNIT)) | + (prevCtl & SP_MASK)))) { + w.config |= QUIET; // sentinel for deregisterWorker + return -1; // drop on timeout + } + else if ((deadline += keepAlive) == 0L) + deadline = 1L; // not at head; restart timer + } + return 0; } + // Utilities used by ForkJoinTask + /** - * Top-level runloop for workers, called by ForkJoinWorkerThread.run. - * See above for explanation. + * Returns true if all workers are busy, possibly creating one if allowed */ - final void runWorker(WorkQueue w) { - int r = (w.id ^ ThreadLocalRandom.nextSecondarySeed()) | FIFO; // rng - w.array = new ForkJoinTask[INITIAL_QUEUE_CAPACITY]; // initialize - for (;;) { - int phase; - if (scan(w, r)) { // scan until apparently empty - r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // move (xorshift) - } - else if ((phase = w.phase) >= 0) { // enqueue, then rescan - long np = (w.phase = (phase + SS_SEQ) | UNSIGNALLED) & SP_MASK; - long c, nc; - do { - w.stackPred = (int)(c = ctl); - nc = ((c - RC_UNIT) & UC_MASK) | np; - } while (!CTL.weakCompareAndSet(this, c, nc)); - } - else { // already queued - int pred = w.stackPred; - Thread.interrupted(); // clear before park - w.source = DORMANT; // enable signal - long c = ctl; - int md = mode, rc = (md & SMASK) + (int)(c >> RC_SHIFT); - if (md < 0) // terminating - break; - else if (rc <= 0 && (md & SHUTDOWN) != 0 && - tryTerminate(false, false)) - break; // quiescent shutdown - else if (w.phase < 0) { - if (rc <= 0 && pred != 0 && phase == (int)c) { - long nc = (UC_MASK & (c - TC_UNIT)) | (SP_MASK & pred); - long d = keepAlive + System.currentTimeMillis(); - LockSupport.parkUntil(this, d); - if (ctl == c && // drop on timeout if all idle - d - System.currentTimeMillis() <= TIMEOUT_SLOP && - CTL.compareAndSet(this, c, nc)) { - w.phase = QUIET; - break; - } - } - else { - LockSupport.park(this); - if (w.phase < 0) // one spurious wakeup check - LockSupport.park(this); + final boolean isSaturated() { + int maxTotal = bounds >>> SWIDTH; + for (long c;;) { + if (((int)(c = ctl) & ~UNSIGNALLED) != 0) + return false; + if ((short)(c >>> TC_SHIFT) >= maxTotal) + return true; + long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); + if (compareAndSetCtl(c, nc)) + return !createWorker(); + } + } + + /** + * Returns true if can start terminating if enabled, or already terminated + */ + final boolean canStop() { + outer: for (long oldSum = 0L;;) { // repeat until stable + int md; WorkQueue[] qs; long c; + if ((qs = queues) == null || ((md = mode) & STOP) != 0) + return true; + if ((md & SMASK) + (int)((c = ctl) >> RC_SHIFT) > 0) + break; + long checkSum = c; + for (int i = 1; i < qs.length; i += 2) { // scan submitters + WorkQueue q; ForkJoinTask[] a; int s = 0, cap; + if ((q = qs[i]) != null && (a = q.array) != null && + (cap = a.length) > 0 && + ((s = q.top) != q.base || a[(cap - 1) & s] != null || + q.source != 0)) + break outer; + checkSum += (((long)i) << 32) ^ s; + } + if (oldSum == (oldSum = checkSum) && queues == qs) + return true; + } + return (mode & STOP) != 0; // recheck mode on false return + } + + /** + * Tries to decrement counts (sometimes implicitly) and possibly + * arrange for a compensating worker in preparation for + * blocking. May fail due to interference, in which case -1 is + * returned so caller may retry. A zero return value indicates + * that the caller doesn't need to re-adjust counts when later + * unblocked. + * + * @param c incoming ctl value + * @return UNCOMPENSATE: block then adjust, 0: block, -1 : retry + */ + private int tryCompensate(long c) { + Predicate sat; + int b = bounds; // counts are signed; centered at parallelism level == 0 + int minActive = (short)(b & SMASK), + maxTotal = b >>> SWIDTH, + active = (int)(c >> RC_SHIFT), + total = (short)(c >>> TC_SHIFT), + sp = (int)c & ~UNSIGNALLED; + if (total >= 0) { + if (sp != 0) { // activate idle worker + WorkQueue[] qs; int n; WorkQueue v; + if ((qs = queues) != null && (n = qs.length) > 0 && + (v = qs[sp & (n - 1)]) != null) { + Thread vt = v.owner; + long nc = ((long)v.stackPred & SP_MASK) | (UC_MASK & c); + if (compareAndSetCtl(c, nc)) { + v.phase = sp; + LockSupport.unpark(vt); + return UNCOMPENSATE; } } - w.source = 0; // disable signal + return -1; // retry + } + else if (active > minActive) { // reduce parallelism + long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c)); + return compareAndSetCtl(c, nc) ? UNCOMPENSATE : -1; } } + if (total < maxTotal) { // expand pool + long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); + return (!compareAndSetCtl(c, nc) ? -1 : + !createWorker() ? 0 : UNCOMPENSATE); + } + else if (!compareAndSetCtl(c, c)) // validate + return -1; + else if ((sat = saturate) != null && sat.test(this)) + return 0; + else + throw new RejectedExecutionException( + "Thread limit exceeded replacing blocked worker"); } /** - * Scans for and if found executes one or more top-level tasks from a queue. + * Readjusts RC count; called from ForkJoinTask after blocking. + */ + final void uncompensate() { + getAndAddCtl(RC_UNIT); + } + + /** + * Helps if possible until the given task is done. Scans other + * queues for a task produced by one of w's stealers; returning + * compensated blocking sentinel if none are found. * - * @return true if found an apparently non-empty queue, and - * possibly ran task(s). - */ - private boolean scan(WorkQueue w, int r) { - WorkQueue[] ws; int n; - if ((ws = workQueues) != null && (n = ws.length) > 0 && w != null) { - for (int m = n - 1, j = r & m;;) { - WorkQueue q; int b; - if ((q = ws[j]) != null && q.top != (b = q.base)) { - int qid = q.id; - ForkJoinTask[] a; int cap, k; ForkJoinTask t; - if ((a = q.array) != null && (cap = a.length) > 0) { - t = (ForkJoinTask)QA.getAcquire(a, k = (cap - 1) & b); - if (q.base == b++ && t != null && - QA.compareAndSet(a, k, t, null)) { - q.base = b; - w.source = qid; - if (a[(cap - 1) & b] != null) - signalWork(q); // help signal if more tasks - w.topLevelExec(t, q, // random fairness bound - (r | (1 << TOP_BOUND_SHIFT)) & SMASK); + * @param task the task + * @param w caller's WorkQueue + * @return task status on exit, or UNCOMPENSATE for compensated blocking + */ + final int helpJoin(ForkJoinTask task, WorkQueue w) { + int s = 0; + if (task != null && w != null) { + int wsrc = w.source, wid = w.config & SMASK, r = wid + 2; + boolean scan = true; + long c = 0L; // track ctl stability + outer: for (;;) { + if ((s = task.status) < 0) + break; + else if (scan = !scan) { // previous scan was empty + if (mode < 0) + ForkJoinTask.cancelIgnoringExceptions(task); + else if (c == (c = ctl) && (s = tryCompensate(c)) >= 0) + break; // block + } + else { // scan for subtasks + WorkQueue[] qs = queues; + int n = (qs == null) ? 0 : qs.length, m = n - 1; + for (int i = n; i > 0; i -= 2, r += 2) { + int j; WorkQueue q, x, y; ForkJoinTask[] a; + if ((q = qs[j = r & m]) != null) { + int sq = q.source & SMASK, cap, b; + if ((a = q.array) != null && (cap = a.length) > 0) { + int k = (cap - 1) & (b = q.base); + int nextBase = b + 1, src = j | SRC, sx; + ForkJoinTask t = WorkQueue.getSlot(a, k); + boolean eligible = sq == wid || + ((x = qs[sq & m]) != null && // indirect + ((sx = (x.source & SMASK)) == wid || + ((y = qs[sx & m]) != null && // 2-indirect + (y.source & SMASK) == wid))); + if ((s = task.status) < 0) + break outer; + else if ((q.source & SMASK) != sq || + q.base != b) + scan = true; // inconsistent + else if (t == null) + scan |= (a[nextBase & (cap - 1)] != null || + q.top != b); // lagging + else if (eligible) { + if (WorkQueue.casSlotToNull(a, k, t)) { + q.base = nextBase; + w.source = src; + t.doExec(); + w.source = wsrc; + } + scan = true; + break; + } + } } } - return true; } - else if (--n > 0) - j = (j + 1) & m; - else - break; } } - return false; + return s; } /** - * Helps and/or blocks until the given task is done or timeout. - * First tries locally helping, then scans other queues for a task - * produced by one of w's stealers; compensating and blocking if - * none are found (rescanning if tryCompensate fails). + * Extra helpJoin steps for CountedCompleters. Scans for and runs + * subtasks of the given root task, returning if none are found. * - * @param w caller - * @param task the task - * @param deadline for timed waits, if nonzero + * @param task root of CountedCompleter computation + * @param w caller's WorkQueue + * @param owned true if owned by a ForkJoinWorkerThread * @return task status on exit */ - final int awaitJoin(WorkQueue w, ForkJoinTask task, long deadline) { + final int helpComplete(ForkJoinTask task, WorkQueue w, boolean owned) { int s = 0; - int seed = ThreadLocalRandom.nextSecondarySeed(); - if (w != null && task != null && - (!(task instanceof CountedCompleter) || - (s = w.helpCC((CountedCompleter)task, 0, false)) >= 0)) { - w.tryRemoveAndExec(task); - int src = w.source, id = w.id; - int r = (seed >>> 16) | 1, step = (seed & ~1) | 2; - s = task.status; - while (s >= 0) { - WorkQueue[] ws; - int n = (ws = workQueues) == null ? 0 : ws.length, m = n - 1; - while (n > 0) { - WorkQueue q; int b; - if ((q = ws[r & m]) != null && q.source == id && - q.top != (b = q.base)) { - ForkJoinTask[] a; int cap, k; - int qid = q.id; - if ((a = q.array) != null && (cap = a.length) > 0) { - ForkJoinTask t = (ForkJoinTask) - QA.getAcquire(a, k = (cap - 1) & b); - if (q.source == id && q.base == b++ && - t != null && QA.compareAndSet(a, k, t, null)) { - q.base = b; - w.source = qid; - t.doExec(); - w.source = src; - } - } + if (task != null && w != null) { + int r = w.config; + boolean scan = true, locals = true; + long c = 0L; + outer: for (;;) { + if (locals) { // try locals before scanning + if ((s = w.helpComplete(task, owned, 0)) < 0) break; - } - else { - r += step; - --n; - } + locals = false; } - if ((s = task.status) < 0) + else if ((s = task.status) < 0) break; - else if (n == 0) { // empty scan - long ms, ns; int block; - if (deadline == 0L) - ms = 0L; // untimed - else if ((ns = deadline - System.nanoTime()) <= 0L) - break; // timeout - else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L) - ms = 1L; // avoid 0 for timed wait - if ((block = tryCompensate(w)) != 0) { - task.internalWait(ms); - CTL.getAndAdd(this, (block > 0) ? RC_UNIT : 0L); + else if (scan = !scan) { + if (c == (c = ctl)) + break; + } + else { // scan for subtasks + WorkQueue[] qs = queues; + int n = (qs == null) ? 0 : qs.length; + for (int i = n; i > 0; --i, ++r) { + int j, cap, b; WorkQueue q; ForkJoinTask[] a; + boolean eligible = false; + if ((q = qs[j = r & (n - 1)]) != null && + (a = q.array) != null && (cap = a.length) > 0) { + int k = (cap - 1) & (b = q.base), nextBase = b + 1; + ForkJoinTask t = WorkQueue.getSlot(a, k); + if (t instanceof CountedCompleter) { + CountedCompleter f = (CountedCompleter)t; + do {} while (!(eligible = (f == task)) && + (f = f.completer) != null); + } + if ((s = task.status) < 0) + break outer; + else if (q.base != b) + scan = true; // inconsistent + else if (t == null) + scan |= (a[nextBase & (cap - 1)] != null || + q.top != b); + else if (eligible) { + if (WorkQueue.casSlotToNull(a, k, t)) { + q.setBaseOpaque(nextBase); + t.doExec(); + locals = true; + } + scan = true; + break; + } + } } - s = task.status; } } } @@ -1747,112 +1946,164 @@ } /** + * Scans for and returns a polled task, if available. Used only + * for untracked polls. Begins scan at an index (scanRover) + * advanced on each call, to avoid systematic unfairness. + * + * @param submissionsOnly if true, only scan submission queues + */ + private ForkJoinTask pollScan(boolean submissionsOnly) { + VarHandle.acquireFence(); + int r = scanRover += 0x61c88647; // Weyl increment; raciness OK + if (submissionsOnly) // even indices only + r &= ~1; + int step = (submissionsOnly) ? 2 : 1; + WorkQueue[] qs; int n; + while ((qs = queues) != null && (n = qs.length) > 0) { + boolean scan = false; + for (int i = 0; i < n; i += step) { + int j, cap, b; WorkQueue q; ForkJoinTask[] a; + if ((q = qs[j = (n - 1) & (r + i)]) != null && + (a = q.array) != null && (cap = a.length) > 0) { + int k = (cap - 1) & (b = q.base), nextBase = b + 1; + ForkJoinTask t = WorkQueue.getSlot(a, k); + if (q.base != b) + scan = true; + else if (t == null) + scan |= (q.top != b || a[nextBase & (cap - 1)] != null); + else if (!WorkQueue.casSlotToNull(a, k, t)) + scan = true; + else { + q.setBaseOpaque(nextBase); + return t; + } + } + } + if (!scan && queues == qs) + break; + } + return null; + } + + /** * Runs tasks until {@code isQuiescent()}. Rather than blocking * when tasks cannot be found, rescans until all others cannot * find tasks either. - */ - final void helpQuiescePool(WorkQueue w) { - int prevSrc = w.source; - int seed = ThreadLocalRandom.nextSecondarySeed(); - int r = seed >>> 16, step = r | 1; - for (int source = prevSrc, released = -1;;) { // -1 until known - ForkJoinTask localTask; WorkQueue[] ws; - while ((localTask = w.nextLocalTask()) != null) - localTask.doExec(); - if (w.phase >= 0 && released == -1) - released = 1; - boolean quiet = true, empty = true; - int n = (ws = workQueues) == null ? 0 : ws.length; - for (int m = n - 1; n > 0; r += step, --n) { - WorkQueue q; int b; - if ((q = ws[r & m]) != null) { - int qs = q.source; - if (q.top != (b = q.base)) { - quiet = empty = false; - ForkJoinTask[] a; int cap, k; - int qid = q.id; - if ((a = q.array) != null && (cap = a.length) > 0) { - if (released == 0) { // increment - released = 1; - CTL.getAndAdd(this, RC_UNIT); - } - ForkJoinTask t = (ForkJoinTask) - QA.getAcquire(a, k = (cap - 1) & b); - if (q.base == b++ && t != null && - QA.compareAndSet(a, k, t, null)) { - q.base = b; - w.source = qid; - t.doExec(); - w.source = source = prevSrc; - } + * + * @param nanos max wait time (Long.MAX_VALUE if effectively untimed) + * @param interruptible true if return on interrupt + * @return positive if quiescent, negative if interrupted, else 0 + */ + final int helpQuiescePool(WorkQueue w, long nanos, boolean interruptible) { + if (w == null) + return 0; + long startTime = System.nanoTime(), parkTime = 0L; + int prevSrc = w.source, wsrc = prevSrc, cfg = w.config, r = cfg + 1; + for (boolean active = true, locals = true;;) { + boolean busy = false, scan = false; + if (locals) { // run local tasks before (re)polling + locals = false; + for (ForkJoinTask u; (u = w.nextLocalTask(cfg)) != null;) + u.doExec(); + } + WorkQueue[] qs = queues; + int n = (qs == null) ? 0 : qs.length; + for (int i = n; i > 0; --i, ++r) { + int j, b, cap; WorkQueue q; ForkJoinTask[] a; + if ((q = qs[j = (n - 1) & r]) != null && q != w && + (a = q.array) != null && (cap = a.length) > 0) { + int k = (cap - 1) & (b = q.base); + int nextBase = b + 1, src = j | SRC; + ForkJoinTask t = WorkQueue.getSlot(a, k); + if (q.base != b) + busy = scan = true; + else if (t != null) { + busy = scan = true; + if (!active) { // increment before taking + active = true; + getAndAddCtl(RC_UNIT); + } + if (WorkQueue.casSlotToNull(a, k, t)) { + q.base = nextBase; + w.source = src; + t.doExec(); + w.source = wsrc = prevSrc; + locals = true; } break; } - else if ((qs & QUIET) == 0) - quiet = false; + else if (!busy) { + if (q.top != b || a[nextBase & (cap - 1)] != null) + busy = scan = true; + else if (q.source != QUIET && q.phase >= 0) + busy = true; + } } } - if (quiet) { - if (released == 0) - CTL.getAndAdd(this, RC_UNIT); - w.source = prevSrc; - break; - } - else if (empty) { - if (source != QUIET) - w.source = source = QUIET; - if (released == 1) { // decrement - released = 0; - CTL.getAndAdd(this, RC_MASK & -RC_UNIT); + VarHandle.acquireFence(); + if (!scan && queues == qs) { + boolean interrupted; + if (!busy) { + w.source = prevSrc; + if (!active) + getAndAddCtl(RC_UNIT); + return 1; + } + if (wsrc != QUIET) + w.source = wsrc = QUIET; + if (active) { // decrement + active = false; + parkTime = 0L; + getAndAddCtl(RC_MASK & -RC_UNIT); + } + else if (parkTime == 0L) { + parkTime = 1L << 10; // initially about 1 usec + Thread.yield(); + } + else if ((interrupted = interruptible && Thread.interrupted()) || + System.nanoTime() - startTime > nanos) { + getAndAddCtl(RC_UNIT); + return interrupted ? -1 : 0; + } + else { + LockSupport.parkNanos(this, parkTime); + if (parkTime < nanos >>> 8 && parkTime < 1L << 20) + parkTime <<= 1; // max sleep approx 1 sec or 1% nanos } } } } /** - * Scans for and returns a polled task, if available. - * Used only for untracked polls. + * Helps quiesce from external caller until done, interrupted, or timeout * - * @param submissionsOnly if true, only scan submission queues + * @param nanos max wait time (Long.MAX_VALUE if effectively untimed) + * @param interruptible true if return on interrupt + * @return positive if quiescent, negative if interrupted, else 0 */ - private ForkJoinTask pollScan(boolean submissionsOnly) { - WorkQueue[] ws; int n; - rescan: while ((mode & STOP) == 0 && (ws = workQueues) != null && - (n = ws.length) > 0) { - int m = n - 1; - int r = ThreadLocalRandom.nextSecondarySeed(); - int h = r >>> 16; - int origin, step; - if (submissionsOnly) { - origin = (r & ~1) & m; // even indices and steps - step = (h & ~1) | 2; + final int externalHelpQuiescePool(long nanos, boolean interruptible) { + for (long startTime = System.nanoTime(), parkTime = 0L;;) { + ForkJoinTask t; + if ((t = pollScan(false)) != null) { + t.doExec(); + parkTime = 0L; } - else { - origin = r & m; - step = h | 1; + else if (canStop()) + return 1; + else if (parkTime == 0L) { + parkTime = 1L << 10; + Thread.yield(); } - boolean nonempty = false; - for (int i = origin, oldSum = 0, checkSum = 0;;) { - WorkQueue q; - if ((q = ws[i]) != null) { - int b; ForkJoinTask t; - if (q.top - (b = q.base) > 0) { - nonempty = true; - if ((t = q.poll()) != null) - return t; - } - else - checkSum += b + q.id; - } - if ((i = (i + step) & m) == origin) { - if (!nonempty && oldSum == (oldSum = checkSum)) - break rescan; - checkSum = 0; - nonempty = false; - } + else if ((System.nanoTime() - startTime) > nanos) + return 0; + else if (interruptible && Thread.interrupted()) + return -1; + else { + LockSupport.parkNanos(this, parkTime); + if (parkTime < nanos >>> 8 && parkTime < 1L << 20) + parkTime <<= 1; } } - return null; } /** @@ -1862,7 +2113,7 @@ */ final ForkJoinTask nextTaskFor(WorkQueue w) { ForkJoinTask t; - if (w == null || (t = w.nextLocalTask()) == null) + if (w == null || (t = w.nextLocalTask(w.config)) == null) t = pollScan(false); return t; } @@ -1870,115 +2121,94 @@ // External operations /** - * Adds the given task to a submission queue at submitter's - * current queue, creating one if null or contended. - * - * @param task the task. Caller must ensure non-null. + * Finds and locks a WorkQueue for an external submitter, or + * returns null if shutdown or terminating. */ - final void externalPush(ForkJoinTask task) { - int r; // initialize caller's probe + final WorkQueue submissionQueue() { + int r; if ((r = ThreadLocalRandom.getProbe()) == 0) { - ThreadLocalRandom.localInit(); + ThreadLocalRandom.localInit(); // initialize caller's probe r = ThreadLocalRandom.getProbe(); } - for (;;) { - WorkQueue q; - int md = mode, n; - WorkQueue[] ws = workQueues; - if ((md & SHUTDOWN) != 0 || ws == null || (n = ws.length) <= 0) - throw new RejectedExecutionException(); - else if ((q = ws[(n - 1) & r & SQMASK]) == null) { // add queue - int qid = (r | QUIET) & ~(FIFO | OWNED); - Object lock = workerNamePrefix; - ForkJoinTask[] qa = - new ForkJoinTask[INITIAL_QUEUE_CAPACITY]; - q = new WorkQueue(this, null); - q.array = qa; - q.id = qid; - q.source = QUIET; - if (lock != null) { // unless disabled, lock pool to install - synchronized (lock) { - WorkQueue[] vs; int i, vn; - if ((vs = workQueues) != null && (vn = vs.length) > 0 && - vs[i = qid & (vn - 1) & SQMASK] == null) - vs[i] = q; // else another thread already installed - } + for (int id = r << 1;;) { // even indices only + int md = mode, n, i; WorkQueue q; ReentrantLock lock; + WorkQueue[] qs = queues; + if ((md & SHUTDOWN) != 0 || qs == null || (n = qs.length) <= 0) + return null; + else if ((q = qs[i = (n - 1) & id]) == null) { + if ((lock = registrationLock) != null) { + WorkQueue w = new WorkQueue(id | SRC); + lock.lock(); // install under lock + if (qs[i] == null) + qs[i] = w; // else lost race; discard + lock.unlock(); } } - else if (!q.tryLockPhase()) // move if busy - r = ThreadLocalRandom.advanceProbe(r); - else { - if (q.lockedPush(task)) - signalWork(null); - return; - } + else if (!q.tryLock()) // move and restart + id = (r = ThreadLocalRandom.advanceProbe(r)) << 1; + else + return q; } } /** + * Adds the given task to an external submission queue, or throws + * exception if shutdown or terminating. + * + * @param task the task. Caller must ensure non-null. + */ + final void externalPush(ForkJoinTask task) { + WorkQueue q; + if ((q = submissionQueue()) == null) + throw new RejectedExecutionException(); // shutdown or disabled + else if (q.lockedPush(task)) + signalWork(); + } + + /** * Pushes a possibly-external submission. */ private ForkJoinTask externalSubmit(ForkJoinTask task) { - Thread t; ForkJoinWorkerThread w; WorkQueue q; + Thread t; ForkJoinWorkerThread wt; WorkQueue q; if (task == null) throw new NullPointerException(); if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) && - (w = (ForkJoinWorkerThread)t).pool == this && - (q = w.workQueue) != null) - q.push(task); + (q = (wt = (ForkJoinWorkerThread)t).workQueue) != null && + wt.pool == this) + q.push(task, this); else externalPush(task); return task; } /** - * Returns common pool queue for an external thread. - */ - static WorkQueue commonSubmitterQueue() { - ForkJoinPool p = common; - int r = ThreadLocalRandom.getProbe(); - WorkQueue[] ws; int n; - return (p != null && (ws = p.workQueues) != null && - (n = ws.length) > 0) ? - ws[(n - 1) & r & SQMASK] : null; - } - - /** - * Performs tryUnpush for an external submitter. - */ - final boolean tryExternalUnpush(ForkJoinTask task) { - int r = ThreadLocalRandom.getProbe(); - WorkQueue[] ws; WorkQueue w; int n; - return ((ws = workQueues) != null && - (n = ws.length) > 0 && - (w = ws[(n - 1) & r & SQMASK]) != null && - w.tryLockedUnpush(task)); - } - - /** - * Performs helpComplete for an external submitter. - */ - final int externalHelpComplete(CountedCompleter task, int maxTasks) { - int r = ThreadLocalRandom.getProbe(); - WorkQueue[] ws; WorkQueue w; int n; - return ((ws = workQueues) != null && (n = ws.length) > 0 && - (w = ws[(n - 1) & r & SQMASK]) != null) ? - w.helpCC(task, maxTasks, true) : 0; + * Returns common pool queue for an external thread that has + * possibly ever submitted a common pool task (nonzero probe), or + * null if none. + */ + static WorkQueue commonQueue() { + ForkJoinPool p; WorkQueue[] qs; + int r = ThreadLocalRandom.getProbe(), n; + return ((p = common) != null && (qs = p.queues) != null && + (n = qs.length) > 0 && r != 0) ? + qs[(n - 1) & (r << 1)] : null; } /** - * Tries to steal and run tasks within the target's computation. - * The maxTasks argument supports external usages; internal calls - * use zero, allowing unbounded steps (external calls trap - * non-positive values). - * - * @param w caller - * @param maxTasks if non-zero, the maximum number of other tasks to run - * @return task status on exit + * If the given executor is a ForkJoinPool, poll and execute + * AsynchronousCompletionTasks from worker's queue until none are + * available or blocker is released. */ - final int helpComplete(WorkQueue w, CountedCompleter task, - int maxTasks) { - return (w == null) ? 0 : w.helpCC(task, maxTasks, false); + static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) { + WorkQueue w = null; Thread t; ForkJoinWorkerThread wt; + if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) { + if ((wt = (ForkJoinWorkerThread)t).pool == e) + w = wt.workQueue; + } + else if (e == common) + w = commonQueue(); + if (w != null) + w.helpAsyncBlocker(blocker); } /** @@ -2051,83 +2281,41 @@ * @return true if terminating or terminated */ private boolean tryTerminate(boolean now, boolean enable) { - int md; // 3 phases: try to set SHUTDOWN, then STOP, then TERMINATED - - while (((md = mode) & SHUTDOWN) == 0) { - if (!enable || this == common) // cannot shutdown + int md; // try to set SHUTDOWN, then STOP, then help terminate + if (((md = mode) & SHUTDOWN) == 0) { + if (!enable) return false; - else - MODE.compareAndSet(this, md, md | SHUTDOWN); + md = getAndBitwiseOrMode(SHUTDOWN); } - - while (((md = mode) & STOP) == 0) { // try to initiate termination - if (!now) { // check if quiescent & empty - for (long oldSum = 0L;;) { // repeat until stable - boolean running = false; - long checkSum = ctl; - WorkQueue[] ws = workQueues; - if ((md & SMASK) + (int)(checkSum >> RC_SHIFT) > 0) - running = true; - else if (ws != null) { - WorkQueue w; - for (int i = 0; i < ws.length; ++i) { - if ((w = ws[i]) != null) { - int s = w.source, p = w.phase; - int d = w.id, b = w.base; - if (b != w.top || - ((d & 1) == 1 && (s >= 0 || p >= 0))) { - running = true; - break; // working, scanning, or have work - } - checkSum += (((long)s << 48) + ((long)p << 32) + - ((long)b << 16) + (long)d); - } - } - } - if (((md = mode) & STOP) != 0) - break; // already triggered - else if (running) - return false; - else if (workQueues == ws && oldSum == (oldSum = checkSum)) - break; - } - } - if ((md & STOP) == 0) - MODE.compareAndSet(this, md, md | STOP); + if ((md & STOP) == 0) { + if (!now && !canStop()) + return false; + md = getAndBitwiseOrMode(STOP); } - - while (((md = mode) & TERMINATED) == 0) { // help terminate others - for (long oldSum = 0L;;) { // repeat until stable - WorkQueue[] ws; WorkQueue w; - long checkSum = ctl; - if ((ws = workQueues) != null) { - for (int i = 0; i < ws.length; ++i) { - if ((w = ws[i]) != null) { - ForkJoinWorkerThread wt = w.owner; - w.cancelAll(); // clear queues - if (wt != null) { - try { // unblock join or park - wt.interrupt(); - } catch (Throwable ignore) { - } - } - checkSum += ((long)w.phase << 32) + w.base; + for (int k = 0; k < 2; ++k) { // twice in case of lagging qs updates + for (ForkJoinTask t; (t = pollScan(false)) != null; ) + ForkJoinTask.cancelIgnoringExceptions(t); // help cancel + WorkQueue[] qs; int n; WorkQueue q; Thread thread; + if ((qs = queues) != null && (n = qs.length) > 0) { + for (int j = 1; j < n; j += 2) { // unblock other workers + if ((q = qs[j]) != null && (thread = q.owner) != null && + !thread.isInterrupted()) { + try { + thread.interrupt(); + } catch (Throwable ignore) { } } } - if (((md = mode) & TERMINATED) != 0 || - (workQueues == ws && oldSum == (oldSum = checkSum))) - break; } - if ((md & TERMINATED) != 0) - break; - else if ((md & SMASK) + (short)(ctl >>> TC_SHIFT) > 0) - break; - else if (MODE.compareAndSet(this, md, md | TERMINATED)) { - synchronized (this) { - notifyAll(); // for awaitTermination - } - break; + ReentrantLock lock; Condition cond; // signal when no workers + if (((md = mode) & TERMINATED) == 0 && + (md & SMASK) + (short)(ctl >>> TC_SHIFT) <= 0 && + (getAndBitwiseOrMode(TERMINATED) & TERMINATED) == 0 && + (lock = registrationLock) != null) { + lock.lock(); + if ((cond = termination) != null) + cond.signalAll(); + lock.unlock(); } } return true; @@ -2298,37 +2486,31 @@ Predicate saturate, long keepAliveTime, TimeUnit unit) { - // check, encode, pack parameters - if (parallelism <= 0 || parallelism > MAX_CAP || - maximumPoolSize < parallelism || keepAliveTime <= 0L) + checkPermission(); + int p = parallelism; + if (p <= 0 || p > MAX_CAP || p > maximumPoolSize || keepAliveTime <= 0L) throw new IllegalArgumentException(); - if (factory == null) + if (factory == null || unit == null) throw new NullPointerException(); - long ms = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP); - - int corep = Math.min(Math.max(corePoolSize, parallelism), MAX_CAP); - long c = ((((long)(-corep) << TC_SHIFT) & TC_MASK) | - (((long)(-parallelism) << RC_SHIFT) & RC_MASK)); - int m = parallelism | (asyncMode ? FIFO : 0); - int maxSpares = Math.min(maximumPoolSize, MAX_CAP) - parallelism; - int minAvail = Math.min(Math.max(minimumRunnable, 0), MAX_CAP); - int b = ((minAvail - parallelism) & SMASK) | (maxSpares << SWIDTH); - int n = (parallelism > 1) ? parallelism - 1 : 1; // at least 2 slots - n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; - n = (n + 1) << 1; // power of two, including space for submission queues - - this.workerNamePrefix = "ForkJoinPool-" + nextPoolId() + "-worker-"; - this.workQueues = new WorkQueue[n]; this.factory = factory; this.ueh = handler; this.saturate = saturate; - this.keepAlive = ms; - this.bounds = b; - this.mode = m; - this.ctl = c; - checkPermission(); + this.keepAlive = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP); + int size = 1 << (33 - Integer.numberOfLeadingZeros(p - 1)); + int corep = Math.min(Math.max(corePoolSize, p), MAX_CAP); + int maxSpares = Math.min(maximumPoolSize, MAX_CAP) - p; + int minAvail = Math.min(Math.max(minimumRunnable, 0), MAX_CAP); + this.bounds = ((minAvail - p) & SMASK) | (maxSpares << SWIDTH); + this.mode = p | (asyncMode ? FIFO : 0); + this.ctl = ((((long)(-corep) << TC_SHIFT) & TC_MASK) | + (((long)(-p) << RC_SHIFT) & RC_MASK)); + this.registrationLock = new ReentrantLock(); + this.queues = new WorkQueue[size]; + String pid = Integer.toString(getAndAddPoolIds(1) + 1); + this.workerNamePrefix = "ForkJoinPool-" + pid + "-worker-"; } + // helper method for commonPool constructor private static Object newInstanceFromSystemProperty(String property) throws ReflectiveOperationException { String className = System.getProperty(property); @@ -2343,49 +2525,33 @@ * overridden by system properties */ private ForkJoinPool(byte forCommonPoolOnly) { - int parallelism = -1; + int parallelism = Runtime.getRuntime().availableProcessors() - 1; ForkJoinWorkerThreadFactory fac = null; UncaughtExceptionHandler handler = null; try { // ignore exceptions in accessing/parsing properties - String pp = System.getProperty - ("java.util.concurrent.ForkJoinPool.common.parallelism"); - if (pp != null) - parallelism = Integer.parseInt(pp); fac = (ForkJoinWorkerThreadFactory) newInstanceFromSystemProperty( "java.util.concurrent.ForkJoinPool.common.threadFactory"); handler = (UncaughtExceptionHandler) newInstanceFromSystemProperty( "java.util.concurrent.ForkJoinPool.common.exceptionHandler"); + String pp = System.getProperty + ("java.util.concurrent.ForkJoinPool.common.parallelism"); + if (pp != null) + parallelism = Integer.parseInt(pp); } catch (Exception ignore) { } - - if (fac == null) { - if (System.getSecurityManager() == null) - fac = defaultForkJoinWorkerThreadFactory; - else // use security-managed default - fac = new InnocuousForkJoinWorkerThreadFactory(); - } - if (parallelism < 0 && // default 1 less than #cores - (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0) - parallelism = 1; - if (parallelism > MAX_CAP) - parallelism = MAX_CAP; - - long c = ((((long)(-parallelism) << TC_SHIFT) & TC_MASK) | - (((long)(-parallelism) << RC_SHIFT) & RC_MASK)); - int b = ((1 - parallelism) & SMASK) | (COMMON_MAX_SPARES << SWIDTH); - int n = (parallelism > 1) ? parallelism - 1 : 1; - n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; - n = (n + 1) << 1; - - this.workerNamePrefix = "ForkJoinPool.commonPool-worker-"; - this.workQueues = new WorkQueue[n]; - this.factory = fac; + int p = this.mode = Math.min(Math.max(parallelism, 0), MAX_CAP); + int size = 1 << (33 - Integer.numberOfLeadingZeros(p > 0 ? p - 1 : 1)); + this.factory = (fac != null) ? fac : + new DefaultCommonPoolForkJoinWorkerThreadFactory(); this.ueh = handler; - this.saturate = null; this.keepAlive = DEFAULT_KEEPALIVE; - this.bounds = b; - this.mode = parallelism; - this.ctl = c; + this.saturate = null; + this.workerNamePrefix = null; + this.bounds = ((1 - p) & SMASK) | (COMMON_MAX_SPARES << SWIDTH); + this.ctl = ((((long)(-p) << TC_SHIFT) & TC_MASK) | + (((long)(-p) << RC_SHIFT) & RC_MASK)); + this.queues = new WorkQueue[size]; + this.registrationLock = new ReentrantLock(); } /** @@ -2426,8 +2592,6 @@ * scheduled for execution */ public T invoke(ForkJoinTask task) { - if (task == null) - throw new NullPointerException(); externalSubmit(task); return task.join(); } @@ -2451,15 +2615,12 @@ * @throws RejectedExecutionException if the task cannot be * scheduled for execution */ + @Override + @SuppressWarnings("unchecked") public void execute(Runnable task) { - if (task == null) - throw new NullPointerException(); - ForkJoinTask job; - if (task instanceof ForkJoinTask) // avoid re-wrap - job = (ForkJoinTask) task; - else - job = new ForkJoinTask.RunnableExecuteAction(task); - externalSubmit(job); + externalSubmit((task instanceof ForkJoinTask) + ? (ForkJoinTask) task // avoid re-wrap + : new ForkJoinTask.RunnableExecuteAction(task)); } /** @@ -2481,6 +2642,7 @@ * @throws RejectedExecutionException if the task cannot be * scheduled for execution */ + @Override public ForkJoinTask submit(Callable task) { return externalSubmit(new ForkJoinTask.AdaptedCallable(task)); } @@ -2490,6 +2652,7 @@ * @throws RejectedExecutionException if the task cannot be * scheduled for execution */ + @Override public ForkJoinTask submit(Runnable task, T result) { return externalSubmit(new ForkJoinTask.AdaptedRunnable(task, result)); } @@ -2499,10 +2662,9 @@ * @throws RejectedExecutionException if the task cannot be * scheduled for execution */ + @Override @SuppressWarnings("unchecked") public ForkJoinTask submit(Runnable task) { - if (task == null) - throw new NullPointerException(); return externalSubmit((task instanceof ForkJoinTask) ? (ForkJoinTask) task // avoid re-wrap : new ForkJoinTask.AdaptedRunnableAction(task)); @@ -2512,28 +2674,183 @@ * @throws NullPointerException {@inheritDoc} * @throws RejectedExecutionException {@inheritDoc} */ + @Override public List> invokeAll(Collection> tasks) { - // In previous versions of this class, this method constructed - // a task to run ForkJoinTask.invokeAll, but now external - // invocation of multiple tasks is at least as efficient. ArrayList> futures = new ArrayList<>(tasks.size()); - try { for (Callable t : tasks) { - ForkJoinTask f = new ForkJoinTask.AdaptedCallable(t); + ForkJoinTask f = + new ForkJoinTask.AdaptedInterruptibleCallable(t); futures.add(f); externalSubmit(f); } - for (int i = 0, size = futures.size(); i < size; i++) + for (int i = futures.size() - 1; i >= 0; --i) ((ForkJoinTask)futures.get(i)).quietlyJoin(); return futures; } catch (Throwable t) { - for (int i = 0, size = futures.size(); i < size; i++) - futures.get(i).cancel(false); + for (Future e : futures) + ForkJoinTask.cancelIgnoringExceptions(e); + throw t; + } + } + + @Override + public List> invokeAll(Collection> tasks, + long timeout, TimeUnit unit) + throws InterruptedException { + long nanos = unit.toNanos(timeout); + ArrayList> futures = new ArrayList<>(tasks.size()); + try { + for (Callable t : tasks) { + ForkJoinTask f = + new ForkJoinTask.AdaptedInterruptibleCallable(t); + futures.add(f); + externalSubmit(f); + } + long startTime = System.nanoTime(), ns = nanos; + boolean timedOut = (ns < 0L); + for (int i = futures.size() - 1; i >= 0; --i) { + Future f = futures.get(i); + if (!f.isDone()) { + if (timedOut) + ForkJoinTask.cancelIgnoringExceptions(f); + else { + try { + f.get(ns, TimeUnit.NANOSECONDS); + } catch (CancellationException | TimeoutException | + ExecutionException ok) { + } + if ((ns = nanos - (System.nanoTime() - startTime)) < 0L) + timedOut = true; + } + } + } + return futures; + } catch (Throwable t) { + for (Future e : futures) + ForkJoinTask.cancelIgnoringExceptions(e); throw t; } } + // Task to hold results from InvokeAnyTasks + static final class InvokeAnyRoot extends ForkJoinTask { + private static final long serialVersionUID = 2838392045355241008L; + @SuppressWarnings("serial") // Conditionally serializable + volatile E result; + final AtomicInteger count; // in case all throw + final ForkJoinPool pool; // to check shutdown while collecting + InvokeAnyRoot(int n, ForkJoinPool p) { + pool = p; + count = new AtomicInteger(n); + } + final void tryComplete(Callable c) { // called by InvokeAnyTasks + Throwable ex = null; + boolean failed = (c == null || isCancelled() || + (pool != null && pool.mode < 0)); + if (!failed && !isDone()) { + try { + complete(c.call()); + } catch (Throwable tx) { + ex = tx; + failed = true; + } + } + if ((pool != null && pool.mode < 0) || + (failed && count.getAndDecrement() <= 1)) + trySetThrown(ex != null ? ex : new CancellationException()); + } + public final boolean exec() { return false; } // never forked + public final E getRawResult() { return result; } + public final void setRawResult(E v) { result = v; } + } + + // Variant of AdaptedInterruptibleCallable with results in InvokeAnyRoot + static final class InvokeAnyTask extends ForkJoinTask { + private static final long serialVersionUID = 2838392045355241008L; + final InvokeAnyRoot root; + @SuppressWarnings("serial") // Conditionally serializable + final Callable callable; + transient volatile Thread runner; + InvokeAnyTask(InvokeAnyRoot root, Callable callable) { + this.root = root; + this.callable = callable; + } + public final boolean exec() { + Thread.interrupted(); + runner = Thread.currentThread(); + root.tryComplete(callable); + runner = null; + Thread.interrupted(); + return true; + } + public final boolean cancel(boolean mayInterruptIfRunning) { + Thread t; + boolean stat = super.cancel(false); + if (mayInterruptIfRunning && (t = runner) != null) { + try { + t.interrupt(); + } catch (Throwable ignore) { + } + } + return stat; + } + public final void setRawResult(E v) {} // unused + public final E getRawResult() { return null; } + } + + @Override + public T invokeAny(Collection> tasks) + throws InterruptedException, ExecutionException { + int n = tasks.size(); + if (n <= 0) + throw new IllegalArgumentException(); + InvokeAnyRoot root = new InvokeAnyRoot(n, this); + ArrayList> fs = new ArrayList<>(n); + try { + for (Callable c : tasks) { + if (c == null) + throw new NullPointerException(); + InvokeAnyTask f = new InvokeAnyTask(root, c); + fs.add(f); + externalSubmit(f); + if (root.isDone()) + break; + } + return root.get(); + } finally { + for (InvokeAnyTask f : fs) + ForkJoinTask.cancelIgnoringExceptions(f); + } + } + + @Override + public T invokeAny(Collection> tasks, + long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + long nanos = unit.toNanos(timeout); + int n = tasks.size(); + if (n <= 0) + throw new IllegalArgumentException(); + InvokeAnyRoot root = new InvokeAnyRoot(n, this); + ArrayList> fs = new ArrayList<>(n); + try { + for (Callable c : tasks) { + if (c == null) + throw new NullPointerException(); + InvokeAnyTask f = new InvokeAnyTask(root, c); + fs.add(f); + externalSubmit(f); + if (root.isDone()) + break; + } + return root.get(nanos, TimeUnit.NANOSECONDS); + } finally { + for (InvokeAnyTask f : fs) + ForkJoinTask.cancelIgnoringExceptions(f); + } + } + /** * Returns the factory used for constructing new workers. * @@ -2604,12 +2921,12 @@ * @return the number of worker threads */ public int getRunningThreadCount() { - WorkQueue[] ws; WorkQueue w; VarHandle.acquireFence(); + WorkQueue[] qs; WorkQueue q; int rc = 0; - if ((ws = workQueues) != null) { - for (int i = 1; i < ws.length; i += 2) { - if ((w = ws[i]) != null && w.isApparentlyUnblocked()) + if ((qs = queues) != null) { + for (int i = 1; i < qs.length; i += 2) { + if ((q = qs[i]) != null && q.isApparentlyUnblocked()) ++rc; } } @@ -2640,30 +2957,7 @@ * @return {@code true} if all threads are currently idle */ public boolean isQuiescent() { - for (;;) { - long c = ctl; - int md = mode, pc = md & SMASK; - int tc = pc + (short)(c >>> TC_SHIFT); - int rc = pc + (int)(c >> RC_SHIFT); - if ((md & (STOP | TERMINATED)) != 0) - return true; - else if (rc > 0) - return false; - else { - WorkQueue[] ws; WorkQueue v; - if ((ws = workQueues) != null) { - for (int i = 1; i < ws.length; i += 2) { - if ((v = ws[i]) != null) { - if (v.source > 0) - return false; - --tc; - } - } - } - if (tc == 0 && ctl == c) - return true; - } - } + return canStop(); } /** @@ -2679,11 +2973,11 @@ */ public long getStealCount() { long count = stealCount; - WorkQueue[] ws; WorkQueue w; - if ((ws = workQueues) != null) { - for (int i = 1; i < ws.length; i += 2) { - if ((w = ws[i]) != null) - count += (long)w.nsteals & 0xffffffffL; + WorkQueue[] qs; WorkQueue q; + if ((qs = queues) != null) { + for (int i = 1; i < qs.length; i += 2) { + if ((q = qs[i]) != null) + count += (long)q.nsteals & 0xffffffffL; } } return count; @@ -2700,13 +2994,13 @@ * @return the number of queued tasks */ public long getQueuedTaskCount() { - WorkQueue[] ws; WorkQueue w; VarHandle.acquireFence(); + WorkQueue[] qs; WorkQueue q; int count = 0; - if ((ws = workQueues) != null) { - for (int i = 1; i < ws.length; i += 2) { - if ((w = ws[i]) != null) - count += w.queueSize(); + if ((qs = queues) != null) { + for (int i = 1; i < qs.length; i += 2) { + if ((q = qs[i]) != null) + count += q.queueSize(); } } return count; @@ -2720,13 +3014,13 @@ * @return the number of queued submissions */ public int getQueuedSubmissionCount() { - WorkQueue[] ws; WorkQueue w; VarHandle.acquireFence(); + WorkQueue[] qs; WorkQueue q; int count = 0; - if ((ws = workQueues) != null) { - for (int i = 0; i < ws.length; i += 2) { - if ((w = ws[i]) != null) - count += w.queueSize(); + if ((qs = queues) != null) { + for (int i = 0; i < qs.length; i += 2) { + if ((q = qs[i]) != null) + count += q.queueSize(); } } return count; @@ -2739,11 +3033,11 @@ * @return {@code true} if there are any queued submissions */ public boolean hasQueuedSubmissions() { - WorkQueue[] ws; WorkQueue w; VarHandle.acquireFence(); - if ((ws = workQueues) != null) { - for (int i = 0; i < ws.length; i += 2) { - if ((w = ws[i]) != null && !w.isEmpty()) + WorkQueue[] qs; WorkQueue q; + if ((qs = queues) != null) { + for (int i = 0; i < qs.length; i += 2) { + if ((q = qs[i]) != null && !q.isEmpty()) return true; } } @@ -2779,18 +3073,10 @@ * @return the number of elements transferred */ protected int drainTasksTo(Collection> c) { - WorkQueue[] ws; WorkQueue w; ForkJoinTask t; - VarHandle.acquireFence(); int count = 0; - if ((ws = workQueues) != null) { - for (int i = 0; i < ws.length; ++i) { - if ((w = ws[i]) != null) { - while ((t = w.poll()) != null) { - c.add(t); - ++count; - } - } - } + for (ForkJoinTask t; (t = pollScan(false)) != null; ) { + c.add(t); + ++count; } return count; } @@ -2803,22 +3089,22 @@ * @return a string identifying this pool, as well as its state */ public String toString() { - // Use a single pass through workQueues to collect counts + // Use a single pass through queues to collect counts int md = mode; // read volatile fields first long c = ctl; long st = stealCount; - long qt = 0L, qs = 0L; int rc = 0; - WorkQueue[] ws; WorkQueue w; - if ((ws = workQueues) != null) { - for (int i = 0; i < ws.length; ++i) { - if ((w = ws[i]) != null) { - int size = w.queueSize(); + long qt = 0L, ss = 0L; int rc = 0; + WorkQueue[] qs; WorkQueue q; + if ((qs = queues) != null) { + for (int i = 0; i < qs.length; ++i) { + if ((q = qs[i]) != null) { + int size = q.queueSize(); if ((i & 1) == 0) - qs += size; + ss += size; else { qt += size; - st += (long)w.nsteals & 0xffffffffL; - if (w.isApparentlyUnblocked()) + st += (long)q.nsteals & 0xffffffffL; + if (q.isApparentlyUnblocked()) ++rc; } } @@ -2842,7 +3128,7 @@ ", running = " + rc + ", steals = " + st + ", tasks = " + qt + - ", submissions = " + qs + + ", submissions = " + ss + "]"; } @@ -2862,7 +3148,8 @@ */ public void shutdown() { checkPermission(); - tryTerminate(false, true); + if (this != common) + tryTerminate(false, true); } /** @@ -2885,7 +3172,8 @@ */ public List shutdownNow() { checkPermission(); - tryTerminate(true, true); + if (this != common) + tryTerminate(true, true); return Collections.emptyList(); } @@ -2912,8 +3200,7 @@ * @return {@code true} if terminating but not yet terminated */ public boolean isTerminating() { - int md = mode; - return (md & STOP) != 0 && (md & TERMINATED) == 0; + return (mode & (STOP | TERMINATED)) == STOP; } /** @@ -2941,29 +3228,32 @@ */ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { - if (Thread.interrupted()) - throw new InterruptedException(); - if (this == common) { - awaitQuiescence(timeout, unit); - return false; - } + ReentrantLock lock; Condition cond; long nanos = unit.toNanos(timeout); - if (isTerminated()) - return true; - if (nanos <= 0L) - return false; - long deadline = System.nanoTime() + nanos; - synchronized (this) { - for (;;) { - if (isTerminated()) - return true; - if (nanos <= 0L) - return false; - long millis = TimeUnit.NANOSECONDS.toMillis(nanos); - wait(millis > 0L ? millis : 1L); - nanos = deadline - System.nanoTime(); + boolean terminated = false; + if (this == common) { + Thread t; ForkJoinWorkerThread wt; int q; + if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread && + (wt = (ForkJoinWorkerThread)t).pool == this) + q = helpQuiescePool(wt.workQueue, nanos, true); + else + q = externalHelpQuiescePool(nanos, true); + if (q < 0) + throw new InterruptedException(); + } + else if (!(terminated = ((mode & TERMINATED) != 0)) && + (lock = registrationLock) != null) { + lock.lock(); + try { + if ((cond = termination) == null) + termination = cond = lock.newCondition(); + while (!(terminated = ((mode & TERMINATED) != 0)) && nanos > 0L) + nanos = cond.awaitNanos(nanos); + } finally { + lock.unlock(); } } + return terminated; } /** @@ -2978,35 +3268,14 @@ * timeout elapsed. */ public boolean awaitQuiescence(long timeout, TimeUnit unit) { + Thread t; ForkJoinWorkerThread wt; int q; long nanos = unit.toNanos(timeout); - ForkJoinWorkerThread wt; - Thread thread = Thread.currentThread(); - if ((thread instanceof ForkJoinWorkerThread) && - (wt = (ForkJoinWorkerThread)thread).pool == this) { - helpQuiescePool(wt.workQueue); - return true; - } - else { - for (long startTime = System.nanoTime();;) { - ForkJoinTask t; - if ((t = pollScan(false)) != null) - t.doExec(); - else if (isQuiescent()) - return true; - else if ((System.nanoTime() - startTime) > nanos) - return false; - else - Thread.yield(); // cannot block - } - } - } - - /** - * Waits and/or attempts to assist performing tasks indefinitely - * until the {@link #commonPool()} {@link #isQuiescent}. - */ - static void quiesceCommonPool() { - common.awaitQuiescence(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread && + (wt = (ForkJoinWorkerThread)t).pool == this) + q = helpQuiescePool(wt.workQueue, nanos, false); + else + q = externalHelpQuiescePool(nanos, false); + return (q > 0); } /** @@ -3018,14 +3287,16 @@ * not necessary. Method {@link #block} blocks the current thread * if necessary (perhaps internally invoking {@code isReleasable} * before actually blocking). These actions are performed by any - * thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}. - * The unusual methods in this API accommodate synchronizers that - * may, but don't usually, block for long periods. Similarly, they - * allow more efficient internal handling of cases in which - * additional workers may be, but usually are not, needed to - * ensure sufficient parallelism. Toward this end, - * implementations of method {@code isReleasable} must be amenable - * to repeated invocation. + * thread invoking {@link + * ForkJoinPool#managedBlock(ManagedBlocker)}. The unusual + * methods in this API accommodate synchronizers that may, but + * don't usually, block for long periods. Similarly, they allow + * more efficient internal handling of cases in which additional + * workers may be, but usually are not, needed to ensure + * sufficient parallelism. Toward this end, implementations of + * method {@code isReleasable} must be amenable to repeated + * invocation. Neither method is invoked after a prior invocation + * of {@code isReleasable} or {@code block} returns {@code true}. * *

For example, here is a ManagedBlocker based on a * ReentrantLock: @@ -3111,79 +3382,64 @@ */ public static void managedBlock(ManagedBlocker blocker) throws InterruptedException { + Thread t; ForkJoinPool p; + if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread && + (p = ((ForkJoinWorkerThread)t).pool) != null) + p.compensatedBlock(blocker); + else + unmanagedBlock(blocker); + } + + /** ManagedBlock for ForkJoinWorkerThreads */ + private void compensatedBlock(ManagedBlocker blocker) + throws InterruptedException { if (blocker == null) throw new NullPointerException(); - ForkJoinPool p; - ForkJoinWorkerThread wt; - WorkQueue w; - Thread t = Thread.currentThread(); - if ((t instanceof ForkJoinWorkerThread) && - (p = (wt = (ForkJoinWorkerThread)t).pool) != null && - (w = wt.workQueue) != null) { - int block; - while (!blocker.isReleasable()) { - if ((block = p.tryCompensate(w)) != 0) { - try { - do {} while (!blocker.isReleasable() && - !blocker.block()); - } finally { - CTL.getAndAdd(p, (block > 0) ? RC_UNIT : 0L); - } - break; + for (;;) { + int comp; boolean done; + long c = ctl; + if (blocker.isReleasable()) + break; + if ((comp = tryCompensate(c)) >= 0) { + long post = (comp == 0) ? 0L : RC_UNIT; + try { + done = blocker.block(); + } finally { + getAndAddCtl(post); } + if (done) + break; } } - else { - do {} while (!blocker.isReleasable() && - !blocker.block()); - } } - /** - * If the given executor is a ForkJoinPool, poll and execute - * AsynchronousCompletionTasks from worker's queue until none are - * available or blocker is released. - */ - static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) { - if (e instanceof ForkJoinPool) { - WorkQueue w; ForkJoinWorkerThread wt; WorkQueue[] ws; int r, n; - ForkJoinPool p = (ForkJoinPool)e; - Thread thread = Thread.currentThread(); - if (thread instanceof ForkJoinWorkerThread && - (wt = (ForkJoinWorkerThread)thread).pool == p) - w = wt.workQueue; - else if ((r = ThreadLocalRandom.getProbe()) != 0 && - (ws = p.workQueues) != null && (n = ws.length) > 0) - w = ws[(n - 1) & r & SQMASK]; - else - w = null; - if (w != null) - w.helpAsyncBlocker(blocker); - } + /** ManagedBlock for external threads */ + private static void unmanagedBlock(ManagedBlocker blocker) + throws InterruptedException { + if (blocker == null) throw new NullPointerException(); + do {} while (!blocker.isReleasable() && !blocker.block()); } - // AbstractExecutorService overrides. These rely on undocumented - // fact that ForkJoinTask.adapt returns ForkJoinTasks that also - // implement RunnableFuture. + // AbstractExecutorService.newTaskFor overrides rely on + // undocumented fact that ForkJoinTask.adapt returns ForkJoinTasks + // that also implement RunnableFuture. + @Override protected RunnableFuture newTaskFor(Runnable runnable, T value) { return new ForkJoinTask.AdaptedRunnable(runnable, value); } + @Override protected RunnableFuture newTaskFor(Callable callable) { return new ForkJoinTask.AdaptedCallable(callable); } - // VarHandle mechanics - private static final VarHandle CTL; - private static final VarHandle MODE; - static final VarHandle QA; - static { try { MethodHandles.Lookup l = MethodHandles.lookup(); CTL = l.findVarHandle(ForkJoinPool.class, "ctl", long.class); MODE = l.findVarHandle(ForkJoinPool.class, "mode", int.class); - QA = MethodHandles.arrayElementVarHandle(ForkJoinTask[].class); + THREADIDS = l.findVarHandle(ForkJoinPool.class, "threadIds", int.class); + POOLIDS = l.findStaticVarHandle(ForkJoinPool.class, "poolIds", int.class); } catch (ReflectiveOperationException e) { throw new ExceptionInInitializerError(e); } @@ -3204,38 +3460,10 @@ defaultForkJoinWorkerThreadFactory = new DefaultForkJoinWorkerThreadFactory(); modifyThreadPermission = new RuntimePermission("modifyThread"); - common = AccessController.doPrivileged(new PrivilegedAction<>() { public ForkJoinPool run() { return new ForkJoinPool((byte)0); }}); COMMON_PARALLELISM = Math.max(common.mode & SMASK, 1); } - - /** - * Factory for innocuous worker threads. - */ - private static final class InnocuousForkJoinWorkerThreadFactory - implements ForkJoinWorkerThreadFactory { - - /** - * An ACC to restrict permissions for the factory itself. - * The constructed workers have no permissions set. - */ - private static final AccessControlContext ACC = contextWithPermissions( - modifyThreadPermission, - new RuntimePermission("enableContextClassLoaderOverride"), - new RuntimePermission("modifyThreadGroup"), - new RuntimePermission("getClassLoader"), - new RuntimePermission("setContextClassLoader")); - - public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { - return AccessController.doPrivileged( - new PrivilegedAction<>() { - public ForkJoinWorkerThread run() { - return new ForkJoinWorkerThread. - InnocuousForkJoinWorkerThread(pool); }}, - ACC); - } - } }