
src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java

8246585: ForkJoin updates
Reviewed-by: martin

*** 47,57 ****
--- 47,60 ----
  import java.util.ArrayList;
  import java.util.Collection;
  import java.util.Collections;
  import java.util.List;
  import java.util.function.Predicate;
+ import java.util.concurrent.atomic.AtomicInteger;
  import java.util.concurrent.locks.LockSupport;
+ import java.util.concurrent.locks.ReentrantLock;
+ import java.util.concurrent.locks.Condition;

  /**
  * An {@link ExecutorService} for running {@link ForkJoinTask}s.
  * A {@code ForkJoinPool} provides the entry point for submissions
  * from non-{@code ForkJoinTask} clients, as well as management and
*** 161,171 ****
  * are used. It is possible to disable or limit the use of threads in
  * the common pool by setting the parallelism property to zero, and/or
  * using a factory that may return {@code null}. However doing so may
  * cause unjoined tasks to never be executed.
  *
! * <p><b>Implementation notes</b>: This implementation restricts the
  * maximum number of running threads to 32767. Attempts to create
  * pools with greater than the maximum number result in
  * {@code IllegalArgumentException}.
  *
  * <p>This implementation rejects submitted tasks (that is, by throwing
--- 164,174 ----
  * are used. It is possible to disable or limit the use of threads in
  * the common pool by setting the parallelism property to zero, and/or
  * using a factory that may return {@code null}. However doing so may
  * cause unjoined tasks to never be executed.
  *
! * <p><b>Implementation notes:</b> This implementation restricts the
  * maximum number of running threads to 32767. Attempts to create
  * pools with greater than the maximum number result in
  * {@code IllegalArgumentException}.
  *
  * <p>This implementation rejects submitted tasks (that is, by throwing
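The implementation note in this hunk can be checked from client code. The sketch below uses only the public `ForkJoinPool(int)` constructor. That constructor is documented to throw `IllegalArgumentException` when parallelism is not positive or exceeds the implementation limit. The class name `ParallelismLimitCheck` is hypothetical, and the sketch assumes a JDK whose limit is the 32767 (that is, (1<<15)-1) stated above.

```java
import java.util.concurrent.ForkJoinPool;

public final class ParallelismLimitCheck {
    /** Returns true if a pool with this parallelism can be constructed. */
    static boolean accepts(int parallelism) {
        try {
            // Worker threads are created on demand, so constructing the pool
            // (even at a large parallelism) does not start any threads.
            ForkJoinPool pool = new ForkJoinPool(parallelism);
            pool.shutdown();
            return true;
        } catch (IllegalArgumentException e) {
            return false; // parallelism <= 0 or above the implementation limit
        }
    }

    public static void main(String[] args) {
        int limit = (1 << 15) - 1; // 32767, the limit stated in the implementation notes
        System.out.println(limit + " accepted: " + accepts(limit));
        System.out.println((limit + 1) + " accepted: " + accepts(limit + 1));
    }
}
```

Probing the constructor rather than hard-coding a constant keeps the check honest if a future JDK changes the limit.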
*** 228,339 ****
  *
  * Adding tasks then takes the form of a classic array push(task)
  * in a circular buffer:
  *    q.array[q.top++ % length] = task;
  *
! * (The actual code needs to null-check and size-check the array,
  * uses masking, not mod, for indexing a power-of-two-sized array,
! * adds a release fence for publication, and possibly signals
! * waiting workers to start scanning -- see below.) Both a
! * successful pop and poll mainly entail a CAS of a slot from
! * non-null to null.
! *
! * The pop operation (always performed by owner) is:
! *   if ((the task at top slot is not null) and
! *       (CAS slot to null))
  *       decrement top and return task;
  *
! * And the poll operation (usually by a stealer) is
! *   if ((the task at base slot is not null) and
! *       (CAS slot to null))
  *       increment base and return task;
  *
! * There are several variants of each of these. Most uses occur
! * within operations that also interleave contention or emptiness
! * tracking or inspection of elements before extracting them, so
! * must interleave these with the above code. When performed by
! * owner, getAndSet is used instead of CAS (see for example method
! * nextLocalTask) which is usually more efficient, and possible
! * because the top index cannot independently change during the
! * operation.
  *
  * Memory ordering. See "Correct and Efficient Work-Stealing for
  * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
  * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
  * analysis of memory ordering requirements in work-stealing
! * algorithms similar to (but different than) the one used here.
! * Extracting tasks in array slots via (fully fenced) CAS provides
! * primary synchronization. The base and top indices imprecisely
! * guide where to extract from. We do not usually require strict
! * orderings of array and index updates. Many index accesses use
! * plain mode, with ordering constrained by surrounding context
! * (usually with respect to element CASes or the two WorkQueue
! * volatile fields source and phase). When not otherwise already
! * constrained, reads of "base" by queue owners use acquire-mode,
! * and some externally callable methods preface accesses with
! * acquire fences. Additionally, to ensure that index update
! * writes are not coalesced or postponed in loops etc, "opaque"
! * mode is used in a few cases where timely writes are not
! * otherwise ensured. The "locked" versions of push- and pop-
! * based methods for shared queues differ from owned versions
! * because locking already forces some of the ordering.
  *
  * Because indices and slot contents cannot always be consistent,
! * a check that base == top indicates (momentary) emptiness, but
! * otherwise may err on the side of possibly making the queue
! * appear nonempty when a push, pop, or poll have not fully
! * committed, or making it appear empty when an update of top has
! * not yet been visibly written. (Method isEmpty() checks the
! * case of a partially completed removal of the last element.)
! * Because of this, the poll operation, considered individually,
! * is not wait-free. One thief cannot successfully continue until
! * another in-progress one (or, if previously empty, a push)
! * visibly completes. This can stall threads when required to
! * consume from a given queue (see method poll()). However, in
! * the aggregate, we ensure at least probabilistic
! * non-blockingness. If an attempted steal fails, a scanning
! * thief chooses a different random victim target to try next. So,
! * in order for one thief to progress, it suffices for any
! * in-progress poll or new push on any empty queue to complete.
  *
  * This approach also enables support of a user mode in which
  * local task processing is in FIFO, not LIFO order, simply by
  * using poll rather than pop. This can be useful in
! * message-passing frameworks in which tasks are never joined.
  *
  * WorkQueues are also used in a similar way for tasks submitted
  * to the pool. We cannot mix these tasks in the same queues used
  * by workers. Instead, we randomly associate submission queues
  * with submitting threads, using a form of hashing. The
  * ThreadLocalRandom probe value serves as a hash code for
  * choosing existing queues, and may be randomly repositioned upon
  * contention with other submitters. In essence, submitters act
  * like workers except that they are restricted to executing local
! * tasks that they submitted. Insertion of tasks in shared mode
! * requires a lock but we use only a simple spinlock (using field
! * phase), because submitters encountering a busy queue move to a
! * different position to use or create other queues -- they block
! * only when creating and registering new queues. Because it is
! * used only as a spinlock, unlocking requires only a "releasing"
! * store (using setRelease) unless otherwise signalling.
  *
  * Management
  * ==========
  *
  * The main throughput advantages of work-stealing stem from
  * decentralized control -- workers mostly take tasks from
  * themselves or each other, at rates that can exceed a billion
! * per second. The pool itself creates, activates (enables
! * scanning for and running tasks), deactivates, blocks, and
! * terminates threads, all with minimal central information.
! * There are only a few properties that we can globally track or
! * maintain, so we pack them into a small number of variables,
! * often maintaining atomicity without blocking or locking.
! * Nearly all essentially atomic control state is held in a few
! * volatile variables that are by far most often read (not
! * written) as status and consistency checks. We pack as much
! * information into them as we can.
  *
  * Field "ctl" contains 64 bits holding information needed to
  * atomically decide to add, enqueue (on an event queue), and
  * dequeue and release workers. To enable this packing, we
  * restrict maximum parallelism to (1<<15)-1 (which is far in
--- 231,359 ----
  *
  * Adding tasks then takes the form of a classic array push(task)
  * in a circular buffer:
  *    q.array[q.top++ % length] = task;
  *
! * The actual code needs to null-check and size-check the array,
  * uses masking, not mod, for indexing a power-of-two-sized array,
! * enforces memory ordering, supports resizing, and possibly
! * signals waiting workers to start scanning -- see below.
! *
! * The pop operation (always performed by owner) is of the form:
! *   if ((task = getAndSet(q.array, (q.top-1) % length, null)) != null)
  *       decrement top and return task;
+ * If this fails, the queue is empty.
  *
! * The poll operation by another stealer thread is, basically:
! *   if (CAS nonnull task at q.array[q.base % length] to null)
  *       increment base and return task;
  *
! * This may fail due to contention, and may be retried.
! * Implementations must ensure a consistent snapshot of the base
! * index and the task (by looping or trying elsewhere) before
! * trying CAS. There isn't actually a method of this form,
! * because failure due to inconsistency or contention is handled
! * in different ways in different contexts, normally by first
! * trying other queues. (For the most straightforward example, see
! * method pollScan.) There are further variants for cases
! * requiring inspection of elements before extracting them, so
! * must interleave these with variants of this code. Also, a more
! * efficient version (nextLocalTask) is used for polls by owners.
! * It avoids some overhead because the queue cannot be growing
! * during the call.
  *
  * Memory ordering. See "Correct and Efficient Work-Stealing for
  * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
  * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
  * analysis of memory ordering requirements in work-stealing
! * algorithms similar to the one used here. Inserting and
! * extracting tasks in array slots via volatile or atomic accesses
! * or explicit fences provides primary synchronization.
! *
! * Operations on deque elements require reads and writes of both
! * indices and slots. When possible, we allow these to occur in
! * any order. Because the base and top indices (along with other
! * pool or array fields accessed in many methods) only imprecisely
! * guide where to extract from, we let accesses other than the
! * element getAndSet/CAS/setVolatile appear in any order, using
! * plain mode. But we must still preface some methods (mainly
! * those that may be accessed externally) with an acquireFence to
! * avoid unbounded staleness. This is equivalent to acting as if
! * callers use an acquiring read of the reference to the pool or
! * queue when invoking the method, even when they do not. We use
! * explicit acquiring reads (getSlot) rather than plain array
! * access when acquire mode is required but not otherwise ensured
! * by context. To reduce stalls by other stealers, we encourage
! * timely writes to the base index by immediately following
! * updates with a write of a volatile field that must be updated
! * anyway, or an Opaque-mode write if there is no such
! * opportunity.
  *
  * Because indices and slot contents cannot always be consistent,
! * the emptiness check base == top is only quiescently accurate
! * (and so used where this suffices). Otherwise, it may err on the
! * side of possibly making the queue appear nonempty when a push,
! * pop, or poll have not fully committed, or making it appear
! * empty when an update of top or base has not yet been seen.
! * Similarly, the check in push for the queue array being full may
! * trigger when not completely full, causing a resize earlier than
! * required.
! *
! * Mainly because of these potential inconsistencies among slots
! * vs indices, the poll operation, considered individually, is not
! * wait-free. One thief cannot successfully continue until another
! * in-progress one (or, if previously empty, a push) visibly
! * completes. This can stall threads when required to consume
! * from a given queue (which may spin). However, in the
! * aggregate, we ensure probabilistic non-blockingness at least
! * until checking quiescence (which is intrinsically blocking):
! * If an attempted steal fails, a scanning thief chooses a
! * different victim target to try next. So, in order for one thief
! * to progress, it suffices for any in-progress poll or new push
! * on any empty queue to complete. The worst cases occur when many
! * threads are looking for tasks being produced by a stalled
! * producer.
  *
  * This approach also enables support of a user mode in which
  * local task processing is in FIFO, not LIFO order, simply by
  * using poll rather than pop. This can be useful in
! * message-passing frameworks in which tasks are never joined,
! * although with increased contention among task producers and
! * consumers.
  *
  * WorkQueues are also used in a similar way for tasks submitted
  * to the pool. We cannot mix these tasks in the same queues used
  * by workers. Instead, we randomly associate submission queues
  * with submitting threads, using a form of hashing. The
  * ThreadLocalRandom probe value serves as a hash code for
  * choosing existing queues, and may be randomly repositioned upon
  * contention with other submitters. In essence, submitters act
  * like workers except that they are restricted to executing local
! * tasks that they submitted (or when known, subtasks thereof).
! * Insertion of tasks in shared mode requires a lock. We use only
! * a simple spinlock (using field "source"), because submitters
! * encountering a busy queue move to a different position to use
! * or create other queues. They block only when registering new
! * queues.
  *
  * Management
  * ==========
  *
  * The main throughput advantages of work-stealing stem from
  * decentralized control -- workers mostly take tasks from
  * themselves or each other, at rates that can exceed a billion
! * per second. Most non-atomic control is performed by some form
! * of scanning across or within queues. The pool itself creates,
! * activates (enables scanning for and running tasks),
! * deactivates, blocks, and terminates threads, all with minimal
! * central information. There are only a few properties that we
! * can globally track or maintain, so we pack them into a small
! * number of variables, often maintaining atomicity without
! * blocking or locking. Nearly all essentially atomic control
! * state is held in a few volatile variables that are by far most
! * often read (not written) as status and consistency checks. We
! * pack as much information into them as we can.
  *
  * Field "ctl" contains 64 bits holding information needed to
  * atomically decide to add, enqueue (on an event queue), and
  * dequeue and release workers. To enable this packing, we
  * restrict maximum parallelism to (1<<15)-1 (which is far in
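The deque operations described in this hunk can be sketched in a few dozen lines. The owner pushes into a power-of-two circular array using masking. It pops by `getAndSet` on the slot below `top`. Any other thread polls by CASing the slot at `base` from non-null to null, after re-reading `base` to get a consistent snapshot.

This is a hypothetical illustration, not the JDK's WorkQueue. The names `WorkStealingDequeSketch`, `SketchDeque`, `Task` and `runDemo` are invented here. The sketch also rests on these assumptions:

- It uses `AtomicReferenceArray` for slot accesses.
- It never resizes; `push` simply reports "full".
- Each task object is pushed at most once, so a stale CAS cannot match a reused reference.
- It models none of the pool's signalling, fences or submission-queue locking.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicReferenceArray;

public final class WorkStealingDequeSketch {

    /** Hypothetical task type; each instance is pushed at most once. */
    static final class Task { final int id; Task(int id) { this.id = id; } }

    /** Fixed-capacity deque: the owner pushes/pops at top, other threads poll at base. */
    static final class SketchDeque<T> {
        private final AtomicReferenceArray<T> array; // power-of-two circular buffer
        private final int mask;
        private volatile int base; // next index to poll; advanced only by successful pollers
        private volatile int top;  // next index to push; written only by the owner

        SketchDeque(int capacity) {
            if (capacity <= 0 || Integer.bitCount(capacity) != 1)
                throw new IllegalArgumentException("capacity must be a power of two");
            array = new AtomicReferenceArray<>(capacity);
            mask = capacity - 1;
        }

        /** Owner only. Returns false when full, since this sketch never resizes. */
        boolean push(T task) {
            int t = top;
            if (t - base > mask) return false;  // already holds `capacity` elements
            array.set(t & mask, task);          // masking, not mod; volatile write publishes the task
            top = t + 1;
            return true;
        }

        /** Owner only: LIFO removal by getAndSet of the slot below top. */
        T pop() {
            int t = top;
            if (t - base <= 0) return null;     // empty
            T task = array.getAndSet((t - 1) & mask, null);
            if (task != null) top = t - 1;      // null means a poller took the last element
            return task;
        }

        /** Any thread: FIFO removal by CASing the base slot from non-null to null. */
        T poll() {
            for (;;) {
                int b = base;
                if (top - b <= 0) return null;  // empty
                int i = b & mask;
                T task = array.get(i);
                if (b != base) continue;        // inconsistent snapshot of base and slot: retry
                if (task == null) { Thread.onSpinWait(); continue; } // another removal in progress
                if (array.compareAndSet(i, task, null)) {
                    base = b + 1;
                    return task;
                }                               // lost a race: retry with fresh values
            }
        }
    }

    /** The caller acts as owner; returns true if every task ran exactly once. */
    static boolean runDemo(int nTasks, int nStealers) {
        SketchDeque<Task> q = new SketchDeque<>(64);
        AtomicIntegerArray runs = new AtomicIntegerArray(nTasks);
        AtomicBoolean done = new AtomicBoolean();
        Thread[] stealers = new Thread[nStealers];
        for (int s = 0; s < nStealers; s++) {
            stealers[s] = new Thread(() -> {
                for (;;) {
                    Task t = q.poll();
                    if (t != null) runs.incrementAndGet(t.id);
                    else if (done.get()) return;  // owner has drained its deque
                    else Thread.onSpinWait();
                }
            });
            stealers[s].start();
        }
        for (int i = 0; i < nTasks; i++) {
            Task task = new Task(i);
            while (!q.push(task)) {               // full: run a local task first
                Task local = q.pop();
                if (local != null) runs.incrementAndGet(local.id);
            }
        }
        for (Task t; (t = q.pop()) != null; ) runs.incrementAndGet(t.id); // drain in LIFO order
        done.set(true);
        try { for (Thread th : stealers) th.join(); }
        catch (InterruptedException e) { Thread.currentThread().interrupt(); return false; }
        for (int i = 0; i < nTasks; i++)
            if (runs.get(i) != 1) return false;
        return true;
    }

    public static void main(String[] args) {
        System.out.println(runDemo(100_000, 3));
    }
}
```

`main` runs one owner and three stealers over 100,000 tasks and prints whether every task ran exactly once. The sketch maps onto the text as follows:

- `pop` uses `getAndSet` rather than CAS, following the point that the owner's `top` cannot change independently.
- The recheck of `base` in `poll` is one way to obtain the consistent snapshot that the new text requires.
- The spin on a null slot shows concretely why a single poll is not wait-free: a poller waits until another in-progress removal publishes its `base` update.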
*** 341,561 **** * their negations (used for thresholding) to fit into 16bit * subfields. * * Field "mode" holds configuration parameters as well as lifetime * status, atomically and monotonically setting SHUTDOWN, STOP, ! * and finally TERMINATED bits. * ! * Field "workQueues" holds references to WorkQueues. It is ! * updated (only during worker creation and termination) under ! * lock (using field workerNamePrefix as lock), but is otherwise ! * concurrently readable, and accessed directly. We also ensure ! * that uses of the array reference itself never become too stale ! * in case of resizing, by arranging that (re-)reads are separated ! * by at least one acquiring read access. To simplify index-based ! * operations, the array size is always a power of two, and all ! * readers must tolerate null slots. Worker queues are at odd ! * indices. Shared (submission) queues are at even indices, up to ! * a maximum of 64 slots, to limit growth even if the array needs ! * to expand to add more workers. Grouping them together in this ! * way simplifies and speeds up task scanning. * * All worker thread creation is on-demand, triggered by task * submissions, replacement of terminated workers, and/or * compensation for blocked workers. However, all other support * code is set up to work with other policies. To ensure that we ! * do not hold on to worker references that would prevent GC, all ! * accesses to workQueues are via indices into the workQueues ! * array (which is one source of some of the messy code ! * constructions here). In essence, the workQueues array serves as * a weak reference mechanism. Thus for example the stack top * subfield of ctl stores indices, not references. * * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we * cannot let workers spin indefinitely scanning for tasks when * none can be found immediately, and we cannot start/resume * workers unless there appear to be tasks available. 
On the * other hand, we must quickly prod them into action when new ! * tasks are submitted or generated. In many usages, ramp-up time * is the main limiting factor in overall performance, which is * compounded at program start-up by JIT compilation and ! * allocation. So we streamline this as much as possible. * ! * The "ctl" field atomically maintains total worker and ! * "released" worker counts, plus the head of the available worker ! * queue (actually stack, represented by the lower 32bit subfield ! * of ctl). Released workers are those known to be scanning for * and/or running tasks. Unreleased ("available") workers are * recorded in the ctl stack. These workers are made available for ! * signalling by enqueuing in ctl (see method runWorker). The * "queue" is a form of Treiber stack. This is ideal for * activating threads in most-recently used order, and improves * performance and locality, outweighing the disadvantages of * being prone to contention and inability to release a worker ! * unless it is topmost on stack. To avoid missed signal problems ! * inherent in any wait/signal design, available workers rescan ! * for (and if found run) tasks after enqueuing. Normally their ! * release status will be updated while doing so, but the released ! * worker ctl count may underestimate the number of active ! * threads. (However, it is still possible to determine quiescence ! * via a validation traversal -- see isQuiescent). After an ! * unsuccessful rescan, available workers are blocked until ! * signalled (see signalWork). The top stack state holds the * value of the "phase" field of the worker: its index and status, * plus a version counter that, in addition to the count subfields * (also serving as version stamps) provide protection against * Treiber stack ABA effects. * * Creating workers. To create a worker, we pre-increment counts * (serving as a reservation), and attempt to construct a ! * ForkJoinWorkerThread via its factory. Upon construction, the ! 
* new thread invokes registerWorker, where it constructs a ! * WorkQueue and is assigned an index in the workQueues array ! * (expanding the array if necessary). The thread is then started. ! * Upon any exception across these steps, or null return from ! * factory, deregisterWorker adjusts counts and records ! * accordingly. If a null return, the pool continues running with ! * fewer than the target number workers. If exceptional, the ! * exception is propagated, generally to some external caller. ! * Worker index assignment avoids the bias in scanning that would ! * occur if entries were sequentially packed starting at the front ! * of the workQueues array. We treat the array as a simple ! * power-of-two hash table, expanding as needed. The seedIndex ! * increment ensures no collisions until a resize is needed or a ! * worker is deregistered and replaced, and thereafter keeps ! * probability of collision low. We cannot use ! * ThreadLocalRandom.getProbe() for similar purposes here because ! * the thread has not started yet, but do so for creating ! * submission queues for existing external threads (see ! * externalPush). * * WorkQueue field "phase" is used by both workers and the pool to * manage and track whether a worker is UNSIGNALLED (possibly * blocked waiting for a signal). When a worker is enqueued its ! * phase field is set. Note that phase field updates lag queue CAS ! * releases so usage requires care -- seeing a negative phase does ! * not guarantee that the worker is available. When queued, the ! * lower 16 bits of scanState must hold its pool index. So we ! * place the index there upon initialization and otherwise keep it ! * there or restore it when necessary. * * The ctl field also serves as the basis for memory * synchronization surrounding activation. This uses a more * efficient version of a Dekker-like rule that task producers and * consumers sync with each other by both writing/CASing ctl (even ! * if to its current value). 
This would be extremely costly. So ! * we relax it in several ways: (1) Producers only signal when ! * their queue is possibly empty at some point during a push ! * operation. (2) Other workers propagate this signal ! * when they find tasks in a queue with size greater than one. (3) ! * Workers only enqueue after scanning (see below) and not finding ! * any tasks. (4) Rather than CASing ctl to its current value in ! * the common case where no action is required, we reduce write ! * contention by equivalently prefacing signalWork when called by ! * an external task producer using a memory access with ! * full-volatile semantics or a "fullFence". ! * ! * Almost always, too many signals are issued, in part because a ! * task producer cannot tell if some existing worker is in the ! * midst of finishing one task (or already scanning) and ready to ! * take another without being signalled. So the producer might ! * instead activate a different worker that does not find any ! * work, and then inactivates. This scarcely matters in ! * steady-state computations involving all workers, but can create ! * contention and bookkeeping bottlenecks during ramp-up, * ramp-down, and small computations involving only a few workers. * ! * Scanning. Method scan (from runWorker) performs top-level ! * scanning for tasks. (Similar scans appear in helpQuiesce and ! * pollScan.) Each scan traverses and tries to poll from each ! * queue starting at a random index. Scans are not performed in ! * ideal random permutation order, to reduce cacheline ! * contention. The pseudorandom generator need not have ! * high-quality statistical properties in the long term, but just ! * within computations; We use Marsaglia XorShifts (often via ! * ThreadLocalRandom.nextSecondarySeed), which are cheap and ! * suffice. Scanning also includes contention reduction: When ! * scanning workers fail to extract an apparently existing task, ! * they soon restart at a different pseudorandom index. This form ! 
* of backoff improves throughput when many threads are trying to ! * take tasks from few queues, which can be common in some usages. ! * Scans do not otherwise explicitly take into account core ! * affinities, loads, cache localities, etc, However, they do * exploit temporal locality (which usually approximates these) by * preferring to re-poll from the same queue after a successful ! * poll before trying others (see method topLevelExec). However ! * this preference is bounded (see TOP_BOUND_SHIFT) as a safeguard ! * against infinitely unfair looping under unbounded user task ! * recursion, and also to reduce long-term contention when many ! * threads poll few queues holding many small tasks. The bound is ! * high enough to avoid much impact on locality and scheduling ! * overhead. * * Trimming workers. To release resources after periods of lack of * use, a worker starting to wait when the pool is quiescent will ! * time out and terminate (see method runWorker) if the pool has ! * remained quiescent for period given by field keepAlive. * * Shutdown and Termination. A call to shutdownNow invokes ! * tryTerminate to atomically set a runState bit. The calling ! * thread, as well as every other worker thereafter terminating, ! * helps terminate others by cancelling their unprocessed tasks, ! * and waking them up, doing so repeatedly until stable. Calls to ! * non-abrupt shutdown() preface this by checking whether ! * termination should commence by sweeping through queues (until ! * stable) to ensure lack of in-flight submissions and workers ! * about to process them before triggering the "STOP" phase of * termination. * * Joining Tasks * ============= * ! * Any of several actions may be taken when one worker is waiting * to join a task stolen (or always held) by another. Because we * are multiplexing many tasks on to a pool of workers, we can't * always just let them block (as in Thread.join). 
We also cannot * just reassign the joiner's run-time stack with another and * replace it later, which would be a form of "continuation", that * even if possible is not necessarily a good idea since we may * need both an unblocked task and its continuation to progress. * Instead we combine two tactics: * * Helping: Arranging for the joiner to execute some task that it ! * would be running if the steal had not occurred. * * Compensating: Unless there are already enough live threads, * method tryCompensate() may create or re-activate a spare * thread to compensate for blocked joiners until they unblock. * ! * A third form (implemented in tryRemoveAndExec) amounts to ! * helping a hypothetical compensator: If we can readily tell that ! * a possible action of a compensator is to steal and execute the * task being joined, the joining thread can do so directly, ! * without the need for a compensation thread. * * The ManagedBlocker extension API can't use helping so relies * only on compensation in method awaitBlocker. * ! * The algorithm in awaitJoin entails a form of "linear helping". ! * Each worker records (in field source) the id of the queue from ! * which it last stole a task. The scan in method awaitJoin uses ! * these markers to try to find a worker to help (i.e., steal back ! * a task from and execute it) that could hasten completion of the ! * actively joined task. Thus, the joiner executes a task that ! * would be on its own local deque if the to-be-joined task had ! * not been stolen. This is a conservative variant of the approach ! * described in Wagner & Calder "Leapfrogging: a portable * technique for implementing efficient futures" SIGPLAN Notices, * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs * mainly in that we only record queue ids, not full dependency ! * links. This requires a linear scan of the workQueues array to * locate stealers, but isolates cost to when it is needed, rather ! * than adding to per-task overhead. 
Searches can fail to locate ! * stealers GC stalls and the like delay recording sources. ! * Further, even when accurately identified, stealers might not ! * ever produce a task that the joiner can in turn help with. So, ! * compensation is tried upon failure to find tasks to run. * * Compensation does not by default aim to keep exactly the target * parallelism number of unblocked threads running at any given * time. Some previous versions of this class employed immediate * compensations for any blocked join. However, in practice, the --- 361,602 ---- * their negations (used for thresholding) to fit into 16bit * subfields. * * Field "mode" holds configuration parameters as well as lifetime * status, atomically and monotonically setting SHUTDOWN, STOP, ! * and finally TERMINATED bits. It is updated only via bitwise ! * atomics (getAndBitwiseOr). * ! * Array "queues" holds references to WorkQueues. It is updated ! * (only during worker creation and termination) under the ! * registrationLock, but is otherwise concurrently readable, and ! * accessed directly (although always prefaced by acquireFences or ! * other acquiring reads). To simplify index-based operations, the ! * array size is always a power of two, and all readers must ! * tolerate null slots. Worker queues are at odd indices. Worker ! * ids masked with SMASK match their index. Shared (submission) ! * queues are at even indices. Grouping them together in this way ! * simplifies and speeds up task scanning. * * All worker thread creation is on-demand, triggered by task * submissions, replacement of terminated workers, and/or * compensation for blocked workers. However, all other support * code is set up to work with other policies. To ensure that we ! * do not hold on to worker or task references that would prevent ! * GC, all accesses to workQueues are via indices into the ! * queues array (which is one source of some of the messy code ! * constructions here). 
In essence, the queues array serves as * a weak reference mechanism. Thus for example the stack top * subfield of ctl stores indices, not references. * * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we * cannot let workers spin indefinitely scanning for tasks when * none can be found immediately, and we cannot start/resume * workers unless there appear to be tasks available. On the * other hand, we must quickly prod them into action when new ! * tasks are submitted or generated. These latencies are mainly a ! * function of JVM park/unpark (and underlying OS) performance, ! * which can be slow and variable. In many usages, ramp-up time * is the main limiting factor in overall performance, which is * compounded at program start-up by JIT compilation and ! * allocation. On the other hand, throughput degrades when too ! * many threads poll for too few tasks. * ! * The "ctl" field atomically maintains total and "released" ! * worker counts, plus the head of the available worker queue ! * (actually stack, represented by the lower 32bit subfield of ! * ctl). Released workers are those known to be scanning for * and/or running tasks. Unreleased ("available") workers are * recorded in the ctl stack. These workers are made available for ! * signalling by enqueuing in ctl (see method awaitWork). The * "queue" is a form of Treiber stack. This is ideal for * activating threads in most-recently used order, and improves * performance and locality, outweighing the disadvantages of * being prone to contention and inability to release a worker ! * unless it is topmost on stack. The top stack state holds the * value of the "phase" field of the worker: its index and status, * plus a version counter that, in addition to the count subfields * (also serving as version stamps) provide protection against * Treiber stack ABA effects. * * Creating workers. To create a worker, we pre-increment counts * (serving as a reservation), and attempt to construct a ! 
* ForkJoinWorkerThread via its factory. On starting, the new ! * thread first invokes registerWorker, where it constructs a ! * WorkQueue and is assigned an index in the queues array ! * (expanding the array if necessary). Upon any exception across ! * these steps, or null return from factory, deregisterWorker ! * adjusts counts and records accordingly. If a null return, the ! * pool continues running with fewer than the target number ! * workers. If exceptional, the exception is propagated, generally ! * to some external caller. * * WorkQueue field "phase" is used by both workers and the pool to * manage and track whether a worker is UNSIGNALLED (possibly * blocked waiting for a signal). When a worker is enqueued its ! * phase field is set negative. Note that phase field updates lag ! * queue CAS releases; seeing a negative phase does not guarantee ! * that the worker is available. When queued, the lower 16 bits of ! * its phase must hold its pool index. So we place the index there ! * upon initialization and never modify these bits. * * The ctl field also serves as the basis for memory * synchronization surrounding activation. This uses a more * efficient version of a Dekker-like rule that task producers and * consumers sync with each other by both writing/CASing ctl (even ! * if to its current value). However, rather than CASing ctl to ! * its current value in the common case where no action is ! * required, we reduce write contention by ensuring that ! * signalWork invocations are prefaced with a full-volatile memory ! * access (which is usually needed anyway). ! * ! * Signalling. Signals (in signalWork) cause new or reactivated ! * workers to scan for tasks. Method signalWork and its callers ! * try to approximate the unattainable goal of having the right ! * number of workers activated for the tasks at hand, but must err ! * on the side of too many workers vs too few to avoid stalls. If ! * computations are purely tree structured, it suffices for every ! 
* worker to activate another when it pushes a task into an empty ! * queue, resulting in O(log(#threads)) steps to full activation. ! * If instead, tasks come in serially from only a single producer, ! * each worker taking its first (since the last quiescence) task ! * from a queue should signal another if there are more tasks in ! * that queue. This is equivalent to, but generally faster than, ! * arranging the stealer take two tasks, re-pushing one on its own ! * queue, and signalling (because its queue is empty), also ! * resulting in logarithmic full activation time. Because we don't ! * know about usage patterns (or most commonly, mixtures), we use ! * both approaches. We approximate the second rule by arranging ! * that workers in scan() do not repeat signals when repeatedly ! * taking tasks from any given queue, by remembering the previous ! * one. There are narrow windows in which both rules may apply, ! * leading to duplicate or unnecessary signals. Despite such ! * limitations, these rules usually avoid slowdowns that otherwise ! * occur when too many workers contend to take too few tasks, or ! * when producers waste most of their time resignalling. However, ! * contention and overhead effects may still occur during ramp-up, * ramp-down, and small computations involving only a few workers. * ! * Scanning. Method scan performs top-level scanning for (and ! * execution of) tasks. Scans by different workers and/or at ! * different times are unlikely to poll queues in the same ! * order. Each scan traverses and tries to poll from each queue in ! * a pseudorandom permutation order by starting at a random index, ! * and using a constant cyclically exhaustive stride; restarting ! * upon contention. (Non-top-level scans; for example in ! * helpJoin, use simpler linear probes because they do not ! * systematically contend with top-level scans.) The pseudorandom ! * generator need not have high-quality statistical properties in ! * the long term. 
We use Marsaglia XorShifts, seeded with the Weyl ! * sequence from ThreadLocalRandom probes, which are cheap and ! * suffice. Scans do not otherwise explicitly take into account ! * core affinities, loads, cache localities, etc. However, they do * exploit temporal locality (which usually approximates these) by * preferring to re-poll from the same queue after a successful ! * poll before trying others (see method topLevelExec). This ! * reduces fairness, which is partially counteracted by using a ! * one-shot form of poll (tryPoll) that may lose to other workers. ! * ! * Deactivation. Method scan returns a sentinel when no tasks are ! * found, leading to deactivation (see awaitWork). The count ! * fields in ctl allow accurate discovery of quiescent states ! * (i.e., when all workers are idle) after deactivation. However, ! * this may also race with new (external) submissions, so a ! * recheck is also needed to determine quiescence. Upon apparently ! * triggering quiescence, awaitWork re-scans and self-signals if ! * it may have missed a signal. In other cases, a missed signal ! * may transiently lower parallelism because deactivation does not ! * necessarily mean that there is no more work, only that ! * there were no tasks not taken by other workers. But more ! * signals are generated (see above) to eventually reactivate if ! * needed. * * Trimming workers. To release resources after periods of lack of * use, a worker starting to wait when the pool is quiescent will ! * time out and terminate if the pool has remained quiescent for ! * the period given by field keepAlive. * * Shutdown and Termination. A call to shutdownNow invokes ! * tryTerminate to atomically set a mode bit. The calling thread, ! * as well as every other worker thereafter terminating, helps ! * terminate others by cancelling their unprocessed tasks, and ! * waking them up. Calls to non-abrupt shutdown() preface this by ! * checking isQuiescent before triggering the "STOP" phase of * termination.
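The scan order described in the Scanning notes (start at a pseudorandom index, then step by a constant stride) visits every slot of a power-of-two-sized array exactly once per cycle whenever the stride is odd, because an odd number is coprime to any power of two. The following is a minimal sketch under that assumption. `ScanOrderSketch`, `xorShift`, and `scanOrder` are hypothetical names, not ForkJoinPool methods, and the seed here comes directly from `ThreadLocalRandom.nextInt()` rather than from the Weyl-sequence probe the comment mentions.

```java
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical illustration of a cyclically exhaustive scan order (not JDK code). */
final class ScanOrderSketch {
    /** One Marsaglia xorshift step (shifts 13, 17, 5); maps nonzero ints to nonzero ints. */
    static int xorShift(int r) {
        r ^= r << 13;
        r ^= r >>> 17;
        r ^= r << 5;
        return r;
    }

    /**
     * Returns the order in which a scan would visit the n slots of a
     * power-of-two-sized table, starting at a pseudorandom slot and
     * advancing by a pseudorandom odd stride.
     */
    static int[] scanOrder(int n, int seed) {
        if (n <= 0 || (n & (n - 1)) != 0)
            throw new IllegalArgumentException("n must be a power of two");
        int r = xorShift(seed == 0 ? 1 : seed); // xorshift needs a nonzero state
        int mask = n - 1;
        int stride = (r >>> 16) | 1;            // odd, hence coprime to n
        int i = r & mask;                       // pseudorandom starting slot
        int[] order = new int[n];
        for (int k = 0; k < n; ++k) {
            order[k] = i;
            i = (i + stride) & mask;            // masking replaces % n
        }
        return order;
    }

    public static void main(String[] args) {
        int seed = ThreadLocalRandom.current().nextInt();
        System.out.println(Arrays.toString(scanOrder(8, seed)));
    }
}
```

Only the starting slot and the stride depend on the seed, so two scans with different seeds usually probe the slots in different orders while each still covers all of them, which is the property the notes rely on when they say scans by different workers are unlikely to poll queues in the same order.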
* * Joining Tasks * ============= * ! * Normally, the first option when joining a task that is not done ! * is to try to unfork it from local queue and run it. Otherwise, ! * any of several actions may be taken when one worker is waiting * to join a task stolen (or always held) by another. Because we * are multiplexing many tasks on to a pool of workers, we can't * always just let them block (as in Thread.join). We also cannot * just reassign the joiner's run-time stack with another and * replace it later, which would be a form of "continuation", that * even if possible is not necessarily a good idea since we may * need both an unblocked task and its continuation to progress. * Instead we combine two tactics: * * Helping: Arranging for the joiner to execute some task that it ! * could be running if the steal had not occurred. * * Compensating: Unless there are already enough live threads, * method tryCompensate() may create or re-activate a spare * thread to compensate for blocked joiners until they unblock. * ! * A third form (implemented via tryRemove) amounts to helping a ! * hypothetical compensator: If we can readily tell that a ! * possible action of a compensator is to steal and execute the * task being joined, the joining thread can do so directly, ! * without the need for a compensation thread; although with a ! * (rare) possibility of reduced parallelism because of a ! * transient gap in the queue array. ! * ! * Other intermediate forms available for specific task types (for ! * example helpAsyncBlocker) often avoid or postpone the need for ! * blocking or compensation. * * The ManagedBlocker extension API can't use helping so relies * only on compensation in method awaitBlocker. * ! * The algorithm in helpJoin entails a form of "linear helping". ! * Each worker records (in field "source") the id of the queue ! * from which it last stole a task. The scan in method helpJoin ! * uses these markers to try to find a worker to help (i.e., steal ! 
* back a task from and execute it) that could hasten completion ! * of the actively joined task. Thus, the joiner executes a task ! * that would be on its own local deque if the to-be-joined task ! * had not been stolen. This is a conservative variant of the ! * approach described in Wagner & Calder "Leapfrogging: a portable * technique for implementing efficient futures" SIGPLAN Notices, * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs * mainly in that we only record queue ids, not full dependency ! * links. This requires a linear scan of the queues array to * locate stealers, but isolates cost to when it is needed, rather ! * than adding to per-task overhead. Also, searches are limited to ! * direct and at most two levels of indirect stealers, after which ! * there are rapidly diminishing returns on increased overhead. ! * Searches can fail to locate stealers when stalls delay ! * recording sources. Further, even when accurately identified, ! * stealers might not ever produce a task that the joiner can in ! * turn help with. So, compensation is tried upon failure to find ! * tasks to run. ! * ! * Joining CountedCompleters (see helpComplete) differs from (and ! * is generally more efficient than) other cases because task ! * eligibility is determined by checking completion chains rather ! * than tracking stealers. ! * ! * Joining under timeouts (ForkJoinTask timed get) uses a ! * constrained mixture of helping and compensating in part because ! * pools (actually, only the common pool) may not have any ! * available threads: If the pool is saturated (all available ! * workers are busy), the caller tries to remove and otherwise ! * help; else it blocks under compensation so that it may time out ! * independently of any tasks. * * Compensation does not by default aim to keep exactly the target * parallelism number of unblocked threads running at any given * time. 
Some previous versions of this class employed immediate * compensations for any blocked join. However, in practice, the
*** 576,587 **** * initialization. Since it (or any other created pool) need * never be used, we minimize initial construction overhead and * footprint to the setup of about a dozen fields. * * When external threads submit to the common pool, they can ! * perform subtask processing (see externalHelpComplete and ! * related methods) upon joins. This caller-helps policy makes it * sensible to set common pool parallelism level to one (or more) * less than the total number of available cores, or even zero for * pure caller-runs. We do not need to record whether external * submissions are to the common pool -- if not, external help * methods return quickly. These submitters would otherwise be --- 617,628 ---- * initialization. Since it (or any other created pool) need * never be used, we minimize initial construction overhead and * footprint to the setup of about a dozen fields. * * When external threads submit to the common pool, they can ! * perform subtask processing (see helpComplete and related ! * methods) upon joins. This caller-helps policy makes it * sensible to set common pool parallelism level to one (or more) * less than the total number of available cores, or even zero for * pure caller-runs. We do not need to record whether external * submissions are to the common pool -- if not, external help * methods return quickly. These submitters would otherwise be
*** 593,635 **** * As a more appropriate default in managed environments, unless * overridden by system properties, we use workers of subclass * InnocuousForkJoinWorkerThread when there is a SecurityManager * present. These workers have no permissions set, do not belong * to any user-defined ThreadGroup, and erase all ThreadLocals ! * after executing any top-level task (see ! * WorkQueue.afterTopLevelExec). The associated mechanics (mainly ! * in ForkJoinWorkerThread) may be JVM-dependent and must access ! * particular Thread class fields to achieve this effect. * * Memory placement * ================ * * Performance can be very sensitive to placement of instances of * ForkJoinPool and WorkQueues and their queue arrays. To reduce ! * false-sharing impact, the @Contended annotation isolates ! * adjacent WorkQueue instances, as well as the ForkJoinPool.ctl ! * field. WorkQueue arrays are allocated (by their threads) with ! * larger initial sizes than most ever need, mostly to reduce ! * false sharing with current garbage collectors that use cardmark ! * tables. * * Style notes * =========== * ! * Memory ordering relies mainly on VarHandles. This can be * awkward and ugly, but also reflects the need to control * outcomes across the unusual cases that arise in very racy code * with very few invariants. All fields are read into locals ! * before use, and null-checked if they are references. Array ! * accesses using masked indices include checks (that are always ! * true) that the array length is non-zero to avoid compilers ! * inserting more expensive traps. This is usually done in a ! * "C"-like style of listing declarations at the heads of methods ! * or blocks, and using inline assignments on first encounter. ! * Nearly all explicit checks lead to bypass/return, not exception ! * throws, because they may legitimately arise due to ! * cancellation/revocation during shutdown. 
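The indexing conventions in the style notes above can be seen in isolation in a small, single-threaded sketch. `MaskedRing` is a hypothetical class, not WorkQueue: it deliberately omits VarHandles, fences, and stealing, and keeps only fields read into locals, masking by `cap - 1` instead of `%` on a power-of-two array, length checks that are always true, `null` returns instead of exceptions when empty, and a doubling step that, like growArray, places each element at its logical index masked by the new capacity.

```java
import java.util.Objects;

/**
 * Hypothetical owner-only circular array (not the JDK WorkQueue) illustrating
 * the style notes: fields read into locals, power-of-two masking instead of %,
 * always-true length checks, and null returns rather than exceptions.
 * Not thread-safe.
 */
final class MaskedRing<T> {
    private Object[] array = new Object[2]; // power-of-two capacity, at least 2
    private int base;                       // index of next slot to poll (oldest end)
    private int top;                        // index of next slot to push (newest end)

    int size() {
        return top - base;                  // still correct after int overflow
    }

    void push(T x) {
        Objects.requireNonNull(x);          // argument errors still throw
        Object[] a = array; int s = top, cap;
        if (a != null && (cap = a.length) > 0) { // always true; mirrors the style note
            a[s & (cap - 1)] = x;
            top = s + 1;
            if (top - base == cap)               // now full: double before next push
                grow(a, cap);
        }
    }

    @SuppressWarnings("unchecked")
    T pop() {                                    // take newest element
        Object[] a = array; int s = top, cap;
        if (a == null || (cap = a.length) == 0 || s - base <= 0)
            return null;                         // bypass instead of throwing
        int k = (s - 1) & (cap - 1);
        Object x = a[k];
        a[k] = null;
        top = s - 1;
        return (T) x;
    }

    @SuppressWarnings("unchecked")
    T poll() {                                   // take oldest element
        Object[] a = array; int b = base, cap;
        if (a == null || (cap = a.length) == 0 || top - b <= 0)
            return null;
        int k = b & (cap - 1);
        Object x = a[k];
        a[k] = null;
        base = b + 1;
        return (T) x;
    }

    /** Doubles capacity; the element at logical index i moves to i & (newCap - 1). */
    private void grow(Object[] oldA, int oldCap) {
        int newCap = oldCap << 1;
        if (newCap <= 0)
            throw new IllegalStateException("capacity exceeded");
        Object[] newA = new Object[newCap];
        int oldMask = oldCap - 1, newMask = newCap - 1;
        for (int i = base; i != top; ++i)
            newA[i & newMask] = oldA[i & oldMask];
        array = newA;
    }
}
```

Here pop and poll take from opposite ends of the same buffer, matching the owner-pop/stealer-poll split described earlier in the file. Because base and top move by one per operation and the capacity divides 2^32, both `top - base` and `i & (cap - 1)` remain correct even after the int indices wrap around.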
* * There is a lot of representation-level coupling among classes * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The * fields of WorkQueue maintain data structures managed by * ForkJoinPool, so are directly accessed. There is little point --- 634,698 ---- * As a more appropriate default in managed environments, unless * overridden by system properties, we use workers of subclass * InnocuousForkJoinWorkerThread when there is a SecurityManager * present. These workers have no permissions set, do not belong * to any user-defined ThreadGroup, and erase all ThreadLocals ! * after executing any top-level task. The associated mechanics ! * may be JVM-dependent and must access particular Thread class ! * fields to achieve this effect. ! * ! * Interrupt handling ! * ================== ! * ! * The framework is designed to manage task cancellation ! * (ForkJoinTask.cancel) independently from the interrupt status ! * of threads running tasks. (See the public ForkJoinTask ! * documentation for rationale.) Interrupts are issued only in ! * tryTerminate, when workers should be terminating and tasks ! * should be cancelled anyway. Interrupts are cleared only when ! * necessary to ensure that calls to LockSupport.park do not loop ! * indefinitely (park returns immediately if the current thread is ! * interrupted). If so, interruption is reinstated after blocking ! * if status could be visible during the scope of any task. For ! * cases in which task bodies are specified or desired to ! * interrupt upon cancellation, ForkJoinTask.cancel can be ! * overridden to do so (as is done for invoke{Any,All}). * * Memory placement * ================ * * Performance can be very sensitive to placement of instances of * ForkJoinPool and WorkQueues and their queue arrays. To reduce ! * false-sharing impact, the @Contended annotation isolates the ! * ForkJoinPool.ctl field as well as the most heavily written ! * WorkQueue fields. These mainly reduce cache traffic by scanners. ! 
* WorkQueue arrays are presized large enough to avoid resizing ! * (which transiently reduces throughput) in most tree-like ! * computations, although not in some streaming usages. Initial ! * sizes are not large enough to avoid secondary contention ! * effects (especially for GC cardmarks) when queues are placed ! * near each other in memory. This is common, but has different ! * impact in different collectors and remains incompletely ! * addressed. * * Style notes * =========== * ! * Memory ordering relies mainly on atomic operations (CAS, ! * getAndSet, getAndAdd) along with explicit fences. This can be * awkward and ugly, but also reflects the need to control * outcomes across the unusual cases that arise in very racy code * with very few invariants. All fields are read into locals ! * before use, and null-checked if they are references, even if ! * they can never be null under current usages. Array accesses ! * using masked indices include checks (that are always true) that ! * the array length is non-zero to avoid compilers inserting more ! * expensive traps. This is usually done in a "C"-like style of ! * listing declarations at the heads of methods or blocks, and ! * using inline assignments on first encounter. Nearly all ! * explicit checks lead to bypass/return, not exception throws, ! * because they may legitimately arise during shutdown. * * There is a lot of representation-level coupling among classes * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The * fields of WorkQueue maintain data structures managed by * ForkJoinPool, so are directly accessed. There is little point
*** 650,659 **** --- 713,738 ---- * (4) Fields, along with constants used when unpacking some of them * (5) Internal control methods * (6) Callbacks and other support for ForkJoinTask methods * (7) Exported methods * (8) Static block initializing statics in minimally dependent order + * + * Revision notes + * ============== + * + * The main sources of differences of January 2020 ForkJoin + * classes from previous version are: + * + * * ForkJoinTask now uses field "aux" to support blocking joins + * and/or record exceptions, replacing reliance on builtin + * monitors and side tables. + * * Scans probe slots (vs compare indices), along with related + * changes that reduce performance differences across most + * garbage collectors, and reduce contention. + * * Refactoring for better integration of special task types and + * other capabilities that had been incrementally tacked on. Plus + * many minor reworkings to improve consistency. */ // Static utilities /**
*** 664,673 **** --- 743,760 ---- SecurityManager security = System.getSecurityManager(); if (security != null) security.checkPermission(modifyThreadPermission); } + static AccessControlContext contextWithPermissions(Permission ... perms) { + Permissions permissions = new Permissions(); + for (Permission perm : perms) + permissions.add(perm); + return new AccessControlContext( + new ProtectionDomain[] { new ProtectionDomain(null, permissions) }); + } + // Nested classes /** * Factory for creating new {@link ForkJoinWorkerThread}s. * A {@code ForkJoinWorkerThreadFactory} must be defined and used
*** 691,1177 **** * @throws NullPointerException if the pool is null */ public ForkJoinWorkerThread newThread(ForkJoinPool pool); } - static AccessControlContext contextWithPermissions(Permission ... perms) { - Permissions permissions = new Permissions(); - for (Permission perm : perms) - permissions.add(perm); - return new AccessControlContext( - new ProtectionDomain[] { new ProtectionDomain(null, permissions) }); - } - /** * Default ForkJoinWorkerThreadFactory implementation; creates a * new ForkJoinWorkerThread using the system class loader as the * thread context class loader. */ ! private static final class DefaultForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory { private static final AccessControlContext ACC = contextWithPermissions( new RuntimePermission("getClassLoader"), new RuntimePermission("setContextClassLoader")); public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { return AccessController.doPrivileged( new PrivilegedAction<>() { public ForkJoinWorkerThread run() { ! return new ForkJoinWorkerThread( ! pool, ClassLoader.getSystemClassLoader()); }}, ACC); } } // Constants shared across ForkJoinPool and WorkQueue // Bounds static final int SWIDTH = 16; // width of short static final int SMASK = 0xffff; // short bits == max index static final int MAX_CAP = 0x7fff; // max #workers - 1 - static final int SQMASK = 0x007e; // max 64 (even) slots // Masks and units for WorkQueue.phase and ctl sp subfield static final int UNSIGNALLED = 1 << 31; // must be negative static final int SS_SEQ = 1 << 16; // version count - static final int QLOCK = 1; // must be 1 ! // Mode bits and sentinels, some also used in WorkQueue id and.source fields ! static final int OWNED = 1; // queue has owner thread static final int FIFO = 1 << 16; // fifo queue or access mode ! static final int SHUTDOWN = 1 << 18; ! static final int TERMINATED = 1 << 19; static final int STOP = 1 << 31; // must be negative ! 
static final int QUIET = 1 << 30; // not scanning or working ! static final int DORMANT = QUIET | UNSIGNALLED; ! ! /** ! * Initial capacity of work-stealing queue array. ! * Must be a power of two, at least 2. ! */ ! static final int INITIAL_QUEUE_CAPACITY = 1 << 13; ! ! /** ! * Maximum capacity for queue arrays. Must be a power of two less ! * than or equal to 1 << (31 - width of array entry) to ensure ! * lack of wraparound of index calculations, but defined to a ! * value a bit less than this to help users trap runaway programs ! * before saturating systems. ! */ ! static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M /** ! * The maximum number of top-level polls per worker before ! * checking other queues, expressed as a bit shift. See above for ! * rationale. */ ! static final int TOP_BOUND_SHIFT = 10; /** * Queues supporting work-stealing as well as external task * submission. See above for descriptions and algorithms. */ - @jdk.internal.vm.annotation.Contended static final class WorkQueue { ! volatile int source; // source queue id, or sentinel ! int id; // pool index, mode, tag ! int base; // index of next slot for poll ! int top; // index of next slot for push ! volatile int phase; // versioned, negative: queued, 1: locked int stackPred; // pool stack (ctl) predecessor link ! int nsteals; // number of steals ForkJoinTask<?>[] array; // the queued tasks; power of 2 size - final ForkJoinPool pool; // the containing pool (may be null) final ForkJoinWorkerThread owner; // owning thread or null if shared ! WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) { ! this.pool = pool; ! this.owner = owner; ! // Place indices in the center of array (that is not yet allocated) ! base = top = INITIAL_QUEUE_CAPACITY >>> 1; } /** ! * Tries to lock shared queue by CASing phase field. */ ! final boolean tryLockPhase() { ! return PHASE.compareAndSet(this, 0, 1); } ! final void releasePhaseLock() { ! 
PHASE.setRelease(this, 0); } /** * Returns an exportable index (used by ForkJoinWorkerThread). */ final int getPoolIndex() { ! return (id & 0xffff) >>> 1; // ignore odd/even tag bit } /** * Returns the approximate number of tasks in the queue. */ final int queueSize() { ! int n = (int)BASE.getAcquire(this) - top; ! return (n >= 0) ? 0 : -n; // ignore transient negative } /** ! * Provides a more accurate estimate of whether this queue has ! * any tasks than does queueSize, by checking whether a ! * near-empty queue has at least one unclaimed task. */ final boolean isEmpty() { ! ForkJoinTask<?>[] a; int n, cap, b; ! VarHandle.acquireFence(); // needed by external callers ! return ((n = (b = base) - top) >= 0 || // possibly one task ! (n == -1 && ((a = array) == null || ! (cap = a.length) == 0 || ! a[(cap - 1) & b] == null))); } /** * Pushes a task. Call only by owner in unshared queues. * * @param task the task. Caller must ensure non-null. * @throws RejectedExecutionException if array cannot be resized */ ! final void push(ForkJoinTask<?> task) { ! ForkJoinTask<?>[] a; ! int s = top, d = s - base, cap, m; ! ForkJoinPool p = pool; ! if ((a = array) != null && (cap = a.length) > 0) { ! QA.setRelease(a, (m = cap - 1) & s, task); ! top = s + 1; if (d == m) ! growArray(false); ! else if (QA.getAcquire(a, m & (s - 1)) == null && p != null) { ! VarHandle.fullFence(); // was empty ! p.signalWork(null); ! } } } /** ! * Version of push for shared queues. Call only with phase lock held. ! * @return true if should signal work */ final boolean lockedPush(ForkJoinTask<?> task) { ! ForkJoinTask<?>[] a; ! boolean signal = false; ! int s = top, d = s - base, cap, m; ! if ((a = array) != null && (cap = a.length) > 0) { ! a[(m = (cap - 1)) & s] = task; ! top = s + 1; if (d == m) ! growArray(true); ! else { ! phase = 0; // full volatile unlock ! if (((s - base) & ~1) == 0) // size 0 or 1 ! signal = true; ! } } ! return signal; } /** ! * Doubles the capacity of array. 
Call either by owner or with ! * lock held -- it is OK for base, but not top, to move while ! * resizings are in progress. ! */ ! final void growArray(boolean locked) { ! ForkJoinTask<?>[] newA = null; ! try { ! ForkJoinTask<?>[] oldA; int oldSize, newSize; ! if ((oldA = array) != null && (oldSize = oldA.length) > 0 && ! (newSize = oldSize << 1) <= MAXIMUM_QUEUE_CAPACITY && ! newSize > 0) { try { ! newA = new ForkJoinTask<?>[newSize]; ! } catch (OutOfMemoryError ex) { ! } ! if (newA != null) { // poll from old array, push to new ! int oldMask = oldSize - 1, newMask = newSize - 1; ! for (int s = top - 1, k = oldMask; k >= 0; --k) { ! ForkJoinTask<?> x = (ForkJoinTask<?>) ! QA.getAndSet(oldA, s & oldMask, null); ! if (x != null) ! newA[s-- & newMask] = x; ! else ! break; ! } ! array = newA; ! VarHandle.releaseFence(); } } ! } finally { ! if (locked) ! phase = 0; } - if (newA == null) - throw new RejectedExecutionException("Queue capacity exceeded"); } ! /** ! * Takes next task, if one exists, in FIFO order. ! */ ! final ForkJoinTask<?> poll() { ! int b, k, cap; ForkJoinTask<?>[] a; ! while ((a = array) != null && (cap = a.length) > 0 && ! top - (b = base) > 0) { ! ForkJoinTask<?> t = (ForkJoinTask<?>) ! QA.getAcquire(a, k = (cap - 1) & b); ! if (base == b++) { ! if (t == null) ! Thread.yield(); // await index advance ! else if (QA.compareAndSet(a, k, t, null)) { ! BASE.setOpaque(this, b); ! return t; ! } ! } ! } ! return null; ! } /** ! * Takes next task, if one exists, in order specified by mode. */ ! final ForkJoinTask<?> nextLocalTask() { ForkJoinTask<?> t = null; ! int md = id, b, s, d, cap; ForkJoinTask<?>[] a; ! if ((a = array) != null && (cap = a.length) > 0 && ! (d = (s = top) - (b = base)) > 0) { ! if ((md & FIFO) == 0 || d == 1) { ! if ((t = (ForkJoinTask<?>) ! QA.getAndSet(a, (cap - 1) & --s, null)) != null) ! TOP.setOpaque(this, s); ! } ! else if ((t = (ForkJoinTask<?>) ! QA.getAndSet(a, (cap - 1) & b++, null)) != null) { ! BASE.setOpaque(this, b); ! } ! 
else // on contention in FIFO mode, use regular poll ! t = poll(); ! } return t; } /** ! * Returns next task, if one exists, in order specified by mode. */ ! final ForkJoinTask<?> peek() { ! int cap; ForkJoinTask<?>[] a; ! return ((a = array) != null && (cap = a.length) > 0) ? ! a[(cap - 1) & ((id & FIFO) != 0 ? base : top - 1)] : null; } /** ! * Pops the given task only if it is at the current top. */ ! final boolean tryUnpush(ForkJoinTask<?> task) { ! boolean popped = false; ! int s, cap; ForkJoinTask<?>[] a; if ((a = array) != null && (cap = a.length) > 0 && ! (s = top) != base && ! (popped = QA.compareAndSet(a, (cap - 1) & --s, task, null))) ! TOP.setOpaque(this, s); ! return popped; } /** ! * Shared version of tryUnpush. */ ! final boolean tryLockedUnpush(ForkJoinTask<?> task) { ! boolean popped = false; ! int s = top - 1, k, cap; ForkJoinTask<?>[] a; ! if ((a = array) != null && (cap = a.length) > 0 && ! a[k = (cap - 1) & s] == task && tryLockPhase()) { ! if (top == s + 1 && array == a && ! (popped = QA.compareAndSet(a, k, task, null))) top = s; - releasePhaseLock(); } ! return popped; } /** ! * Removes and cancels all known tasks, ignoring any exceptions. */ ! final void cancelAll() { ! for (ForkJoinTask<?> t; (t = poll()) != null; ) ! ForkJoinTask.cancelIgnoringExceptions(t); } - - // Specialized execution methods /** ! * Runs the given (stolen) task if nonnull, as well as ! * remaining local tasks and others available from the given ! * queue, up to bound n (to avoid infinite unfairness). */ ! final void topLevelExec(ForkJoinTask<?> t, WorkQueue q, int n) { ! int nstolen = 1; ! for (int j = 0;;) { ! if (t != null) ! t.doExec(); ! if (j++ <= n) ! t = nextLocalTask(); ! else { ! j = 0; ! t = null; ! } ! if (t == null) { ! if (q != null && (t = q.poll()) != null) { ! ++nstolen; ! j = 0; } ! else if (j != 0) break; } } ! ForkJoinWorkerThread thread = owner; ! nsteals += nstolen; ! source = 0; ! if (thread != null) ! thread.afterTopLevelExec(); } /** ! 
* If present, removes task from queue and executes it. */ ! final void tryRemoveAndExec(ForkJoinTask<?> task) { ! ForkJoinTask<?>[] a; int s, cap; ! if ((a = array) != null && (cap = a.length) > 0 && ! (s = top) - base > 0) { // traverse from top ! for (int m = cap - 1, ns = s - 1, i = ns; ; --i) { ! int index = i & m; ! ForkJoinTask<?> t = (ForkJoinTask<?>)QA.get(a, index); ! if (t == null) ! break; ! else if (t == task) { ! if (QA.compareAndSet(a, index, t, null)) { ! top = ns; // safely shift down ! for (int j = i; j != ns; ++j) { ! ForkJoinTask<?> f; ! int pindex = (j + 1) & m; ! f = (ForkJoinTask<?>)QA.get(a, pindex); ! QA.setVolatile(a, pindex, null); ! int jindex = j & m; ! QA.setRelease(a, jindex, f); ! } ! VarHandle.releaseFence(); ! t.doExec(); ! } ! break; } } } } /** * Tries to pop and run tasks within the target's computation * until done, not found, or limit exceeded. * * @param task root of CountedCompleter computation * @param limit max runs, or zero for no limit - * @param shared true if must lock to extract task * @return task status on exit */ ! final int helpCC(CountedCompleter<?> task, int limit, boolean shared) { ! int status = 0; ! if (task != null && (status = task.status) >= 0) { ! int s, k, cap; ForkJoinTask<?>[] a; ! while ((a = array) != null && (cap = a.length) > 0 && ! (s = top) - base > 0) { ! CountedCompleter<?> v = null; ! ForkJoinTask<?> o = a[k = (cap - 1) & (s - 1)]; ! if (o instanceof CountedCompleter) { ! CountedCompleter<?> t = (CountedCompleter<?>)o; ! for (CountedCompleter<?> f = t;;) { ! if (f != task) { ! if ((f = f.completer) == null) ! break; ! } ! else if (shared) { ! if (tryLockPhase()) { ! if (top == s && array == a && ! QA.compareAndSet(a, k, t, null)) { ! top = s - 1; ! v = t; } ! releasePhaseLock(); } break; } ! else { ! if (QA.compareAndSet(a, k, t, null)) { ! top = s - 1; ! v = t; ! } break; } ! } ! } ! if (v != null) ! v.doExec(); ! if ((status = task.status) < 0 || v == null || ! 
(limit != 0 && --limit == 0)) break; - } } return status; } /** * Tries to poll and run AsynchronousCompletionTasks until ! * none found or blocker is released * * @param blocker the blocker */ final void helpAsyncBlocker(ManagedBlocker blocker) { ! if (blocker != null) { ! int b, k, cap; ForkJoinTask<?>[] a; ForkJoinTask<?> t; ! while ((a = array) != null && (cap = a.length) > 0 && ! top - (b = base) > 0) { ! t = (ForkJoinTask<?>)QA.getAcquire(a, k = (cap - 1) & b); ! if (blocker.isReleasable()) ! break; ! else if (base == b++ && t != null) { ! if (!(t instanceof CompletableFuture. ! AsynchronousCompletionTask)) ! break; ! else if (QA.compareAndSet(a, k, t, null)) { ! BASE.setOpaque(this, b); t.doExec(); } } } ! } } /** ! * Returns true if owned and not known to be blocked. */ final boolean isApparentlyUnblocked() { Thread wt; Thread.State s; return ((wt = owner) != null && (s = wt.getState()) != Thread.State.BLOCKED && s != Thread.State.WAITING && s != Thread.State.TIMED_WAITING); } - // VarHandle mechanics. - static final VarHandle PHASE; - static final VarHandle BASE; - static final VarHandle TOP; static { try { MethodHandles.Lookup l = MethodHandles.lookup(); ! PHASE = l.findVarHandle(WorkQueue.class, "phase", int.class); BASE = l.findVarHandle(WorkQueue.class, "base", int.class); - TOP = l.findVarHandle(WorkQueue.class, "top", int.class); } catch (ReflectiveOperationException e) { throw new ExceptionInInitializerError(e); } } } --- 778,1271 ---- * @throws NullPointerException if the pool is null */ public ForkJoinWorkerThread newThread(ForkJoinPool pool); } /** * Default ForkJoinWorkerThreadFactory implementation; creates a * new ForkJoinWorkerThread using the system class loader as the * thread context class loader. */ ! static final class DefaultForkJoinWorkerThreadFactory ! implements ForkJoinWorkerThreadFactory { ! // ACC for access to the factory ! private static final AccessControlContext ACC = contextWithPermissions( ! 
new RuntimePermission("getClassLoader"), ! new RuntimePermission("setContextClassLoader")); ! public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { ! return AccessController.doPrivileged( ! new PrivilegedAction<>() { ! public ForkJoinWorkerThread run() { ! return new ForkJoinWorkerThread(null, pool, true, false); ! }}, ! ACC); ! } ! } ! ! /** ! * Factory for CommonPool unless overridden by System property. ! * Creates InnocuousForkJoinWorkerThreads if a security manager is ! * present at time of invocation. Support requires that we break ! * quite a lot of encapsulation (some via helper methods in ! * ThreadLocalRandom) to access and set Thread fields. ! */ ! static final class DefaultCommonPoolForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory { private static final AccessControlContext ACC = contextWithPermissions( + modifyThreadPermission, + new RuntimePermission("enableContextClassLoaderOverride"), + new RuntimePermission("modifyThreadGroup"), new RuntimePermission("getClassLoader"), new RuntimePermission("setContextClassLoader")); public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { return AccessController.doPrivileged( new PrivilegedAction<>() { public ForkJoinWorkerThread run() { ! return System.getSecurityManager() == null ? ! new ForkJoinWorkerThread(null, pool, true, true): ! new ForkJoinWorkerThread. ! InnocuousForkJoinWorkerThread(pool); }}, ACC); } } // Constants shared across ForkJoinPool and WorkQueue // Bounds static final int SWIDTH = 16; // width of short static final int SMASK = 0xffff; // short bits == max index static final int MAX_CAP = 0x7fff; // max #workers - 1 // Masks and units for WorkQueue.phase and ctl sp subfield static final int UNSIGNALLED = 1 << 31; // must be negative static final int SS_SEQ = 1 << 16; // version count ! // Mode bits and sentinels, some also used in WorkQueue fields static final int FIFO = 1 << 16; // fifo queue or access mode ! 
static final int SRC = 1 << 17; // set for valid queue ids ! static final int INNOCUOUS = 1 << 18; // set for Innocuous workers ! static final int QUIET = 1 << 19; // quiescing phase or source ! static final int SHUTDOWN = 1 << 24; ! static final int TERMINATED = 1 << 25; static final int STOP = 1 << 31; // must be negative ! static final int UNCOMPENSATE = 1 << 16; // tryCompensate return /** ! * Initial capacity of work-stealing queue array. Must be a power ! * of two, at least 2. See above. */ ! static final int INITIAL_QUEUE_CAPACITY = 1 << 8; /** * Queues supporting work-stealing as well as external task * submission. See above for descriptions and algorithms. */ static final class WorkQueue { ! volatile int phase; // versioned, negative if inactive int stackPred; // pool stack (ctl) predecessor link ! int config; // index, mode, ORed with SRC after init ! int base; // index of next slot for poll ForkJoinTask<?>[] array; // the queued tasks; power of 2 size final ForkJoinWorkerThread owner; // owning thread or null if shared ! // segregate fields frequently updated but not read by scans or steals ! @jdk.internal.vm.annotation.Contended("w") ! int top; // index of next slot for push ! @jdk.internal.vm.annotation.Contended("w") ! volatile int source; // source queue id, lock, or sentinel ! @jdk.internal.vm.annotation.Contended("w") ! int nsteals; // number of steals from other queues ! ! // Support for atomic operations ! private static final VarHandle QA; // for array slots ! private static final VarHandle SOURCE; ! private static final VarHandle BASE; ! static final ForkJoinTask<?> getSlot(ForkJoinTask<?>[] a, int i) { ! return (ForkJoinTask<?>)QA.getAcquire(a, i); ! } ! static final ForkJoinTask<?> getAndClearSlot(ForkJoinTask<?>[] a, ! int i) { ! return (ForkJoinTask<?>)QA.getAndSet(a, i, null); ! } ! static final void setSlotVolatile(ForkJoinTask<?>[] a, int i, ! ForkJoinTask<?> v) { ! QA.setVolatile(a, i, v); ! } ! 
static final boolean casSlotToNull(ForkJoinTask<?>[] a, int i, ! ForkJoinTask<?> c) { ! return QA.weakCompareAndSet(a, i, c, null); ! } ! final boolean tryLock() { ! return SOURCE.compareAndSet(this, 0, 1); ! } ! final void setBaseOpaque(int b) { ! BASE.setOpaque(this, b); } /** ! * Constructor used by ForkJoinWorkerThreads. Most fields ! * are initialized upon thread start, in pool.registerWorker. */ ! WorkQueue(ForkJoinWorkerThread owner, boolean isInnocuous) { ! this.config = (isInnocuous) ? INNOCUOUS : 0; ! this.owner = owner; } ! /** ! * Constructor used for external queues. ! */ ! WorkQueue(int config) { ! array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY]; ! this.config = config; ! owner = null; ! phase = -1; } /** * Returns an exportable index (used by ForkJoinWorkerThread). */ final int getPoolIndex() { ! return (config & 0xffff) >>> 1; // ignore odd/even tag bit } /** * Returns the approximate number of tasks in the queue. */ final int queueSize() { ! VarHandle.acquireFence(); // ensure fresh reads by external callers ! int n = top - base; ! return (n < 0) ? 0 : n; // ignore transient negative } /** ! * Provides a more conservative estimate of whether this queue ! * has any tasks than does queueSize. */ final boolean isEmpty() { ! return !((source != 0 && owner == null) || top - base > 0); } /** * Pushes a task. Call only by owner in unshared queues. * * @param task the task. Caller must ensure non-null. + * @param pool (no-op if null) * @throws RejectedExecutionException if array cannot be resized */ ! final void push(ForkJoinTask<?> task, ForkJoinPool pool) { ! ForkJoinTask<?>[] a = array; ! int s = top++, d = s - base, cap, m; // skip insert if disabled ! if (a != null && pool != null && (cap = a.length) > 0) { ! setSlotVolatile(a, (m = cap - 1) & s, task); if (d == m) ! growArray(); ! if (d == m || a[m & (s - 1)] == null) ! pool.signalWork(); // signal if was empty or resized } } /** ! 
* Pushes task to a shared queue with lock already held, and unlocks. ! * ! * @return true if caller should signal work */ final boolean lockedPush(ForkJoinTask<?> task) { ! ForkJoinTask<?>[] a = array; ! int s = top++, d = s - base, cap, m; ! if (a != null && (cap = a.length) > 0) { ! a[(m = cap - 1) & s] = task; if (d == m) ! growArray(); ! source = 0; // unlock ! if (d == m || a[m & (s - 1)] == null) ! return true; } ! return false; } /** ! * Doubles the capacity of array. Called by owner or with lock ! * held after pre-incrementing top, which is reverted on ! * allocation failure. ! */ ! final void growArray() { ! ForkJoinTask<?>[] oldArray = array, newArray; ! int s = top - 1, oldCap, newCap; ! if (oldArray != null && (oldCap = oldArray.length) > 0 && ! (newCap = oldCap << 1) > 0) { // skip if disabled try { ! newArray = new ForkJoinTask<?>[newCap]; ! } catch (Throwable ex) { ! top = s; ! if (owner == null) ! source = 0; // unlock ! throw new RejectedExecutionException( ! "Queue capacity exceeded"); } + int newMask = newCap - 1, oldMask = oldCap - 1; + for (int k = oldCap; k > 0; --k, --s) { + ForkJoinTask<?> x; // poll old, push to new + if ((x = getAndClearSlot(oldArray, s & oldMask)) == null) + break; // others already taken + newArray[s & newMask] = x; } ! VarHandle.releaseFence(); // fill before publish ! array = newArray; } } ! // Variants of pop /** ! * Pops and returns task, or null if empty. Called only by owner. */ ! private ForkJoinTask<?> pop() { ForkJoinTask<?> t = null; ! int s = top, cap; ForkJoinTask<?>[] a; ! if ((a = array) != null && (cap = a.length) > 0 && base != s-- && ! (t = getAndClearSlot(a, (cap - 1) & s)) != null) ! top = s; return t; } /** ! * Pops the given task for owner only if it is at the current top. */ ! final boolean tryUnpush(ForkJoinTask<?> task) { ! int s = top, cap; ForkJoinTask<?>[] a; ! if ((a = array) != null && (cap = a.length) > 0 && base != s-- && ! casSlotToNull(a, (cap - 1) & s, task)) { ! top = s; ! 
return true; ! } ! return false; } /** ! * Locking version of tryUnpush. */ ! final boolean externalTryUnpush(ForkJoinTask<?> task) { ! boolean taken = false; ! int s = top, cap, k; ForkJoinTask<?>[] a; if ((a = array) != null && (cap = a.length) > 0 && ! a[k = (cap - 1) & (s - 1)] == task && tryLock()) { ! if (top == s && array == a && ! (taken = casSlotToNull(a, k, task))) ! top = s - 1; ! source = 0; // release lock ! } ! return taken; } /** ! * Deep form of tryUnpush: Traverses from top and removes task if ! * present, shifting others to fill gap. */ ! final boolean tryRemove(ForkJoinTask<?> task, boolean owned) { ! boolean taken = false; ! int p = top, cap; ForkJoinTask<?>[] a; ForkJoinTask<?> t; ! if ((a = array) != null && task != null && (cap = a.length) > 0) { ! int m = cap - 1, s = p - 1, d = p - base; ! for (int i = s, k; d > 0; --i, --d) { ! if ((t = a[k = i & m]) == task) { ! if (owned || tryLock()) { ! if ((owned || (array == a && top == p)) && ! (taken = casSlotToNull(a, k, t))) { ! for (int j = i; j != s; ) // shift down ! a[j & m] = getAndClearSlot(a, ++j & m); top = s; } ! if (!owned) ! source = 0; ! } ! break; ! } ! } } + return taken; + } + + // variants of poll /** ! * Tries once to poll next task in FIFO order, failing on ! * inconsistency or contention. */ ! final ForkJoinTask<?> tryPoll() { ! int cap, b, k; ForkJoinTask<?>[] a; ! if ((a = array) != null && (cap = a.length) > 0) { ! ForkJoinTask<?> t = getSlot(a, k = (cap - 1) & (b = base)); ! if (base == b++ && t != null && casSlotToNull(a, k, t)) { ! setBaseOpaque(b); ! return t; ! } ! } ! return null; } /** ! * Takes next task, if one exists, in order specified by mode. */ ! final ForkJoinTask<?> nextLocalTask(int cfg) { ! ForkJoinTask<?> t = null; ! int s = top, cap; ForkJoinTask<?>[] a; ! if ((a = array) != null && (cap = a.length) > 0) { ! for (int b, d;;) { ! if ((d = s - (b = base)) <= 0) ! break; ! if (d == 1 || (cfg & FIFO) == 0) { ! 
if ((t = getAndClearSlot(a, --s & (cap - 1))) != null) ! top = s; ! break; } ! if ((t = getAndClearSlot(a, b++ & (cap - 1))) != null) { ! setBaseOpaque(b); break; } } ! } ! return t; } /** ! * Takes next task, if one exists, using configured mode. */ ! final ForkJoinTask<?> nextLocalTask() { ! return nextLocalTask(config); } + + /** + * Returns next task, if one exists, in order specified by mode. + */ + final ForkJoinTask<?> peek() { + VarHandle.acquireFence(); + int cap; ForkJoinTask<?>[] a; + return ((a = array) != null && (cap = a.length) > 0) ? + a[(cap - 1) & ((config & FIFO) != 0 ? base : top - 1)] : null; } + + // specialized execution methods + + /** + * Runs the given (stolen) task if nonnull, as well as + * remaining local tasks and/or others available from the + * given queue. + */ + final void topLevelExec(ForkJoinTask<?> task, WorkQueue q) { + int cfg = config, nstolen = 1; + while (task != null) { + task.doExec(); + if ((task = nextLocalTask(cfg)) == null && + q != null && (task = q.tryPoll()) != null) + ++nstolen; } + nsteals += nstolen; + source = 0; + if ((cfg & INNOCUOUS) != 0) + ThreadLocalRandom.eraseThreadLocals(Thread.currentThread()); } /** * Tries to pop and run tasks within the target's computation * until done, not found, or limit exceeded. * * @param task root of CountedCompleter computation + * @param owned true if owned by a ForkJoinWorkerThread * @param limit max runs, or zero for no limit * @return task status on exit */ ! final int helpComplete(ForkJoinTask<?> task, boolean owned, int limit) { ! int status = 0, cap, k, p, s; ForkJoinTask<?>[] a; ForkJoinTask<?> t; ! while (task != null && (status = task.status) >= 0 && ! (a = array) != null && (cap = a.length) > 0 && ! (t = a[k = (cap - 1) & (s = (p = top) - 1)]) ! instanceof CountedCompleter) { ! CountedCompleter<?> f = (CountedCompleter<?>)t; ! boolean taken = false; ! for (;;) { // exec if root task is a completer of t ! if (f == task) { ! if (owned) { ! 
if ((taken = casSlotToNull(a, k, t))) ! top = s; } ! else if (tryLock()) { ! if (top == p && array == a && ! (taken = casSlotToNull(a, k, t))) ! top = s; ! source = 0; } break; } ! else if ((f = f.completer) == null) break; } ! if (!taken) ! break; ! t.doExec(); ! if (limit != 0 && --limit == 0) break; } return status; } /** * Tries to poll and run AsynchronousCompletionTasks until ! * none found or blocker is released. * * @param blocker the blocker */ final void helpAsyncBlocker(ManagedBlocker blocker) { ! int cap, b, d, k; ForkJoinTask<?>[] a; ForkJoinTask<?> t; ! while (blocker != null && (d = top - (b = base)) > 0 && ! (a = array) != null && (cap = a.length) > 0 && ! (((t = getSlot(a, k = (cap - 1) & b)) == null && d > 1) || ! t instanceof ! CompletableFuture.AsynchronousCompletionTask) && ! !blocker.isReleasable()) { ! if (t != null && base == b++ && casSlotToNull(a, k, t)) { ! setBaseOpaque(b); t.doExec(); } } } ! ! // misc ! ! /** AccessControlContext for innocuous workers, created on 1st use. */ ! private static AccessControlContext INNOCUOUS_ACC; ! ! /** ! * Initializes (upon registration) InnocuousForkJoinWorkerThreads. ! */ ! final void initializeInnocuousWorker() { ! AccessControlContext acc; // racy construction OK ! if ((acc = INNOCUOUS_ACC) == null) ! INNOCUOUS_ACC = acc = new AccessControlContext( ! new ProtectionDomain[] { new ProtectionDomain(null, null) }); ! Thread t = Thread.currentThread(); ! ThreadLocalRandom.setInheritedAccessControlContext(t, acc); ! ThreadLocalRandom.eraseThreadLocals(t); } /** ! * Returns true if owned by a worker thread and not known to be blocked. */ final boolean isApparentlyUnblocked() { Thread wt; Thread.State s; return ((wt = owner) != null && (s = wt.getState()) != Thread.State.BLOCKED && s != Thread.State.WAITING && s != Thread.State.TIMED_WAITING); } static { try { + QA = MethodHandles.arrayElementVarHandle(ForkJoinTask[].class); MethodHandles.Lookup l = MethodHandles.lookup(); ! 
SOURCE = l.findVarHandle(WorkQueue.class, "source", int.class); BASE = l.findVarHandle(WorkQueue.class, "base", int.class); } catch (ReflectiveOperationException e) { throw new ExceptionInInitializerError(e); } } }
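The WorkQueue methods in this hunk share one protocol. The owner pushes and pops at `top`, other threads poll at `base`, and whichever thread atomically nulls a slot owns that task. The owner uses getAndSet (as in `getAndClearSlot`), and stealers use CAS (as in `tryPoll`). The block below is a simplified, hypothetical sketch of that protocol, not the JDK code. It assumes a fixed power-of-two capacity (no `growArray`), stores `Runnable` instead of `ForkJoinTask`, and makes both indices `volatile` rather than reproducing the pool's finer-grained fences. The class name `SketchDeque` is invented for illustration.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Hypothetical, simplified work-stealing deque modeled loosely on WorkQueue.
// Assumptions: fixed power-of-two capacity, Runnable elements, volatile indices.
final class SketchDeque {
    private static final VarHandle QA =
        MethodHandles.arrayElementVarHandle(Runnable[].class);

    private final Runnable[] array;
    private volatile int base;   // next slot to poll (any thread)
    private volatile int top;    // next slot to push (written by owner only)

    SketchDeque(int capacity) {
        if (capacity < 2 || Integer.bitCount(capacity) != 1)
            throw new IllegalArgumentException("capacity must be a power of two >= 2");
        array = new Runnable[capacity];
    }

    /** Owner only. Returns false instead of resizing when full. */
    boolean push(Runnable task) {
        int s = top, m = array.length - 1;
        if (s - base > m)
            return false;                        // full
        QA.setRelease(array, s & m, task);       // publish the task before top moves
        top = s + 1;
        return true;
    }

    /** Owner only, LIFO. getAndSet suffices because only the owner moves top. */
    Runnable pop() {
        int s = top - 1, m = array.length - 1;
        if (s - base < 0)
            return null;                         // empty
        Runnable t = (Runnable) QA.getAndSet(array, s & m, null);
        if (t != null)
            top = s;                             // else a stealer claimed it first
        return t;
    }

    /** Any thread, FIFO, one attempt: null if empty, contended, or inconsistent. */
    Runnable poll() {
        int b = base, m = array.length - 1;
        if (top - b <= 0)
            return null;
        Runnable t = (Runnable) QA.getAcquire(array, b & m);
        if (t != null && base == b && QA.compareAndSet(array, b & m, t, null)) {
            base = b + 1;                        // only the CAS winner advances base
            return t;
        }
        return null;
    }
}
```

A failed `poll` returns `null` without distinguishing emptiness from contention, mirroring the "tries once ... failing on inconsistency or contention" contract of `tryPoll`; callers are expected to rescan.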
*** 1211,1231 **** * Limit on spare thread construction in tryCompensate. */ private static final int COMMON_MAX_SPARES; /** ! * Sequence number for creating workerNamePrefix. */ ! private static int poolNumberSequence; ! ! /** ! * Returns the next sequence number. We don't expect this to ! * ever contend, so use simple builtin sync. ! */ ! private static final synchronized int nextPoolId() { ! return ++poolNumberSequence; ! } // static configuration constants /** * Default idle timeout value (in milliseconds) for the thread --- 1305,1317 ---- * Limit on spare thread construction in tryCompensate. */ private static final int COMMON_MAX_SPARES; /** ! * Sequence number for creating worker names */ ! private static volatile int poolIds; // static configuration constants /** * Default idle timeout value (in milliseconds) for the thread
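This hunk replaces the `synchronized` `nextPoolId()` method with a `volatile` field; a later hunk declares a `POOLIDS` VarHandle and a `getAndAddPoolIds` helper for it. A minimal sketch of that pattern follows, assuming the static field is looked up with `MethodHandles.Lookup.findStaticVarHandle`. The class `IdSequence` and method `nextId` are hypothetical names.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Hypothetical lock-free replacement for a synchronized id counter.
final class IdSequence {
    private static volatile int ids;           // last id handed out
    private static final VarHandle IDS;
    static {
        try {
            IDS = MethodHandles.lookup()
                .findStaticVarHandle(IdSequence.class, "ids", int.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    /** Returns 1, 2, 3, ... using a single atomic add instead of a monitor. */
    static int nextId() {
        return (int) IDS.getAndAdd(1) + 1;
    }
}
```

A static-field VarHandle has no receiver coordinate, so `getAndAdd` takes only the delta.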
*** 1246,1261 **** * thread limits, so allows JVMs to catch misuse/abuse before * running out of resources needed to do so. */ private static final int DEFAULT_COMMON_MAX_SPARES = 256; - /** - * Increment for seed generators. See class ThreadLocal for - * explanation. - */ - private static final int SEED_INCREMENT = 0x9e3779b9; - /* * Bits and masks for field ctl, packed with 4 16 bit subfields: * RC: Number of released (unqueued) workers minus target parallelism * TC: Number of total workers minus target parallelism * SS: version count and status of top waiting thread --- 1332,1341 ----
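The `ctl` comment describes four 16-bit subfields packed into one `long`. The sketch below shows packing and unpacking the two count fields. It assumes `RC_SHIFT = 48` and `TC_SHIFT = 32`, with the low 32 bits holding the SS/ID pair, and mirrors the "casts in and out of short and/or signed shifts" the comment mentions. The class `CtlLayout` is hypothetical.

```java
// Hypothetical helper mirroring the described ctl layout:
// bits 48-63 RC, bits 32-47 TC, bits 0-31 SP (SS version/status + ID index).
final class CtlLayout {
    static final int  RC_SHIFT = 48, TC_SHIFT = 32;   // assumed shifts
    static final long RC_MASK  = 0xffffL << RC_SHIFT;
    static final long TC_MASK  = 0xffffL << TC_SHIFT;
    static final long SP_MASK  = 0xffffffffL;

    static long pack(int rc, int tc, int sp) {
        return (((long) rc << RC_SHIFT) & RC_MASK)
             | (((long) tc << TC_SHIFT) & TC_MASK)
             | (sp & SP_MASK);
    }

    // RC holds the top bits, so an arithmetic (signed) shift preserves its sign.
    static int rc(long c) { return (int) (c >> RC_SHIFT); }

    // TC is in the middle; a cast to short recovers its signed 16-bit value.
    static int tc(long c) { return (short) (c >>> TC_SHIFT); }

    static int sp(long c) { return (int) c; }
}
```

Because RC sits in the sign bit, a negative release count makes the whole `ctl` negative, which is why `signalWork` can test `(c = ctl) >= 0L` to mean "enough workers".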
*** 1269,1282 **** * workers, when tc is negative, there are not enough total * workers. When sp is non-zero, there are waiting workers. To * deal with possibly negative fields, we use casts in and out of * "short" and/or signed shifts to maintain signedness. * ! * Because it occupies uppermost bits, we can add one release count ! * using getAndAddLong of RC_UNIT, rather than CAS, when returning ! * from a blocked join. Other updates entail multiple subfields ! * and masking, requiring CAS. * * The limits packed in field "bounds" are also offset by the * parallelism level to make them comparable to the ctl rc and tc * fields. */ --- 1349,1362 ---- * workers, when tc is negative, there are not enough total * workers. When sp is non-zero, there are waiting workers. To * deal with possibly negative fields, we use casts in and out of * "short" and/or signed shifts to maintain signedness. * ! * Because it occupies uppermost bits, we can add one release ! * count using getAndAdd of RC_UNIT, rather than CAS, when ! * returning from a blocked join. Other updates entail multiple ! * subfields and masking, requiring CAS. * * The limits packed in field "bounds" are also offset by the * parallelism level to make them comparable to the ctl rc and tc * fields. */
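The point about RC occupying the uppermost bits can be checked directly. Adding `RC_UNIT` can only carry out of bit 63, where the carry is discarded, so a plain atomic add leaves TC and SP intact. Decrementing TC from zero, by contrast, borrows into RC, so updates touching lower subfields mask each field separately and install the result with CAS, as `deregisterWorker` does in a later hunk. The sketch below uses the same assumed shifts, with `AtomicLong` standing in for the pool's `CTL` VarHandle; `CtlUpdates` is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch; constants mirror the assumed ctl layout.
final class CtlUpdates {
    static final int  RC_SHIFT = 48, TC_SHIFT = 32;
    static final long RC_UNIT = 1L << RC_SHIFT, TC_UNIT = 1L << TC_SHIFT;
    static final long RC_MASK = 0xffffL << RC_SHIFT, TC_MASK = 0xffffL << TC_SHIFT;
    static final long SP_MASK = 0xffffffffL;

    final AtomicLong ctl;
    CtlUpdates(long initial) { ctl = new AtomicLong(initial); }

    /** One release count: any carry out of RC falls off bit 63, so no CAS is needed. */
    void addRelease() { ctl.getAndAdd(RC_UNIT); }

    /** Decrement RC and TC together; per-field masks confine borrows, so use CAS. */
    void decrementCounts() {
        long c, nc;
        do {
            c = ctl.get();
            nc = (RC_MASK & (c - RC_UNIT)) | (TC_MASK & (c - TC_UNIT)) | (SP_MASK & c);
        } while (!ctl.compareAndSet(c, nc));
    }

    int rc() { return (int) (ctl.get() >> RC_SHIFT); }
    int tc() { return (short) (ctl.get() >>> TC_SHIFT); }
    int sp() { return (int) ctl.get(); }
}
```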
*** 1296,1319 **** private static final long TC_MASK = 0xffffL << TC_SHIFT; private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign // Instance fields - volatile long stealCount; // collects worker nsteals final long keepAlive; // milliseconds before dropping if idle ! int indexSeed; // next worker index final int bounds; // min, max threads packed as shorts volatile int mode; // parallelism, runstate, queue mode ! WorkQueue[] workQueues; // main registry ! final String workerNamePrefix; // for worker thread string; sync lock final ForkJoinWorkerThreadFactory factory; final UncaughtExceptionHandler ueh; // per-worker UEH final Predicate<? super ForkJoinPool> saturate; @jdk.internal.vm.annotation.Contended("fjpctl") // segregate volatile long ctl; // main pool control // Creating, registering and deregistering workers /** * Tries to construct and start one worker. Assumes that total * count has already been incremented as a reservation. Invokes --- 1376,1426 ---- private static final long TC_MASK = 0xffffL << TC_SHIFT; private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign // Instance fields final long keepAlive; // milliseconds before dropping if idle ! volatile long stealCount; // collects worker nsteals ! int scanRover; // advances across pollScan calls ! volatile int threadIds; // for worker thread names final int bounds; // min, max threads packed as shorts volatile int mode; // parallelism, runstate, queue mode ! WorkQueue[] queues; // main registry ! final ReentrantLock registrationLock; ! Condition termination; // lazily constructed ! final String workerNamePrefix; // null for common pool final ForkJoinWorkerThreadFactory factory; final UncaughtExceptionHandler ueh; // per-worker UEH final Predicate<? 
super ForkJoinPool> saturate; @jdk.internal.vm.annotation.Contended("fjpctl") // segregate volatile long ctl; // main pool control + // Support for atomic operations + private static final VarHandle CTL; + private static final VarHandle MODE; + private static final VarHandle THREADIDS; + private static final VarHandle POOLIDS; + private boolean compareAndSetCtl(long c, long v) { + return CTL.compareAndSet(this, c, v); + } + private long compareAndExchangeCtl(long c, long v) { + return (long)CTL.compareAndExchange(this, c, v); + } + private long getAndAddCtl(long v) { + return (long)CTL.getAndAdd(this, v); + } + private int getAndBitwiseOrMode(int v) { + return (int)MODE.getAndBitwiseOr(this, v); + } + private int getAndAddThreadIds(int x) { + return (int)THREADIDS.getAndAdd(this, x); + } + private static int getAndAddPoolIds(int x) { + return (int)POOLIDS.getAndAdd(x); + } + // Creating, registering and deregistering workers /** * Tries to construct and start one worker. Assumes that total * count has already been incremented as a reservation. Invokes
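The new helper methods wrap VarHandle access modes on `ctl` and `mode`. The sketch below applies the same pattern to a hypothetical class `PoolState`; its `SHUTDOWN` bit value is assumed for illustration. It shows two properties such helpers can rely on. First, `compareAndExchange` returns the witnessed value, so a retry loop can continue from it without rereading the field. Second, `getAndBitwiseOr` returns the prior bits, so exactly one caller observes a flag's transition.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Hypothetical class illustrating the VarHandle helper pattern.
final class PoolState {
    static final int SHUTDOWN = 1 << 24;       // assumed bit, for illustration
    volatile long ctl;
    volatile int mode;

    private static final VarHandle CTL, MODE;
    static {
        try {
            MethodHandles.Lookup l = MethodHandles.lookup();
            CTL = l.findVarHandle(PoolState.class, "ctl", long.class);
            MODE = l.findVarHandle(PoolState.class, "mode", int.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    boolean compareAndSetCtl(long c, long v) {
        return CTL.compareAndSet(this, c, v);
    }

    // Witness value: equals c on success, otherwise the current ctl.
    long compareAndExchangeCtl(long c, long v) {
        return (long) CTL.compareAndExchange(this, c, v);
    }

    int getAndBitwiseOrMode(int bits) {
        return (int) MODE.getAndBitwiseOr(this, bits);
    }

    /** Sets SHUTDOWN; returns true only for the caller that first set it. */
    boolean markShutdown() {
        return (getAndBitwiseOrMode(SHUTDOWN) & SHUTDOWN) == 0;
    }

    /** Adds delta to ctl, retrying from each witnessed value after a lost race. */
    long addToCtl(long delta) {
        long c = ctl, w;
        while ((w = compareAndExchangeCtl(c, c + delta)) != c)
            c = w;
        return c + delta;
    }
}
```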
*** 1336,1422 **** deregisterWorker(wt, ex); return false; } /** ! * Tries to add one worker, incrementing ctl counts before doing ! * so, relying on createWorker to back out on failure. ! * ! * @param c incoming ctl value, with total count negative and no ! * idle workers. On CAS failure, c is refreshed and retried if ! * this holds (otherwise, a new worker is not needed). */ ! private void tryAddWorker(long c) { ! do { ! long nc = ((RC_MASK & (c + RC_UNIT)) | ! (TC_MASK & (c + TC_UNIT))); ! if (ctl == c && CTL.compareAndSet(this, c, nc)) { ! createWorker(); ! break; ! } ! } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0); } /** ! * Callback from ForkJoinWorkerThread constructor to establish and ! * record its WorkQueue. * ! * @param wt the worker thread ! * @return the worker's queue */ ! final WorkQueue registerWorker(ForkJoinWorkerThread wt) { ! UncaughtExceptionHandler handler; ! wt.setDaemon(true); // configure thread ! if ((handler = ueh) != null) ! wt.setUncaughtExceptionHandler(handler); ! int tid = 0; // for thread name ! int idbits = mode & FIFO; ! String prefix = workerNamePrefix; ! WorkQueue w = new WorkQueue(this, wt); ! if (prefix != null) { ! synchronized (prefix) { ! WorkQueue[] ws = workQueues; int n; ! int s = indexSeed += SEED_INCREMENT; ! idbits |= (s & ~(SMASK | FIFO | DORMANT)); ! if (ws != null && (n = ws.length) > 1) { ! int m = n - 1; ! tid = m & ((s << 1) | 1); // odd-numbered indices ! for (int probes = n >>> 1;;) { // find empty slot ! WorkQueue q; ! if ((q = ws[tid]) == null || q.phase == QUIET) ! break; ! else if (--probes == 0) { ! tid = n | 1; // resize below ! break; ! } ! else ! tid = (tid + 2) & m; ! } ! w.phase = w.id = tid | idbits; // now publishable ! if (tid < n) ! ws[tid] = w; else { // expand array ! int an = n << 1; WorkQueue[] as = new WorkQueue[an]; ! as[tid] = w; ! int am = an - 1; ! for (int j = 0; j < n; ++j) { ! WorkQueue v; // copy external queue ! if ((v = ws[j]) != null) // position may change ! 
as[v.id & am & SQMASK] = v; ! if (++j >= n) ! break; ! as[j] = ws[j]; // copy worker } ! workQueues = as; } } } - wt.setName(prefix.concat(Integer.toString(tid))); } - return w; } /** * Final callback from terminating worker, as well as upon failure * to construct or start a worker. Removes record of worker from --- 1443,1509 ---- deregisterWorker(wt, ex); return false; } /** ! * Provides a name for ForkJoinWorkerThread constructor. */ ! final String nextWorkerThreadName() { ! String prefix = workerNamePrefix; ! int tid = getAndAddThreadIds(1) + 1; ! if (prefix == null) // commonPool has no prefix ! prefix = "ForkJoinPool.commonPool-worker-"; ! return prefix.concat(Integer.toString(tid)); } /** ! * Finishes initializing and records owned queue. * ! * @param w caller's WorkQueue */ ! final void registerWorker(WorkQueue w) { ! ReentrantLock lock = registrationLock; ! ThreadLocalRandom.localInit(); ! int seed = ThreadLocalRandom.getProbe(); ! if (w != null && lock != null) { ! int modebits = (mode & FIFO) | w.config; ! w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY]; ! w.stackPred = seed; // stash for runWorker ! if ((modebits & INNOCUOUS) != 0) ! w.initializeInnocuousWorker(); ! int id = (seed << 1) | 1; // initial index guess ! lock.lock(); ! try { ! WorkQueue[] qs; int n; // find queue index ! if ((qs = queues) != null && (n = qs.length) > 0) { ! int k = n, m = n - 1; ! for (; qs[id &= m] != null && k > 0; id -= 2, k -= 2); ! if (k == 0) ! id = n | 1; // resize below ! w.phase = w.config = id | modebits; // now publishable ! if (id < n) ! qs[id] = w; else { // expand array ! int an = n << 1, am = an - 1; WorkQueue[] as = new WorkQueue[an]; ! as[id & am] = w; ! for (int j = 1; j < n; j += 2) ! as[j] = qs[j]; ! for (int j = 0; j < n; j += 2) { ! WorkQueue q; ! if ((q = qs[j]) != null) // shared queues may move ! as[q.config & am] = q; } ! VarHandle.releaseFence(); // fill before publish ! 
queues = as; } } + } finally { + lock.unlock(); } } } /** * Final callback from terminating worker, as well as upon failure * to construct or start a worker. Removes record of worker from
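The rewritten `registerWorker` keeps owned (worker) queues at odd indices and shared queues at even ones. It probes odd slots downward from a seed-derived guess; if none is free, it doubles the array, copying odd slots in place and rehashing even ones by their `config`. The sketch below reproduces that placement logic under a lock. It assumes entries carry only an index in `config` (no mode bits) and uses a hypothetical `Registry` class.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical registry mimicking the index scheme in registerWorker:
// odd slots hold owned (worker) entries, even slots hold shared entries.
final class Registry {
    static final class Entry {
        int config;                               // slot key (no mode bits here)
        Entry(int config) { this.config = config; }
    }

    Entry[] slots = new Entry[4];                 // power-of-two size
    private final ReentrantLock lock = new ReentrantLock();

    /** Places w at a free odd index derived from seed, doubling the table if needed. */
    int registerWorker(Entry w, int seed) {
        lock.lock();
        try {
            Entry[] qs = slots;
            int n = qs.length, m = n - 1, k = n;
            int id = (seed << 1) | 1;             // odd initial guess
            for (; qs[id &= m] != null && k > 0; id -= 2, k -= 2)
                ;                                 // probe odd slots downward, wrapping
            if (k == 0)
                id = n | 1;                       // all odd slots taken: expand
            w.config = id;
            if (id < n)
                qs[id] = w;
            else {
                int an = n << 1, am = an - 1;
                Entry[] as = new Entry[an];
                as[id & am] = w;
                for (int j = 1; j < n; j += 2)    // workers keep their index
                    as[j] = qs[j];
                for (int j = 0; j < n; j += 2) {  // shared entries may move
                    Entry q = qs[j];
                    if (q != null)
                        as[q.config & am] = q;
                }
                slots = as;
            }
            return id;
        } finally {
            lock.unlock();
        }
    }
}
```

Odd slots can be copied to the same index because every odd index below `n` remains valid in the doubled array; shared entries are placed by `config & am` and so may land at a different even slot.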
*** 1425,1986 **** * * @param wt the worker thread, or null if construction failed * @param ex the exception causing failure, or null if none */ final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) { WorkQueue w = null; ! int phase = 0; ! if (wt != null && (w = wt.workQueue) != null) { ! Object lock = workerNamePrefix; ! int wid = w.id; ! long ns = (long)w.nsteals & 0xffffffffL; ! if (lock != null) { ! synchronized (lock) { ! WorkQueue[] ws; int n, i; // remove index from array ! if ((ws = workQueues) != null && (n = ws.length) > 0 && ! ws[i = wid & (n - 1)] == w) ! ws[i] = null; ! stealCount += ns; ! } ! } ! phase = w.phase; ! } ! if (phase != QUIET) { // else pre-adjusted ! long c; // decrement counts ! do {} while (!CTL.weakCompareAndSet ! (this, c = ctl, ((RC_MASK & (c - RC_UNIT)) | (TC_MASK & (c - TC_UNIT)) | ! (SP_MASK & c)))); } - if (w != null) - w.cancelAll(); // cancel remaining tasks ! if (!tryTerminate(false, false) && // possibly replace worker ! w != null && w.array != null) // avoid repeated failures ! signalWork(null); ! ! if (ex == null) // help clean on way out ! ForkJoinTask.helpExpungeStaleExceptions(); ! else // rethrow ForkJoinTask.rethrow(ex); } ! /** * Tries to create or release a worker if too few are running. - * @param q if non-null recheck if empty on CAS failure */ ! final void signalWork(WorkQueue q) { ! for (;;) { ! long c; int sp; WorkQueue[] ws; int i; WorkQueue v; ! if ((c = ctl) >= 0L) // enough workers ! break; ! else if ((sp = (int)c) == 0) { // no idle workers ! if ((c & ADD_WORKER) != 0L) // too few workers ! tryAddWorker(c); break; } ! else if ((ws = workQueues) == null) break; // unstarted/terminated ! else if (ws.length <= (i = sp & SMASK)) break; // terminated ! else if ((v = ws[i]) == null) break; // terminating else { - int np = sp & ~UNSIGNALLED; - int vp = v.phase; long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + RC_UNIT)); Thread vt = v.owner; ! if (sp == vp && CTL.compareAndSet(this, c, nc)) { ! 
v.phase = np; ! if (vt != null && v.source < 0) ! LockSupport.unpark(vt); break; } - else if (q != null && q.isEmpty()) // no need to retry - break; } } } /** ! * Tries to decrement counts (sometimes implicitly) and possibly ! * arrange for a compensating worker in preparation for blocking: ! * If not all core workers yet exist, creates one, else if any are ! * unreleased (possibly including caller) releases one, else if ! * fewer than the minimum allowed number of workers running, ! * checks to see that they are all active, and if so creates an ! * extra worker unless over maximum limit and policy is to ! * saturate. Most of these steps can fail due to interference, in ! * which case 0 is returned so caller will retry. A negative ! * return value indicates that the caller doesn't need to ! * re-adjust counts when later unblocked. * ! * @return 1: block then adjust, -1: block without adjust, 0 : retry */ ! private int tryCompensate(WorkQueue w) { ! int t, n, sp; ! long c = ctl; ! WorkQueue[] ws = workQueues; ! if ((t = (short)(c >>> TC_SHIFT)) >= 0) { ! if (ws == null || (n = ws.length) <= 0 || w == null) ! return 0; // disabled ! else if ((sp = (int)c) != 0) { // replace or release ! WorkQueue v = ws[sp & (n - 1)]; ! int wp = w.phase; ! long uc = UC_MASK & ((wp < 0) ? c + RC_UNIT : c); ! int np = sp & ~UNSIGNALLED; ! if (v != null) { ! int vp = v.phase; ! Thread vt = v.owner; ! long nc = ((long)v.stackPred & SP_MASK) | uc; ! if (vp == sp && CTL.compareAndSet(this, c, nc)) { ! v.phase = np; ! if (vt != null && v.source < 0) ! LockSupport.unpark(vt); ! return (wp < 0) ? -1 : 1; } } ! return 0; ! } ! else if ((int)(c >> RC_SHIFT) - // reduce parallelism ! (short)(bounds & SMASK) > 0) { ! long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c)); ! return CTL.compareAndSet(this, c, nc) ? 1 : 0; } ! else { // validate ! int md = mode, pc = md & SMASK, tc = pc + t, bc = 0; ! boolean unstable = false; ! for (int i = 1; i < n; i += 2) { ! 
WorkQueue q; Thread wt; Thread.State ts; ! if ((q = ws[i]) != null) { ! if (q.source == 0) { ! unstable = true; break; } ! else { ! --tc; ! if ((wt = q.owner) != null && ! ((ts = wt.getState()) == Thread.State.BLOCKED || ! ts == Thread.State.WAITING)) ! ++bc; // worker is blocking } } } ! if (unstable || tc != 0 || ctl != c) ! return 0; // inconsistent ! else if (t + pc >= MAX_CAP || t >= (bounds >>> SWIDTH)) { ! Predicate<? super ForkJoinPool> sat; ! if ((sat = saturate) != null && sat.test(this)) return -1; ! else if (bc < pc) { // lagging ! Thread.yield(); // for retry spins ! return 0; ! } ! else ! throw new RejectedExecutionException( ! "Thread limit exceeded replacing blocked worker"); } } } ! long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); // expand pool ! return CTL.compareAndSet(this, c, nc) && createWorker() ? 1 : 0; } /** ! * Top-level runloop for workers, called by ForkJoinWorkerThread.run. ! * See above for explanation. */ ! final void runWorker(WorkQueue w) { ! int r = (w.id ^ ThreadLocalRandom.nextSecondarySeed()) | FIFO; // rng ! w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY]; // initialize ! for (;;) { ! int phase; ! if (scan(w, r)) { // scan until apparently empty ! r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // move (xorshift) ! } ! else if ((phase = w.phase) >= 0) { // enqueue, then rescan ! long np = (w.phase = (phase + SS_SEQ) | UNSIGNALLED) & SP_MASK; ! long c, nc; ! do { ! w.stackPred = (int)(c = ctl); ! nc = ((c - RC_UNIT) & UC_MASK) | np; ! } while (!CTL.weakCompareAndSet(this, c, nc)); ! } ! else { // already queued ! int pred = w.stackPred; ! Thread.interrupted(); // clear before park ! w.source = DORMANT; // enable signal ! long c = ctl; ! int md = mode, rc = (md & SMASK) + (int)(c >> RC_SHIFT); ! if (md < 0) // terminating ! break; ! else if (rc <= 0 && (md & SHUTDOWN) != 0 && ! tryTerminate(false, false)) ! break; // quiescent shutdown ! else if (w.phase < 0) { ! if (rc <= 0 && pred != 0 && phase == (int)c) { ! 
long nc = (UC_MASK & (c - TC_UNIT)) | (SP_MASK & pred); ! long d = keepAlive + System.currentTimeMillis(); ! LockSupport.parkUntil(this, d); ! if (ctl == c && // drop on timeout if all idle ! d - System.currentTimeMillis() <= TIMEOUT_SLOP && ! CTL.compareAndSet(this, c, nc)) { ! w.phase = QUIET; break; } } - else { - LockSupport.park(this); - if (w.phase < 0) // one spurious wakeup check - LockSupport.park(this); } } - w.source = 0; // disable signal } } } /** ! * Scans for and if found executes one or more top-level tasks from a queue. * ! * @return true if found an apparently non-empty queue, and ! * possibly ran task(s). */ ! private boolean scan(WorkQueue w, int r) { ! WorkQueue[] ws; int n; ! if ((ws = workQueues) != null && (n = ws.length) > 0 && w != null) { ! for (int m = n - 1, j = r & m;;) { ! WorkQueue q; int b; ! if ((q = ws[j]) != null && q.top != (b = q.base)) { ! int qid = q.id; ! ForkJoinTask<?>[] a; int cap, k; ForkJoinTask<?> t; if ((a = q.array) != null && (cap = a.length) > 0) { ! t = (ForkJoinTask<?>)QA.getAcquire(a, k = (cap - 1) & b); ! if (q.base == b++ && t != null && ! QA.compareAndSet(a, k, t, null)) { ! q.base = b; ! w.source = qid; ! if (a[(cap - 1) & b] != null) ! signalWork(q); // help signal if more tasks ! w.topLevelExec(t, q, // random fairness bound ! (r | (1 << TOP_BOUND_SHIFT)) & SMASK); } } - return true; } - else if (--n > 0) - j = (j + 1) & m; - else - break; } } ! return false; } /** ! * Helps and/or blocks until the given task is done or timeout. ! * First tries locally helping, then scans other queues for a task ! * produced by one of w's stealers; compensating and blocking if ! * none are found (rescanning if tryCompensate fails). * ! * @param w caller ! * @param task the task ! * @param deadline for timed waits, if nonzero * @return task status on exit */ ! final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) { int s = 0; ! int seed = ThreadLocalRandom.nextSecondarySeed(); ! 
if (w != null && task != null && ! (!(task instanceof CountedCompleter) || ! (s = w.helpCC((CountedCompleter<?>)task, 0, false)) >= 0)) { ! w.tryRemoveAndExec(task); ! int src = w.source, id = w.id; ! int r = (seed >>> 16) | 1, step = (seed & ~1) | 2; ! s = task.status; ! while (s >= 0) { ! WorkQueue[] ws; ! int n = (ws = workQueues) == null ? 0 : ws.length, m = n - 1; ! while (n > 0) { ! WorkQueue q; int b; ! if ((q = ws[r & m]) != null && q.source == id && ! q.top != (b = q.base)) { ! ForkJoinTask<?>[] a; int cap, k; ! int qid = q.id; ! if ((a = q.array) != null && (cap = a.length) > 0) { ! ForkJoinTask<?> t = (ForkJoinTask<?>) ! QA.getAcquire(a, k = (cap - 1) & b); ! if (q.source == id && q.base == b++ && ! t != null && QA.compareAndSet(a, k, t, null)) { ! q.base = b; ! w.source = qid; ! t.doExec(); ! w.source = src; } } break; } - else { - r += step; - --n; } } - if ((s = task.status) < 0) - break; - else if (n == 0) { // empty scan - long ms, ns; int block; - if (deadline == 0L) - ms = 0L; // untimed - else if ((ns = deadline - System.nanoTime()) <= 0L) - break; // timeout - else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L) - ms = 1L; // avoid 0 for timed wait - if ((block = tryCompensate(w)) != 0) { - task.internalWait(ms); - CTL.getAndAdd(this, (block > 0) ? RC_UNIT : 0L); - } - s = task.status; } } } return s; } /** * Runs tasks until {@code isQuiescent()}. Rather than blocking * when tasks cannot be found, rescans until all others cannot * find tasks either. */ ! final void helpQuiescePool(WorkQueue w) { ! int prevSrc = w.source; ! int seed = ThreadLocalRandom.nextSecondarySeed(); ! int r = seed >>> 16, step = r | 1; ! for (int source = prevSrc, released = -1;;) { // -1 until known ! ForkJoinTask<?> localTask; WorkQueue[] ws; ! while ((localTask = w.nextLocalTask()) != null) ! localTask.doExec(); ! if (w.phase >= 0 && released == -1) ! released = 1; ! boolean quiet = true, empty = true; ! int n = (ws = workQueues) == null ? 0 : ws.length; ! 
for (int m = n - 1; n > 0; r += step, --n) { ! WorkQueue q; int b; ! if ((q = ws[r & m]) != null) { ! int qs = q.source; ! if (q.top != (b = q.base)) { ! quiet = empty = false; ! ForkJoinTask<?>[] a; int cap, k; ! int qid = q.id; ! if ((a = q.array) != null && (cap = a.length) > 0) { ! if (released == 0) { // increment ! released = 1; ! CTL.getAndAdd(this, RC_UNIT); ! } ! ForkJoinTask<?> t = (ForkJoinTask<?>) ! QA.getAcquire(a, k = (cap - 1) & b); ! if (q.base == b++ && t != null && ! QA.compareAndSet(a, k, t, null)) { ! q.base = b; ! w.source = qid; ! t.doExec(); ! w.source = source = prevSrc; } } break; } ! else if ((qs & QUIET) == 0) ! quiet = false; } } ! if (quiet) { ! if (released == 0) ! CTL.getAndAdd(this, RC_UNIT); w.source = prevSrc; ! break; } ! else if (empty) { ! if (source != QUIET) ! w.source = source = QUIET; ! if (released == 1) { // decrement ! released = 0; ! CTL.getAndAdd(this, RC_MASK & -RC_UNIT); } } } } /** ! * Scans for and returns a polled task, if available. ! * Used only for untracked polls. * ! * @param submissionsOnly if true, only scan submission queues */ ! private ForkJoinTask<?> pollScan(boolean submissionsOnly) { ! WorkQueue[] ws; int n; ! rescan: while ((mode & STOP) == 0 && (ws = workQueues) != null && ! (n = ws.length) > 0) { ! int m = n - 1; ! int r = ThreadLocalRandom.nextSecondarySeed(); ! int h = r >>> 16; ! int origin, step; ! if (submissionsOnly) { ! origin = (r & ~1) & m; // even indices and steps ! step = (h & ~1) | 2; ! } ! else { ! origin = r & m; ! step = h | 1; ! } ! boolean nonempty = false; ! for (int i = origin, oldSum = 0, checkSum = 0;;) { ! WorkQueue q; ! if ((q = ws[i]) != null) { ! int b; ForkJoinTask<?> t; ! if (q.top - (b = q.base) > 0) { ! nonempty = true; ! if ((t = q.poll()) != null) ! return t; ! } ! else ! checkSum += b + q.id; } ! if ((i = (i + step) & m) == origin) { ! if (!nonempty && oldSum == (oldSum = checkSum)) ! break rescan; ! checkSum = 0; ! 
nonempty = false; } } } - return null; } /** * Gets and removes a local or stolen task for the given worker. * * @return a task, if available */ final ForkJoinTask<?> nextTaskFor(WorkQueue w) { ForkJoinTask<?> t; ! if (w == null || (t = w.nextLocalTask()) == null) t = pollScan(false); return t; } // External operations /** ! * Adds the given task to a submission queue at submitter's ! * current queue, creating one if null or contended. ! * ! * @param task the task. Caller must ensure non-null. */ ! final void externalPush(ForkJoinTask<?> task) { ! int r; // initialize caller's probe if ((r = ThreadLocalRandom.getProbe()) == 0) { ! ThreadLocalRandom.localInit(); r = ThreadLocalRandom.getProbe(); } ! for (;;) { ! WorkQueue q; ! int md = mode, n; ! WorkQueue[] ws = workQueues; ! if ((md & SHUTDOWN) != 0 || ws == null || (n = ws.length) <= 0) ! throw new RejectedExecutionException(); ! else if ((q = ws[(n - 1) & r & SQMASK]) == null) { // add queue ! int qid = (r | QUIET) & ~(FIFO | OWNED); ! Object lock = workerNamePrefix; ! ForkJoinTask<?>[] qa = ! new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY]; ! q = new WorkQueue(this, null); ! q.array = qa; ! q.id = qid; ! q.source = QUIET; ! if (lock != null) { // unless disabled, lock pool to install ! synchronized (lock) { ! WorkQueue[] vs; int i, vn; ! if ((vs = workQueues) != null && (vn = vs.length) > 0 && ! vs[i = qid & (vn - 1) & SQMASK] == null) ! vs[i] = q; // else another thread already installed ! } } } ! else if (!q.tryLockPhase()) // move if busy ! r = ThreadLocalRandom.advanceProbe(r); ! else { ! if (q.lockedPush(task)) ! signalWork(null); ! return; } } } /** * Pushes a possibly-external submission. */ private <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) { ! Thread t; ForkJoinWorkerThread w; WorkQueue q; if (task == null) throw new NullPointerException(); if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) && ! (w = (ForkJoinWorkerThread)t).pool == this && ! (q = w.workQueue) != null) ! 
q.push(task); else externalPush(task); return task; } /** ! * Returns common pool queue for an external thread. ! */ ! static WorkQueue commonSubmitterQueue() { ! ForkJoinPool p = common; ! int r = ThreadLocalRandom.getProbe(); ! WorkQueue[] ws; int n; ! return (p != null && (ws = p.workQueues) != null && ! (n = ws.length) > 0) ? ! ws[(n - 1) & r & SQMASK] : null; } /** ! * Performs tryUnpush for an external submitter. ! */ ! final boolean tryExternalUnpush(ForkJoinTask<?> task) { ! int r = ThreadLocalRandom.getProbe(); ! WorkQueue[] ws; WorkQueue w; int n; ! return ((ws = workQueues) != null && ! (n = ws.length) > 0 && ! (w = ws[(n - 1) & r & SQMASK]) != null && ! w.tryLockedUnpush(task)); ! } ! ! /** ! * Performs helpComplete for an external submitter. */ ! final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) { ! int r = ThreadLocalRandom.getProbe(); ! WorkQueue[] ws; WorkQueue w; int n; ! return ((ws = workQueues) != null && (n = ws.length) > 0 && ! (w = ws[(n - 1) & r & SQMASK]) != null) ? ! w.helpCC(task, maxTasks, true) : 0; } ! ! /** ! * Tries to steal and run tasks within the target's computation. ! * The maxTasks argument supports external usages; internal calls ! * use zero, allowing unbounded steps (external calls trap ! * non-positive values). ! * ! * @param w caller ! * @param maxTasks if non-zero, the maximum number of other tasks to run ! * @return task status on exit ! */ ! final int helpComplete(WorkQueue w, CountedCompleter<?> task, ! int maxTasks) { ! return (w == null) ? 0 : w.helpCC(task, maxTasks, false); } /** * Returns a cheap heuristic guide for task partitioning when * programmers, frameworks, tools, or languages have little or no --- 1512,2216 ---- * * @param wt the worker thread, or null if construction failed * @param ex the exception causing failure, or null if none */ final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) { + ReentrantLock lock = registrationLock; WorkQueue w = null; ! int cfg = 0; ! 
if (wt != null && (w = wt.workQueue) != null && lock != null) { ! WorkQueue[] qs; int n, i; ! cfg = w.config; ! long ns = w.nsteals & 0xffffffffL; ! lock.lock(); // remove index from array ! if ((qs = queues) != null && (n = qs.length) > 0 && ! qs[i = cfg & (n - 1)] == w) ! qs[i] = null; ! stealCount += ns; // accumulate steals ! lock.unlock(); ! long c = ctl; ! if ((cfg & QUIET) == 0) // unless self-signalled, decrement counts ! do {} while (c != (c = compareAndExchangeCtl( ! c, ((RC_MASK & (c - RC_UNIT)) | (TC_MASK & (c - TC_UNIT)) | ! (SP_MASK & c))))); ! else if ((int)c == 0) // was dropped on timeout ! cfg = 0; // suppress signal if last ! for (ForkJoinTask<?> t; (t = w.pop()) != null; ) ! ForkJoinTask.cancelIgnoringExceptions(t); // cancel tasks } ! if (!tryTerminate(false, false) && w != null && (cfg & SRC) != 0) ! signalWork(); // possibly replace worker ! if (ex != null) ForkJoinTask.rethrow(ex); } ! /* * Tries to create or release a worker if too few are running. */ ! final void signalWork() { ! for (long c = ctl; c < 0L;) { ! int sp, i; WorkQueue[] qs; WorkQueue v; ! if ((sp = (int)c & ~UNSIGNALLED) == 0) { // no idle workers ! if ((c & ADD_WORKER) == 0L) // enough total workers ! break; ! if (c == (c = compareAndExchangeCtl( ! c, ((RC_MASK & (c + RC_UNIT)) | ! (TC_MASK & (c + TC_UNIT)))))) { ! createWorker(); break; } ! } ! else if ((qs = queues) == null) break; // unstarted/terminated ! else if (qs.length <= (i = sp & SMASK)) break; // terminated ! else if ((v = qs[i]) == null) break; // terminating else { long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + RC_UNIT)); Thread vt = v.owner; ! if (c == (c = compareAndExchangeCtl(c, nc))) { ! v.phase = sp; ! LockSupport.unpark(vt); // release idle worker break; } } } } /** ! * Top-level runloop for workers, called by ForkJoinWorkerThread.run. ! * See above for explanation. * ! * @param w caller's WorkQueue (may be null on failed initialization) */ ! final void runWorker(WorkQueue w) { ! 
if (w != null) { // skip on failed init ! w.config |= SRC; // mark as valid source ! int r = w.stackPred, src = 0; // use seed from registerWorker ! do { ! r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift ! } while ((src = scan(w, src, r)) >= 0 || ! (src = awaitWork(w)) == 0); } } ! ! /** ! * Scans for and if found executes top-level tasks: Tries to poll ! * each queue starting at a random index with random stride, ! * returning source id or retry indicator if contended or ! * inconsistent. ! * ! * @param w caller's WorkQueue ! * @param prevSrc the previous queue stolen from in current phase, or 0 ! * @param r random seed ! * @return id of queue if taken, negative if none found, prevSrc for retry ! */ ! private int scan(WorkQueue w, int prevSrc, int r) { ! WorkQueue[] qs = queues; ! int n = (w == null || qs == null) ? 0 : qs.length; ! for (int step = (r >>> 16) | 1, i = n; i > 0; --i, r += step) { ! int j, cap, b; WorkQueue q; ForkJoinTask<?>[] a; ! if ((q = qs[j = r & (n - 1)]) != null && // poll at qs[j].array[k] ! (a = q.array) != null && (cap = a.length) > 0) { ! int k = (cap - 1) & (b = q.base), nextBase = b + 1; ! int nextIndex = (cap - 1) & nextBase, src = j | SRC; ! ForkJoinTask<?> t = WorkQueue.getSlot(a, k); ! if (q.base != b) // inconsistent ! return prevSrc; ! else if (t != null && WorkQueue.casSlotToNull(a, k, t)) { ! q.base = nextBase; ! ForkJoinTask<?> next = a[nextIndex]; ! if ((w.source = src) != prevSrc && next != null) ! signalWork(); // propagate ! w.topLevelExec(t, q); ! return src; ! } ! else if (a[nextIndex] != null) // revisit ! return prevSrc; ! } ! } ! return (queues != qs) ? prevSrc: -1; // possibly resized } ! ! /** ! * Advances worker phase, pushes onto ctl stack, and awaits signal ! * or reports termination. ! * ! * @return negative if terminated, else 0 ! */ ! private int awaitWork(WorkQueue w) { ! if (w == null) ! return -1; // already terminated ! int phase = (w.phase + SS_SEQ) & ~UNSIGNALLED; ! 
w.phase = phase | UNSIGNALLED; // advance phase ! long prevCtl = ctl, c; // enqueue ! do { ! w.stackPred = (int)prevCtl; ! c = ((prevCtl - RC_UNIT) & UC_MASK) | (phase & SP_MASK); ! } while (prevCtl != (prevCtl = compareAndExchangeCtl(prevCtl, c))); ! ! Thread.interrupted(); // clear status ! LockSupport.setCurrentBlocker(this); // prepare to block (exit also OK) ! long deadline = 0L; // nonzero if possibly quiescent ! int ac = (int)(c >> RC_SHIFT), md; ! if ((md = mode) < 0) // pool is terminating ! return -1; ! else if ((md & SMASK) + ac <= 0) { ! boolean checkTermination = (md & SHUTDOWN) != 0; ! if ((deadline = System.currentTimeMillis() + keepAlive) == 0L) ! deadline = 1L; // avoid zero ! WorkQueue[] qs = queues; // check for racing submission ! int n = (qs == null) ? 0 : qs.length; ! for (int i = 0; i < n; i += 2) { ! WorkQueue q; ForkJoinTask<?>[] a; int cap, b; ! if (ctl != c) { // already signalled ! checkTermination = false; break; } ! else if ((q = qs[i]) != null && ! (a = q.array) != null && (cap = a.length) > 0 && ! ((b = q.base) != q.top || a[(cap - 1) & b] != null || ! q.source != 0)) { ! if (compareAndSetCtl(c, prevCtl)) ! w.phase = phase; // self-signal ! checkTermination = false; ! break; } } + if (checkTermination && tryTerminate(false, false)) + return -1; // trigger quiescent termination } ! ! for (boolean alt = false;;) { // await activation or termination ! if (w.phase >= 0) ! break; ! else if (mode < 0) return -1; ! else if ((c = ctl) == prevCtl) ! Thread.onSpinWait(); // signal in progress ! else if (!(alt = !alt)) // check between park calls ! Thread.interrupted(); ! else if (deadline == 0L) ! LockSupport.park(); ! else if (deadline - System.currentTimeMillis() > TIMEOUT_SLOP) ! LockSupport.parkUntil(deadline); ! else if (((int)c & SMASK) == (w.config & SMASK) && ! compareAndSetCtl(c, ((UC_MASK & (c - TC_UNIT)) | ! (prevCtl & SP_MASK)))) { ! w.config |= QUIET; // sentinel for deregisterWorker ! 
return -1; // drop on timeout } + else if ((deadline += keepAlive) == 0L) + deadline = 1L; // not at head; restart timer } + return 0; } ! // Utilities used by ForkJoinTask ! ! /** ! * Returns true if all workers are busy, possibly creating one if allowed ! */ ! final boolean isSaturated() { ! int maxTotal = bounds >>> SWIDTH; ! for (long c;;) { ! if (((int)(c = ctl) & ~UNSIGNALLED) != 0) ! return false; ! if ((short)(c >>> TC_SHIFT) >= maxTotal) ! return true; ! long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); ! if (compareAndSetCtl(c, nc)) ! return !createWorker(); ! } } /** ! * Returns true if can start terminating if enabled, or already terminated */ ! final boolean canStop() { ! outer: for (long oldSum = 0L;;) { // repeat until stable ! int md; WorkQueue[] qs; long c; ! if ((qs = queues) == null || ((md = mode) & STOP) != 0) ! return true; ! if ((md & SMASK) + (int)((c = ctl) >> RC_SHIFT) > 0) break; + long checkSum = c; + for (int i = 1; i < qs.length; i += 2) { // scan submitters + WorkQueue q; ForkJoinTask<?>[] a; int s = 0, cap; + if ((q = qs[i]) != null && (a = q.array) != null && + (cap = a.length) > 0 && + ((s = q.top) != q.base || a[(cap - 1) & s] != null || + q.source != 0)) + break outer; + checkSum += (((long)i) << 32) ^ s; } + if (oldSum == (oldSum = checkSum) && queues == qs) + return true; + } + return (mode & STOP) != 0; // recheck mode on false return + } + + /** + * Tries to decrement counts (sometimes implicitly) and possibly + * arrange for a compensating worker in preparation for + * blocking. May fail due to interference, in which case -1 is + * returned so caller may retry. A zero return value indicates + * that the caller doesn't need to re-adjust counts when later + * unblocked. + * + * @param c incoming ctl value + * @return UNCOMPENSATE: block then adjust, 0: block, -1 : retry + */ + private int tryCompensate(long c) { + Predicate<? 
super ForkJoinPool> sat; + int b = bounds; // counts are signed; centered at parallelism level == 0 + int minActive = (short)(b & SMASK), + maxTotal = b >>> SWIDTH, + active = (int)(c >> RC_SHIFT), + total = (short)(c >>> TC_SHIFT), + sp = (int)c & ~UNSIGNALLED; + if (total >= 0) { + if (sp != 0) { // activate idle worker + WorkQueue[] qs; int n; WorkQueue v; + if ((qs = queues) != null && (n = qs.length) > 0 && + (v = qs[sp & (n - 1)]) != null) { + Thread vt = v.owner; + long nc = ((long)v.stackPred & SP_MASK) | (UC_MASK & c); + if (compareAndSetCtl(c, nc)) { + v.phase = sp; + LockSupport.unpark(vt); + return UNCOMPENSATE; } } + return -1; // retry + } + else if (active > minActive) { // reduce parallelism + long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c)); + return compareAndSetCtl(c, nc) ? UNCOMPENSATE : -1; } } + if (total < maxTotal) { // expand pool + long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); + return (!compareAndSetCtl(c, nc) ? -1 : + !createWorker() ? 0 : UNCOMPENSATE); + } + else if (!compareAndSetCtl(c, c)) // validate + return -1; + else if ((sat = saturate) != null && sat.test(this)) + return 0; + else + throw new RejectedExecutionException( + "Thread limit exceeded replacing blocked worker"); } + + /** + * Readjusts RC count; called from ForkJoinTask after blocking. + */ + final void uncompensate() { + getAndAddCtl(RC_UNIT); } /** ! * Helps if possible until the given task is done. Scans other ! * queues for a task produced by one of w's stealers; returning ! * compensated blocking sentinel if none are found. * ! * @param task the task ! * @param w caller's WorkQueue ! * @return task status on exit, or UNCOMPENSATE for compensated blocking */ ! final int helpJoin(ForkJoinTask<?> task, WorkQueue w) { ! int s = 0; ! if (task != null && w != null) { ! int wsrc = w.source, wid = w.config & SMASK, r = wid + 2; ! boolean scan = true; ! long c = 0L; // track ctl stability ! outer: for (;;) { ! if ((s = task.status) < 0) ! break; ! 
else if (scan = !scan) { // previous scan was empty ! if (mode < 0) ! ForkJoinTask.cancelIgnoringExceptions(task); ! else if (c == (c = ctl) && (s = tryCompensate(c)) >= 0) ! break; // block ! } ! else { // scan for subtasks ! WorkQueue[] qs = queues; ! int n = (qs == null) ? 0 : qs.length, m = n - 1; ! for (int i = n; i > 0; i -= 2, r += 2) { ! int j; WorkQueue q, x, y; ForkJoinTask<?>[] a; ! if ((q = qs[j = r & m]) != null) { ! int sq = q.source & SMASK, cap, b; if ((a = q.array) != null && (cap = a.length) > 0) { ! int k = (cap - 1) & (b = q.base); ! int nextBase = b + 1, src = j | SRC, sx; ! ForkJoinTask<?> t = WorkQueue.getSlot(a, k); ! boolean eligible = sq == wid || ! ((x = qs[sq & m]) != null && // indirect ! ((sx = (x.source & SMASK)) == wid || ! ((y = qs[sx & m]) != null && // 2-indirect ! (y.source & SMASK) == wid))); ! if ((s = task.status) < 0) ! break outer; ! else if ((q.source & SMASK) != sq || ! q.base != b) ! scan = true; // inconsistent ! else if (t == null) ! scan |= (a[nextBase & (cap - 1)] != null || ! q.top != b); // lagging ! else if (eligible) { ! if (WorkQueue.casSlotToNull(a, k, t)) { ! q.base = nextBase; ! w.source = src; ! t.doExec(); ! w.source = wsrc; ! } ! scan = true; ! break; ! } } } } } } ! } ! return s; } /** ! * Extra helpJoin steps for CountedCompleters. Scans for and runs ! * subtasks of the given root task, returning if none are found. * ! * @param task root of CountedCompleter computation ! * @param w caller's WorkQueue ! * @param owned true if owned by a ForkJoinWorkerThread * @return task status on exit */ ! final int helpComplete(ForkJoinTask<?> task, WorkQueue w, boolean owned) { int s = 0; ! if (task != null && w != null) { ! int r = w.config; ! boolean scan = true, locals = true; ! long c = 0L; ! outer: for (;;) { ! if (locals) { // try locals before scanning ! if ((s = w.helpComplete(task, owned, 0)) < 0) ! break; ! locals = false; ! } ! else if ((s = task.status) < 0) ! break; ! else if (scan = !scan) { ! 
if (c == (c = ctl)) ! break; ! } ! else { // scan for subtasks ! WorkQueue[] qs = queues; ! int n = (qs == null) ? 0 : qs.length; ! for (int i = n; i > 0; --i, ++r) { ! int j, cap, b; WorkQueue q; ForkJoinTask<?>[] a; ! boolean eligible = false; ! if ((q = qs[j = r & (n - 1)]) != null && ! (a = q.array) != null && (cap = a.length) > 0) { ! int k = (cap - 1) & (b = q.base), nextBase = b + 1; ! ForkJoinTask<?> t = WorkQueue.getSlot(a, k); ! if (t instanceof CountedCompleter) { ! CountedCompleter<?> f = (CountedCompleter<?>)t; ! do {} while (!(eligible = (f == task)) && ! (f = f.completer) != null); } + if ((s = task.status) < 0) + break outer; + else if (q.base != b) + scan = true; // inconsistent + else if (t == null) + scan |= (a[nextBase & (cap - 1)] != null || + q.top != b); + else if (eligible) { + if (WorkQueue.casSlotToNull(a, k, t)) { + q.setBaseOpaque(nextBase); + t.doExec(); + locals = true; } + scan = true; break; } } } } } } return s; } /** + * Scans for and returns a polled task, if available. Used only + * for untracked polls. Begins scan at an index (scanRover) + * advanced on each call, to avoid systematic unfairness. + * + * @param submissionsOnly if true, only scan submission queues + */ + private ForkJoinTask<?> pollScan(boolean submissionsOnly) { + VarHandle.acquireFence(); + int r = scanRover += 0x61c88647; // Weyl increment; raciness OK + if (submissionsOnly) // even indices only + r &= ~1; + int step = (submissionsOnly) ? 
2 : 1; + WorkQueue[] qs; int n; + while ((qs = queues) != null && (n = qs.length) > 0) { + boolean scan = false; + for (int i = 0; i < n; i += step) { + int j, cap, b; WorkQueue q; ForkJoinTask<?>[] a; + if ((q = qs[j = (n - 1) & (r + i)]) != null && + (a = q.array) != null && (cap = a.length) > 0) { + int k = (cap - 1) & (b = q.base), nextBase = b + 1; + ForkJoinTask<?> t = WorkQueue.getSlot(a, k); + if (q.base != b) + scan = true; + else if (t == null) + scan |= (q.top != b || a[nextBase & (cap - 1)] != null); + else if (!WorkQueue.casSlotToNull(a, k, t)) + scan = true; + else { + q.setBaseOpaque(nextBase); + return t; + } + } + } + if (!scan && queues == qs) + break; + } + return null; + } + + /** * Runs tasks until {@code isQuiescent()}. Rather than blocking * when tasks cannot be found, rescans until all others cannot * find tasks either. + * + * @param nanos max wait time (Long.MAX_VALUE if effectively untimed) + * @param interruptible true if return on interrupt + * @return positive if quiescent, negative if interrupted, else 0 */ ! final int helpQuiescePool(WorkQueue w, long nanos, boolean interruptible) { ! if (w == null) ! return 0; ! long startTime = System.nanoTime(), parkTime = 0L; ! int prevSrc = w.source, wsrc = prevSrc, cfg = w.config, r = cfg + 1; ! for (boolean active = true, locals = true;;) { ! boolean busy = false, scan = false; ! if (locals) { // run local tasks before (re)polling ! locals = false; ! for (ForkJoinTask<?> u; (u = w.nextLocalTask(cfg)) != null;) ! u.doExec(); ! } ! WorkQueue[] qs = queues; ! int n = (qs == null) ? 0 : qs.length; ! for (int i = n; i > 0; --i, ++r) { ! int j, b, cap; WorkQueue q; ForkJoinTask<?>[] a; ! if ((q = qs[j = (n - 1) & r]) != null && q != w && ! (a = q.array) != null && (cap = a.length) > 0) { ! int k = (cap - 1) & (b = q.base); ! int nextBase = b + 1, src = j | SRC; ! ForkJoinTask<?> t = WorkQueue.getSlot(a, k); ! if (q.base != b) ! busy = scan = true; ! else if (t != null) { ! busy = scan = true; ! 
if (!active) { // increment before taking ! active = true; ! getAndAddCtl(RC_UNIT); } + if (WorkQueue.casSlotToNull(a, k, t)) { + q.base = nextBase; + w.source = src; + t.doExec(); + w.source = wsrc = prevSrc; + locals = true; } break; } ! else if (!busy) { ! if (q.top != b || a[nextBase & (cap - 1)] != null) ! busy = scan = true; ! else if (q.source != QUIET && q.phase >= 0) ! busy = true; } } ! } ! VarHandle.acquireFence(); ! if (!scan && queues == qs) { ! boolean interrupted; ! if (!busy) { w.source = prevSrc; ! if (!active) ! getAndAddCtl(RC_UNIT); ! return 1; ! } ! if (wsrc != QUIET) ! w.source = wsrc = QUIET; ! if (active) { // decrement ! active = false; ! parkTime = 0L; ! getAndAddCtl(RC_MASK & -RC_UNIT); ! } ! else if (parkTime == 0L) { ! parkTime = 1L << 10; // initially about 1 usec ! Thread.yield(); ! } ! else if ((interrupted = interruptible && Thread.interrupted()) || ! System.nanoTime() - startTime > nanos) { ! getAndAddCtl(RC_UNIT); ! return interrupted ? -1 : 0; } ! else { ! LockSupport.parkNanos(this, parkTime); ! if (parkTime < nanos >>> 8 && parkTime < 1L << 20) ! parkTime <<= 1; // max sleep approx 1 sec or 1% nanos } } } } /** ! * Helps quiesce from external caller until done, interrupted, or timeout * ! * @param nanos max wait time (Long.MAX_VALUE if effectively untimed) ! * @param interruptible true if return on interrupt ! * @return positive if quiescent, negative if interrupted, else 0 */ ! final int externalHelpQuiescePool(long nanos, boolean interruptible) { ! for (long startTime = System.nanoTime(), parkTime = 0L;;) { ! ForkJoinTask<?> t; ! if ((t = pollScan(false)) != null) { ! t.doExec(); ! parkTime = 0L; } ! else if (canStop()) ! return 1; ! else if (parkTime == 0L) { ! parkTime = 1L << 10; ! 
Thread.yield(); } + else if ((System.nanoTime() - startTime) > nanos) + return 0; + else if (interruptible && Thread.interrupted()) + return -1; + else { + LockSupport.parkNanos(this, parkTime); + if (parkTime < nanos >>> 8 && parkTime < 1L << 20) + parkTime <<= 1; } } } /** * Gets and removes a local or stolen task for the given worker. * * @return a task, if available */ final ForkJoinTask<?> nextTaskFor(WorkQueue w) { ForkJoinTask<?> t; ! if (w == null || (t = w.nextLocalTask(w.config)) == null) t = pollScan(false); return t; } // External operations /** ! * Finds and locks a WorkQueue for an external submitter, or ! * returns null if shutdown or terminating. */ ! final WorkQueue submissionQueue() { ! int r; if ((r = ThreadLocalRandom.getProbe()) == 0) { ! ThreadLocalRandom.localInit(); // initialize caller's probe r = ThreadLocalRandom.getProbe(); } ! for (int id = r << 1;;) { // even indices only ! int md = mode, n, i; WorkQueue q; ReentrantLock lock; ! WorkQueue[] qs = queues; ! if ((md & SHUTDOWN) != 0 || qs == null || (n = qs.length) <= 0) ! return null; ! else if ((q = qs[i = (n - 1) & id]) == null) { ! if ((lock = registrationLock) != null) { ! WorkQueue w = new WorkQueue(id | SRC); ! lock.lock(); // install under lock ! if (qs[i] == null) ! qs[i] = w; // else lost race; discard ! lock.unlock(); } } ! else if (!q.tryLock()) // move and restart ! id = (r = ThreadLocalRandom.advanceProbe(r)) << 1; ! else ! return q; } } + + /** + * Adds the given task to an external submission queue, or throws + * exception if shutdown or terminating. + * + * @param task the task. Caller must ensure non-null. + */ + final void externalPush(ForkJoinTask<?> task) { + WorkQueue q; + if ((q = submissionQueue()) == null) + throw new RejectedExecutionException(); // shutdown or disabled + else if (q.lockedPush(task)) + signalWork(); } /** * Pushes a possibly-external submission. */ private <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) { ! 
Thread t; ForkJoinWorkerThread wt; WorkQueue q; if (task == null) throw new NullPointerException(); if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) && ! (q = (wt = (ForkJoinWorkerThread)t).workQueue) != null && ! wt.pool == this) ! q.push(task, this); else externalPush(task); return task; } /** ! * Returns common pool queue for an external thread that has ! * possibly ever submitted a common pool task (nonzero probe), or ! * null if none. ! */ ! static WorkQueue commonQueue() { ! ForkJoinPool p; WorkQueue[] qs; ! int r = ThreadLocalRandom.getProbe(), n; ! return ((p = common) != null && (qs = p.queues) != null && ! (n = qs.length) > 0 && r != 0) ? ! qs[(n - 1) & (r << 1)] : null; } /** ! * If the given executor is a ForkJoinPool, poll and execute ! * AsynchronousCompletionTasks from worker's queue until none are ! * available or blocker is released. */ ! static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) { ! WorkQueue w = null; Thread t; ForkJoinWorkerThread wt; ! if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) { ! if ((wt = (ForkJoinWorkerThread)t).pool == e) ! w = wt.workQueue; } ! else if (e == common) ! w = commonQueue(); ! if (w != null) ! w.helpAsyncBlocker(blocker); } /** * Returns a cheap heuristic guide for task partitioning when * programmers, frameworks, tools, or languages have little or no
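In the new `runWorker` and `scan`, each worker advances a seed with a 13/17/5 xorshift. It then probes `queues[r & (n - 1)]` using the stride `(r >>> 16) | 1`. The queue array length is a power of two and the stride is odd, so `n` consecutive probes touch every slot exactly once. A full scan therefore misses no queue, while different workers start at different slots. The sketch below isolates that property. It is a minimal illustration, not pool code: the class and method names are hypothetical, and it assumes `n` is a power of two, which the constructor's sizing guarantees.

```java
import java.util.Arrays;

public class OddStrideScan {
    // One xorshift step with the 13/17/5 shifts used in runWorker.
    static int xorshift(int r) {
        r ^= r << 13;
        r ^= r >>> 17;
        r ^= r << 5;
        return r;
    }

    // Slot indices in the order a scan seeded with r would probe them:
    // start at r & (n - 1), then advance by an odd stride each step.
    static int[] probeOrder(int n, int r) {
        if (n <= 0 || (n & (n - 1)) != 0)
            throw new IllegalArgumentException("n must be a power of two");
        int[] order = new int[n];
        int step = (r >>> 16) | 1;            // setting the low bit makes it odd
        for (int i = 0; i < n; ++i, r += step)
            order[i] = r & (n - 1);           // mask instead of % n
        return order;
    }

    public static void main(String[] args) {
        int r = xorshift(0x9E3779B9);
        int[] order = probeOrder(16, r);
        int[] sorted = order.clone();
        Arrays.sort(sorted);
        System.out.println(Arrays.toString(order));
        System.out.println(Arrays.toString(sorted)); // 0 through 15, each once
    }
}
```

Masking with `n - 1` substitutes for `% n` only when `n` is a power of two. With an even stride, or with a length that is not a power of two, some slots would never be probed.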
*** 2049,2135 **** * if no work and no active workers * @param enable if true, terminate when next possible * @return true if terminating or terminated */ private boolean tryTerminate(boolean now, boolean enable) { ! int md; // 3 phases: try to set SHUTDOWN, then STOP, then TERMINATED ! ! while (((md = mode) & SHUTDOWN) == 0) { ! if (!enable || this == common) // cannot shutdown return false; ! else ! MODE.compareAndSet(this, md, md | SHUTDOWN); ! } ! ! while (((md = mode) & STOP) == 0) { // try to initiate termination ! if (!now) { // check if quiescent & empty ! for (long oldSum = 0L;;) { // repeat until stable ! boolean running = false; ! long checkSum = ctl; ! WorkQueue[] ws = workQueues; ! if ((md & SMASK) + (int)(checkSum >> RC_SHIFT) > 0) ! running = true; ! else if (ws != null) { ! WorkQueue w; ! for (int i = 0; i < ws.length; ++i) { ! if ((w = ws[i]) != null) { ! int s = w.source, p = w.phase; ! int d = w.id, b = w.base; ! if (b != w.top || ! ((d & 1) == 1 && (s >= 0 || p >= 0))) { ! running = true; ! break; // working, scanning, or have work ! } ! checkSum += (((long)s << 48) + ((long)p << 32) + ! ((long)b << 16) + (long)d); ! } ! } } ! if (((md = mode) & STOP) != 0) ! break; // already triggered ! else if (running) return false; ! else if (workQueues == ws && oldSum == (oldSum = checkSum)) ! break; ! } } ! if ((md & STOP) == 0) ! MODE.compareAndSet(this, md, md | STOP); ! } ! ! while (((md = mode) & TERMINATED) == 0) { // help terminate others ! for (long oldSum = 0L;;) { // repeat until stable ! WorkQueue[] ws; WorkQueue w; ! long checkSum = ctl; ! if ((ws = workQueues) != null) { ! for (int i = 0; i < ws.length; ++i) { ! if ((w = ws[i]) != null) { ! ForkJoinWorkerThread wt = w.owner; ! w.cancelAll(); // clear queues ! if (wt != null) { ! try { // unblock join or park ! wt.interrupt(); } catch (Throwable ignore) { } } - checkSum += ((long)w.phase << 32) + w.base; } } ! } ! if (((md = mode) & TERMINATED) != 0 || ! 
(workQueues == ws && oldSum == (oldSum = checkSum))) ! break; ! } ! if ((md & TERMINATED) != 0) ! break; ! else if ((md & SMASK) + (short)(ctl >>> TC_SHIFT) > 0) ! break; ! else if (MODE.compareAndSet(this, md, md | TERMINATED)) { ! synchronized (this) { ! notifyAll(); // for awaitTermination ! } ! break; } } return true; } --- 2279,2323 ---- * if no work and no active workers * @param enable if true, terminate when next possible * @return true if terminating or terminated */ private boolean tryTerminate(boolean now, boolean enable) { ! int md; // try to set SHUTDOWN, then STOP, then help terminate ! if (((md = mode) & SHUTDOWN) == 0) { ! if (!enable) return false; ! md = getAndBitwiseOrMode(SHUTDOWN); } ! if ((md & STOP) == 0) { ! if (!now && !canStop()) return false; ! md = getAndBitwiseOrMode(STOP); } ! for (int k = 0; k < 2; ++k) { // twice in case of lagging qs updates ! for (ForkJoinTask<?> t; (t = pollScan(false)) != null; ) ! ForkJoinTask.cancelIgnoringExceptions(t); // help cancel ! WorkQueue[] qs; int n; WorkQueue q; Thread thread; ! if ((qs = queues) != null && (n = qs.length) > 0) { ! for (int j = 1; j < n; j += 2) { // unblock other workers ! if ((q = qs[j]) != null && (thread = q.owner) != null && ! !thread.isInterrupted()) { ! try { ! thread.interrupt(); } catch (Throwable ignore) { } } } } ! ReentrantLock lock; Condition cond; // signal when no workers ! if (((md = mode) & TERMINATED) == 0 && ! (md & SMASK) + (short)(ctl >>> TC_SHIFT) <= 0 && ! (getAndBitwiseOrMode(TERMINATED) & TERMINATED) == 0 && ! (lock = registrationLock) != null) { ! lock.lock(); ! if ((cond = termination) != null) ! cond.signalAll(); ! lock.unlock(); } } return true; }
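The rewritten `tryTerminate` moves through SHUTDOWN, STOP and TERMINATED by OR-ing bits into `mode` with `getAndBitwiseOrMode` and checking the returned previous value. Because bits are only ever set, the run state is monotonic. The test `(getAndBitwiseOrMode(TERMINATED) & TERMINATED) == 0` is true for exactly one caller, and that caller signals the `termination` condition. A minimal sketch of that idiom follows. It uses `AtomicInteger.getAndAccumulate` as a stand-in for the pool's own `getAndBitwiseOrMode` helper, whose definition is not shown in this chunk. The bit values and class name are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RunStateBits {
    // Hypothetical bit assignments; the pool's real constants are not shown here.
    static final int SHUTDOWN   = 1 << 0;
    static final int STOP       = 1 << 1;
    static final int TERMINATED = 1 << 2;

    private final AtomicInteger mode = new AtomicInteger();

    // Atomically ORs bits into mode and returns the previous value.
    int getAndBitwiseOrMode(int bits) {
        return mode.getAndAccumulate(bits, (prev, x) -> prev | x);
    }

    // True only for the one caller whose OR actually set TERMINATED.
    boolean tryMarkTerminated() {
        return (getAndBitwiseOrMode(TERMINATED) & TERMINATED) == 0;
    }

    int mode() { return mode.get(); }

    public static void main(String[] args) throws InterruptedException {
        RunStateBits s = new RunStateBits();
        s.getAndBitwiseOrMode(SHUTDOWN);      // phases only ever add bits
        s.getAndBitwiseOrMode(STOP);
        AtomicInteger winners = new AtomicInteger();
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                if (s.tryMarkTerminated())
                    winners.incrementAndGet(); // the winner would signal waiters
            });
            threads[i].start();
        }
        for (Thread t : threads)
            t.join();
        System.out.println(winners.get()); // 1
    }
}
```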
*** 2296,2336 **** int maximumPoolSize, int minimumRunnable, Predicate<? super ForkJoinPool> saturate, long keepAliveTime, TimeUnit unit) { ! // check, encode, pack parameters ! if (parallelism <= 0 || parallelism > MAX_CAP || ! maximumPoolSize < parallelism || keepAliveTime <= 0L) throw new IllegalArgumentException(); ! if (factory == null) throw new NullPointerException(); - long ms = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP); - - int corep = Math.min(Math.max(corePoolSize, parallelism), MAX_CAP); - long c = ((((long)(-corep) << TC_SHIFT) & TC_MASK) | - (((long)(-parallelism) << RC_SHIFT) & RC_MASK)); - int m = parallelism | (asyncMode ? FIFO : 0); - int maxSpares = Math.min(maximumPoolSize, MAX_CAP) - parallelism; - int minAvail = Math.min(Math.max(minimumRunnable, 0), MAX_CAP); - int b = ((minAvail - parallelism) & SMASK) | (maxSpares << SWIDTH); - int n = (parallelism > 1) ? parallelism - 1 : 1; // at least 2 slots - n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; - n = (n + 1) << 1; // power of two, including space for submission queues - - this.workerNamePrefix = "ForkJoinPool-" + nextPoolId() + "-worker-"; - this.workQueues = new WorkQueue[n]; this.factory = factory; this.ueh = handler; this.saturate = saturate; ! this.keepAlive = ms; ! this.bounds = b; ! this.mode = m; ! this.ctl = c; ! checkPermission(); } private static Object newInstanceFromSystemProperty(String property) throws ReflectiveOperationException { String className = System.getProperty(property); return (className == null) ? null --- 2484,2518 ---- int maximumPoolSize, int minimumRunnable, Predicate<? super ForkJoinPool> saturate, long keepAliveTime, TimeUnit unit) { ! checkPermission(); ! int p = parallelism; ! if (p <= 0 || p > MAX_CAP || p > maximumPoolSize || keepAliveTime <= 0L) throw new IllegalArgumentException(); ! 
if (factory == null || unit == null) throw new NullPointerException(); this.factory = factory; this.ueh = handler; this.saturate = saturate; ! this.keepAlive = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP); ! int size = 1 << (33 - Integer.numberOfLeadingZeros(p - 1)); ! int corep = Math.min(Math.max(corePoolSize, p), MAX_CAP); ! int maxSpares = Math.min(maximumPoolSize, MAX_CAP) - p; ! int minAvail = Math.min(Math.max(minimumRunnable, 0), MAX_CAP); ! this.bounds = ((minAvail - p) & SMASK) | (maxSpares << SWIDTH); ! this.mode = p | (asyncMode ? FIFO : 0); ! this.ctl = ((((long)(-corep) << TC_SHIFT) & TC_MASK) | ! (((long)(-p) << RC_SHIFT) & RC_MASK)); ! this.registrationLock = new ReentrantLock(); ! this.queues = new WorkQueue[size]; ! String pid = Integer.toString(getAndAddPoolIds(1) + 1); ! this.workerNamePrefix = "ForkJoinPool-" + pid + "-worker-"; } + // helper method for commonPool constructor private static Object newInstanceFromSystemProperty(String property) throws ReflectiveOperationException { String className = System.getProperty(property); return (className == null) ? null
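The new constructor sizes the queue array as `1 << (33 - Integer.numberOfLeadingZeros(p - 1))`, replacing the old bit-smearing sequence. For `p >= 1` this equals twice the smallest power of two that is at least `p`. The factor of two leaves room for submission queues, which the new `submissionQueue` places at even indices only. The sketch checks the formula against a plain loop. Both method names are hypothetical.

```java
public class QueueArraySizing {
    // Formula from the new constructor; p is the target parallelism (>= 1).
    static int newSize(int p) {
        return 1 << (33 - Integer.numberOfLeadingZeros(p - 1));
    }

    // Straightforward reference: twice the smallest power of two >= p.
    static int twiceCeilPow2(int p) {
        int c = 1;
        while (c < p)
            c <<= 1;
        return c << 1;
    }

    public static void main(String[] args) {
        for (int p = 1; p <= 9; p++)
            System.out.println(p + " -> " + newSize(p)
                               + " (reference " + twiceCeilPow2(p) + ")");
    }
}
```

For `p >= 2` the result matches the removed code, which set `n = p - 1`, smeared its bits and took `(n + 1) << 1`. For `p == 1`, the old code produced 4 slots and the new formula produces 2.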
*** 2341,2393 **** /** * Constructor for common pool using parameters possibly * overridden by system properties */ private ForkJoinPool(byte forCommonPoolOnly) { ! int parallelism = -1; ForkJoinWorkerThreadFactory fac = null; UncaughtExceptionHandler handler = null; try { // ignore exceptions in accessing/parsing properties - String pp = System.getProperty - ("java.util.concurrent.ForkJoinPool.common.parallelism"); - if (pp != null) - parallelism = Integer.parseInt(pp); fac = (ForkJoinWorkerThreadFactory) newInstanceFromSystemProperty( "java.util.concurrent.ForkJoinPool.common.threadFactory"); handler = (UncaughtExceptionHandler) newInstanceFromSystemProperty( "java.util.concurrent.ForkJoinPool.common.exceptionHandler"); } catch (Exception ignore) { } ! ! if (fac == null) { ! if (System.getSecurityManager() == null) ! fac = defaultForkJoinWorkerThreadFactory; ! else // use security-managed default ! fac = new InnocuousForkJoinWorkerThreadFactory(); ! } ! if (parallelism < 0 && // default 1 less than #cores ! (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0) ! parallelism = 1; ! if (parallelism > MAX_CAP) ! parallelism = MAX_CAP; ! ! long c = ((((long)(-parallelism) << TC_SHIFT) & TC_MASK) | ! (((long)(-parallelism) << RC_SHIFT) & RC_MASK)); ! int b = ((1 - parallelism) & SMASK) | (COMMON_MAX_SPARES << SWIDTH); ! int n = (parallelism > 1) ? parallelism - 1 : 1; ! n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; ! n = (n + 1) << 1; ! ! this.workerNamePrefix = "ForkJoinPool.commonPool-worker-"; ! this.workQueues = new WorkQueue[n]; ! this.factory = fac; this.ueh = handler; - this.saturate = null; this.keepAlive = DEFAULT_KEEPALIVE; ! this.bounds = b; ! this.mode = parallelism; ! this.ctl = c; } /** * Returns the common pool instance. 
This pool is statically * constructed; its run state is unaffected by attempts to {@link --- 2523,2559 ---- /** * Constructor for common pool using parameters possibly * overridden by system properties */ private ForkJoinPool(byte forCommonPoolOnly) { ! int parallelism = Runtime.getRuntime().availableProcessors() - 1; ForkJoinWorkerThreadFactory fac = null; UncaughtExceptionHandler handler = null; try { // ignore exceptions in accessing/parsing properties fac = (ForkJoinWorkerThreadFactory) newInstanceFromSystemProperty( "java.util.concurrent.ForkJoinPool.common.threadFactory"); handler = (UncaughtExceptionHandler) newInstanceFromSystemProperty( "java.util.concurrent.ForkJoinPool.common.exceptionHandler"); + String pp = System.getProperty + ("java.util.concurrent.ForkJoinPool.common.parallelism"); + if (pp != null) + parallelism = Integer.parseInt(pp); } catch (Exception ignore) { } ! int p = this.mode = Math.min(Math.max(parallelism, 0), MAX_CAP); ! int size = 1 << (33 - Integer.numberOfLeadingZeros(p > 0 ? p - 1 : 1)); ! this.factory = (fac != null) ? fac : ! new DefaultCommonPoolForkJoinWorkerThreadFactory(); this.ueh = handler; this.keepAlive = DEFAULT_KEEPALIVE; ! this.saturate = null; ! this.workerNamePrefix = null; ! this.bounds = ((1 - p) & SMASK) | (COMMON_MAX_SPARES << SWIDTH); ! this.ctl = ((((long)(-p) << TC_SHIFT) & TC_MASK) | ! (((long)(-p) << RC_SHIFT) & RC_MASK)); ! this.queues = new WorkQueue[size]; ! this.registrationLock = new ReentrantLock(); } /** * Returns the common pool instance. This pool is statically * constructed; its run state is unaffected by attempts to {@link
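Both constructors initialize ctl by storing the negated core size and the negated parallelism in two 16-bit fields. Adding the target back, as the old isQuiescent did with pc + (short)(c >>> TC_SHIFT) and pc + (int)(c >> RC_SHIFT), therefore yields zero for a freshly built pool. The sketch below illustrates that round trip together with the new clamp on the common-pool parallelism. The values of TC_SHIFT, RC_SHIFT and the two masks are assumptions, because their definitions are not part of this diff. MAX_CAP = 0x7fff follows the 32767-thread limit stated in the class documentation. CtlPacking and its method names are hypothetical.

```java
/**
 * Sketch of packing two signed 16-bit counts into one long, in the style of
 * the ctl initialization above. Shift and mask values are ASSUMED.
 */
final class CtlPacking {
    private CtlPacking() {}

    static final int  RC_SHIFT = 48;                  // assumed
    static final int  TC_SHIFT = 32;                  // assumed
    static final long RC_MASK  = 0xffffL << RC_SHIFT; // assumed
    static final long TC_MASK  = 0xffffL << TC_SHIFT; // assumed
    static final int  MAX_CAP  = 0x7fff;              // 32767, per the class docs

    /** Clamp a requested parallelism as the new common-pool constructor does. */
    static int clampParallelism(int parallelism) {
        return Math.min(Math.max(parallelism, 0), MAX_CAP);
    }

    /** Store -corep in the TC field and -p in the RC field. */
    static long initialCtl(int corep, int p) {
        return (((long)(-corep) << TC_SHIFT) & TC_MASK) |
               (((long)(-p) << RC_SHIFT) & RC_MASK);
    }

    /** Decode TC: unsigned shift, then a short cast restores the sign. */
    static int tcField(long c) { return (short)(c >>> TC_SHIFT); }

    /** Decode RC: the arithmetic shift keeps the sign of the top 16 bits. */
    static int rcField(long c) { return (int)(c >> RC_SHIFT); }
}
```

Because each field holds the deficit relative to its target, a zero sum after adding the target back means the pool has exactly as many threads as intended. A negative sum means it is still short.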
*** 2424,2435 **** * @throws NullPointerException if the task is null * @throws RejectedExecutionException if the task cannot be * scheduled for execution */ public <T> T invoke(ForkJoinTask<T> task) { - if (task == null) - throw new NullPointerException(); externalSubmit(task); return task.join(); } /** --- 2590,2599 ----
*** 2449,2467 **** /** * @throws NullPointerException if the task is null * @throws RejectedExecutionException if the task cannot be * scheduled for execution */ public void execute(Runnable task) { ! if (task == null) ! throw new NullPointerException(); ! ForkJoinTask<?> job; ! if (task instanceof ForkJoinTask<?>) // avoid re-wrap ! job = (ForkJoinTask<?>) task; ! else ! job = new ForkJoinTask.RunnableExecuteAction(task); ! externalSubmit(job); } /** * Submits a ForkJoinTask for execution. * --- 2613,2628 ---- /** * @throws NullPointerException if the task is null * @throws RejectedExecutionException if the task cannot be * scheduled for execution */ + @Override + @SuppressWarnings("unchecked") public void execute(Runnable task) { ! externalSubmit((task instanceof ForkJoinTask<?>) ! ? (ForkJoinTask<Void>) task // avoid re-wrap ! : new ForkJoinTask.RunnableExecuteAction(task)); } /** * Submits a ForkJoinTask for execution. *
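This hunk and the previous one drop the explicit null checks from invoke and execute, while the Javadoc still promises NullPointerException. Presumably the check now happens inside externalSubmit or in the task adapters, whose code is not shown in this section. The sketch below only exercises the documented public contract through the common pool. NullTaskContract is a hypothetical name.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

/** Checks the documented NullPointerException contract of the public API. */
final class NullTaskContract {
    private NullTaskContract() {}

    /** True if execute(Runnable) rejects null with NullPointerException. */
    static boolean executeRejectsNull() {
        try {
            ForkJoinPool.commonPool().execute((Runnable) null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    /** True if invoke(ForkJoinTask) rejects null with NullPointerException. */
    static boolean invokeRejectsNull() {
        try {
            ForkJoinPool.commonPool().invoke((ForkJoinTask<?>) null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }
}
```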
*** 2479,2541 **** /** * @throws NullPointerException if the task is null * @throws RejectedExecutionException if the task cannot be * scheduled for execution */ public <T> ForkJoinTask<T> submit(Callable<T> task) { return externalSubmit(new ForkJoinTask.AdaptedCallable<T>(task)); } /** * @throws NullPointerException if the task is null * @throws RejectedExecutionException if the task cannot be * scheduled for execution */ public <T> ForkJoinTask<T> submit(Runnable task, T result) { return externalSubmit(new ForkJoinTask.AdaptedRunnable<T>(task, result)); } /** * @throws NullPointerException if the task is null * @throws RejectedExecutionException if the task cannot be * scheduled for execution */ @SuppressWarnings("unchecked") public ForkJoinTask<?> submit(Runnable task) { - if (task == null) - throw new NullPointerException(); return externalSubmit((task instanceof ForkJoinTask<?>) ? (ForkJoinTask<Void>) task // avoid re-wrap : new ForkJoinTask.AdaptedRunnableAction(task)); } /** * @throws NullPointerException {@inheritDoc} * @throws RejectedExecutionException {@inheritDoc} */ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) { - // In previous versions of this class, this method constructed - // a task to run ForkJoinTask.invokeAll, but now external - // invocation of multiple tasks is at least as efficient. ArrayList<Future<T>> futures = new ArrayList<>(tasks.size()); - try { for (Callable<T> t : tasks) { ! ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t); futures.add(f); externalSubmit(f); } ! for (int i = 0, size = futures.size(); i < size; i++) ((ForkJoinTask<?>)futures.get(i)).quietlyJoin(); return futures; } catch (Throwable t) { ! for (int i = 0, size = futures.size(); i < size; i++) ! futures.get(i).cancel(false); throw t; } } /** * Returns the factory used for constructing new workers. 
* * @return the factory used for constructing new workers */ --- 2640,2858 ---- /** * @throws NullPointerException if the task is null * @throws RejectedExecutionException if the task cannot be * scheduled for execution */ + @Override public <T> ForkJoinTask<T> submit(Callable<T> task) { return externalSubmit(new ForkJoinTask.AdaptedCallable<T>(task)); } /** * @throws NullPointerException if the task is null * @throws RejectedExecutionException if the task cannot be * scheduled for execution */ + @Override public <T> ForkJoinTask<T> submit(Runnable task, T result) { return externalSubmit(new ForkJoinTask.AdaptedRunnable<T>(task, result)); } /** * @throws NullPointerException if the task is null * @throws RejectedExecutionException if the task cannot be * scheduled for execution */ + @Override @SuppressWarnings("unchecked") public ForkJoinTask<?> submit(Runnable task) { return externalSubmit((task instanceof ForkJoinTask<?>) ? (ForkJoinTask<Void>) task // avoid re-wrap : new ForkJoinTask.AdaptedRunnableAction(task)); } /** * @throws NullPointerException {@inheritDoc} * @throws RejectedExecutionException {@inheritDoc} */ + @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) { ArrayList<Future<T>> futures = new ArrayList<>(tasks.size()); try { for (Callable<T> t : tasks) { ! ForkJoinTask<T> f = ! new ForkJoinTask.AdaptedInterruptibleCallable<T>(t); futures.add(f); externalSubmit(f); } ! for (int i = futures.size() - 1; i >= 0; --i) ((ForkJoinTask<?>)futures.get(i)).quietlyJoin(); return futures; } catch (Throwable t) { ! for (Future<T> e : futures) ! ForkJoinTask.cancelIgnoringExceptions(e); throw t; } } + @Override + public <T> List<Future<T>> invokeAll(Collection<? 
extends Callable<T>> tasks, + long timeout, TimeUnit unit) + throws InterruptedException { + long nanos = unit.toNanos(timeout); + ArrayList<Future<T>> futures = new ArrayList<>(tasks.size()); + try { + for (Callable<T> t : tasks) { + ForkJoinTask<T> f = + new ForkJoinTask.AdaptedInterruptibleCallable<T>(t); + futures.add(f); + externalSubmit(f); + } + long startTime = System.nanoTime(), ns = nanos; + boolean timedOut = (ns < 0L); + for (int i = futures.size() - 1; i >= 0; --i) { + Future<T> f = futures.get(i); + if (!f.isDone()) { + if (timedOut) + ForkJoinTask.cancelIgnoringExceptions(f); + else { + try { + f.get(ns, TimeUnit.NANOSECONDS); + } catch (CancellationException | TimeoutException | + ExecutionException ok) { + } + if ((ns = nanos - (System.nanoTime() - startTime)) < 0L) + timedOut = true; + } + } + } + return futures; + } catch (Throwable t) { + for (Future<T> e : futures) + ForkJoinTask.cancelIgnoringExceptions(e); + throw t; + } + } + + // Task to hold results from InvokeAnyTasks + static final class InvokeAnyRoot<E> extends ForkJoinTask<E> { + private static final long serialVersionUID = 2838392045355241008L; + @SuppressWarnings("serial") // Conditionally serializable + volatile E result; + final AtomicInteger count; // in case all throw + final ForkJoinPool pool; // to check shutdown while collecting + InvokeAnyRoot(int n, ForkJoinPool p) { + pool = p; + count = new AtomicInteger(n); + } + final void tryComplete(Callable<E> c) { // called by InvokeAnyTasks + Throwable ex = null; + boolean failed = (c == null || isCancelled() || + (pool != null && pool.mode < 0)); + if (!failed && !isDone()) { + try { + complete(c.call()); + } catch (Throwable tx) { + ex = tx; + failed = true; + } + } + if ((pool != null && pool.mode < 0) || + (failed && count.getAndDecrement() <= 1)) + trySetThrown(ex != null ? 
ex : new CancellationException()); + } + public final boolean exec() { return false; } // never forked + public final E getRawResult() { return result; } + public final void setRawResult(E v) { result = v; } + } + + // Variant of AdaptedInterruptibleCallable with results in InvokeAnyRoot + static final class InvokeAnyTask<E> extends ForkJoinTask<E> { + private static final long serialVersionUID = 2838392045355241008L; + final InvokeAnyRoot<E> root; + @SuppressWarnings("serial") // Conditionally serializable + final Callable<E> callable; + transient volatile Thread runner; + InvokeAnyTask(InvokeAnyRoot<E> root, Callable<E> callable) { + this.root = root; + this.callable = callable; + } + public final boolean exec() { + Thread.interrupted(); + runner = Thread.currentThread(); + root.tryComplete(callable); + runner = null; + Thread.interrupted(); + return true; + } + public final boolean cancel(boolean mayInterruptIfRunning) { + Thread t; + boolean stat = super.cancel(false); + if (mayInterruptIfRunning && (t = runner) != null) { + try { + t.interrupt(); + } catch (Throwable ignore) { + } + } + return stat; + } + public final void setRawResult(E v) {} // unused + public final E getRawResult() { return null; } + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks) + throws InterruptedException, ExecutionException { + int n = tasks.size(); + if (n <= 0) + throw new IllegalArgumentException(); + InvokeAnyRoot<T> root = new InvokeAnyRoot<T>(n, this); + ArrayList<InvokeAnyTask<T>> fs = new ArrayList<>(n); + try { + for (Callable<T> c : tasks) { + if (c == null) + throw new NullPointerException(); + InvokeAnyTask<T> f = new InvokeAnyTask<T>(root, c); + fs.add(f); + externalSubmit(f); + if (root.isDone()) + break; + } + return root.get(); + } finally { + for (InvokeAnyTask<T> f : fs) + ForkJoinTask.cancelIgnoringExceptions(f); + } + } + + @Override + public <T> T invokeAny(Collection<? 
extends Callable<T>> tasks, + long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + long nanos = unit.toNanos(timeout); + int n = tasks.size(); + if (n <= 0) + throw new IllegalArgumentException(); + InvokeAnyRoot<T> root = new InvokeAnyRoot<T>(n, this); + ArrayList<InvokeAnyTask<T>> fs = new ArrayList<>(n); + try { + for (Callable<T> c : tasks) { + if (c == null) + throw new NullPointerException(); + InvokeAnyTask<T> f = new InvokeAnyTask<T>(root, c); + fs.add(f); + externalSubmit(f); + if (root.isDone()) + break; + } + return root.get(nanos, TimeUnit.NANOSECONDS); + } finally { + for (InvokeAnyTask<T> f : fs) + ForkJoinTask.cancelIgnoringExceptions(f); + } + } + /** * Returns the factory used for constructing new workers. * * @return the factory used for constructing new workers */
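The diff adds ForkJoinPool-specific overrides of the timed invokeAll and of both invokeAny variants. From a caller's perspective the ExecutorService contract does not change:

- every returned Future is done when the timed invokeAll returns;
- invokeAny returns the result of some task that completed normally;
- invokeAny throws ExecutionException if no task did. The count in InvokeAnyRoot above exists for this all-failed case.

The sketch below shows minimal usage with a private two-thread pool. InvokeDemo is a hypothetical class name.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class InvokeDemo {
    private InvokeDemo() {}

    /** Timed invokeAll: on return, every Future reports isDone(). */
    static boolean allDoneAfterTimedInvokeAll() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            List<Callable<Integer>> tasks = List.of(() -> 1, () -> 2, () -> 3);
            List<Future<Integer>> futures = pool.invokeAll(tasks, 10, TimeUnit.SECONDS);
            return futures.stream().allMatch(Future::isDone);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();
        }
    }

    /** invokeAny: returns the value of some task that completed normally. */
    static int anyResult() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            List<Callable<Integer>> tasks = List.of(() -> 42, () -> 42);
            return pool.invokeAny(tasks);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    /** invokeAny: if every task throws, the caller sees ExecutionException. */
    static boolean allFailingThrowsExecutionException() {
        ForkJoinPool pool = new ForkJoinPool(2);
        Callable<Integer> failing = () -> { throw new IllegalStateException("boom"); };
        try {
            pool.invokeAny(List.of(failing, failing));
            return false;
        } catch (ExecutionException expected) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();
        }
    }
}
```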
*** 2602,2617 **** * number of running threads. * * @return the number of worker threads */ public int getRunningThreadCount() { - WorkQueue[] ws; WorkQueue w; VarHandle.acquireFence(); int rc = 0; ! if ((ws = workQueues) != null) { ! for (int i = 1; i < ws.length; i += 2) { ! if ((w = ws[i]) != null && w.isApparentlyUnblocked()) ++rc; } } return rc; } --- 2919,2934 ---- * number of running threads. * * @return the number of worker threads */ public int getRunningThreadCount() { VarHandle.acquireFence(); + WorkQueue[] qs; WorkQueue q; int rc = 0; ! if ((qs = queues) != null) { ! for (int i = 1; i < qs.length; i += 2) { ! if ((q = qs[i]) != null && q.isApparentlyUnblocked()) ++rc; } } return rc; }
*** 2638,2671 **** * threads remain inactive. * * @return {@code true} if all threads are currently idle */ public boolean isQuiescent() { ! for (;;) { ! long c = ctl; ! int md = mode, pc = md & SMASK; ! int tc = pc + (short)(c >>> TC_SHIFT); ! int rc = pc + (int)(c >> RC_SHIFT); ! if ((md & (STOP | TERMINATED)) != 0) ! return true; ! else if (rc > 0) ! return false; ! else { ! WorkQueue[] ws; WorkQueue v; ! if ((ws = workQueues) != null) { ! for (int i = 1; i < ws.length; i += 2) { ! if ((v = ws[i]) != null) { ! if (v.source > 0) ! return false; ! --tc; ! } ! } ! } ! if (tc == 0 && ctl == c) ! return true; ! } ! } } /** * Returns an estimate of the total number of completed tasks that * were executed by a thread other than their submitter. The --- 2955,2965 ---- * threads remain inactive. * * @return {@code true} if all threads are currently idle */ public boolean isQuiescent() { ! return canStop(); } /** * Returns an estimate of the total number of completed tasks that * were executed by a thread other than their submitter. The
*** 2677,2691 **** * * @return the number of steals */ public long getStealCount() { long count = stealCount; ! WorkQueue[] ws; WorkQueue w; ! if ((ws = workQueues) != null) { ! for (int i = 1; i < ws.length; i += 2) { ! if ((w = ws[i]) != null) ! count += (long)w.nsteals & 0xffffffffL; } } return count; } --- 2971,2985 ---- * * @return the number of steals */ public long getStealCount() { long count = stealCount; ! WorkQueue[] qs; WorkQueue q; ! if ((qs = queues) != null) { ! for (int i = 1; i < qs.length; i += 2) { ! if ((q = qs[i]) != null) ! count += (long)q.nsteals & 0xffffffffL; } } return count; }
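getStealCount widens each per-queue nsteals value with (long)q.nsteals & 0xffffffffL rather than with a plain cast. The sketch below shows what that expression does. The cast sign-extends the int, and the mask then keeps only the low 32 bits. An int counter that has wrapped past Integer.MAX_VALUE is therefore read as unsigned and still adds a non-negative amount. The expression is equivalent to Integer.toUnsignedLong. UnsignedWidening is a hypothetical name.

```java
/** Sketch of the unsigned widening used when summing per-queue steal counts. */
final class UnsignedWidening {
    private UnsignedWidening() {}

    /** Same expression as in the diff: the cast binds before the mask. */
    static long widen(int nsteals) {
        return (long) nsteals & 0xffffffffL;
    }
}
```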
*** 2698,2714 **** * granularities. * * @return the number of queued tasks */ public long getQueuedTaskCount() { - WorkQueue[] ws; WorkQueue w; VarHandle.acquireFence(); int count = 0; ! if ((ws = workQueues) != null) { ! for (int i = 1; i < ws.length; i += 2) { ! if ((w = ws[i]) != null) ! count += w.queueSize(); } } return count; } --- 2992,3008 ---- * granularities. * * @return the number of queued tasks */ public long getQueuedTaskCount() { VarHandle.acquireFence(); + WorkQueue[] qs; WorkQueue q; int count = 0; ! if ((qs = queues) != null) { ! for (int i = 1; i < qs.length; i += 2) { ! if ((q = qs[i]) != null) ! count += q.queueSize(); } } return count; }
*** 2718,2734 **** * time proportional to the number of submissions. * * @return the number of queued submissions */ public int getQueuedSubmissionCount() { - WorkQueue[] ws; WorkQueue w; VarHandle.acquireFence(); int count = 0; ! if ((ws = workQueues) != null) { ! for (int i = 0; i < ws.length; i += 2) { ! if ((w = ws[i]) != null) ! count += w.queueSize(); } } return count; } --- 3012,3028 ---- * time proportional to the number of submissions. * * @return the number of queued submissions */ public int getQueuedSubmissionCount() { VarHandle.acquireFence(); + WorkQueue[] qs; WorkQueue q; int count = 0; ! if ((qs = queues) != null) { ! for (int i = 0; i < qs.length; i += 2) { ! if ((q = qs[i]) != null) ! count += q.queueSize(); } } return count; }
*** 2737,2751 **** * pool that have not yet begun executing. * * @return {@code true} if there are any queued submissions */ public boolean hasQueuedSubmissions() { - WorkQueue[] ws; WorkQueue w; VarHandle.acquireFence(); ! if ((ws = workQueues) != null) { ! for (int i = 0; i < ws.length; i += 2) { ! if ((w = ws[i]) != null && !w.isEmpty()) return true; } } return false; } --- 3031,3045 ---- * pool that have not yet begun executing. * * @return {@code true} if there are any queued submissions */ public boolean hasQueuedSubmissions() { VarHandle.acquireFence(); ! WorkQueue[] qs; WorkQueue q; ! if ((qs = queues) != null) { ! for (int i = 0; i < qs.length; i += 2) { ! if ((q = qs[i]) != null && !q.isEmpty()) return true; } } return false; }
*** 2777,2799 **** * * @param c the collection to transfer elements into * @return the number of elements transferred */ protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) { - WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t; - VarHandle.acquireFence(); int count = 0; ! if ((ws = workQueues) != null) { ! for (int i = 0; i < ws.length; ++i) { ! if ((w = ws[i]) != null) { ! while ((t = w.poll()) != null) { c.add(t); ++count; } - } - } - } return count; } /** * Returns a string identifying this pool, as well as its state, --- 3071,3085 ---- * * @param c the collection to transfer elements into * @return the number of elements transferred */ protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) { int count = 0; ! for (ForkJoinTask<?> t; (t = pollScan(false)) != null; ) { c.add(t); ++count; } return count; } /** * Returns a string identifying this pool, as well as its state,
*** 2801,2826 **** * worker and task counts. * * @return a string identifying this pool, as well as its state */ public String toString() { ! // Use a single pass through workQueues to collect counts int md = mode; // read volatile fields first long c = ctl; long st = stealCount; ! long qt = 0L, qs = 0L; int rc = 0; ! WorkQueue[] ws; WorkQueue w; ! if ((ws = workQueues) != null) { ! for (int i = 0; i < ws.length; ++i) { ! if ((w = ws[i]) != null) { ! int size = w.queueSize(); if ((i & 1) == 0) ! qs += size; else { qt += size; ! st += (long)w.nsteals & 0xffffffffL; ! if (w.isApparentlyUnblocked()) ++rc; } } } } --- 3087,3112 ---- * worker and task counts. * * @return a string identifying this pool, as well as its state */ public String toString() { ! // Use a single pass through queues to collect counts int md = mode; // read volatile fields first long c = ctl; long st = stealCount; ! long qt = 0L, ss = 0L; int rc = 0; ! WorkQueue[] qs; WorkQueue q; ! if ((qs = queues) != null) { ! for (int i = 0; i < qs.length; ++i) { ! if ((q = qs[i]) != null) { ! int size = q.queueSize(); if ((i & 1) == 0) ! ss += size; else { qt += size; ! st += (long)q.nsteals & 0xffffffffL; ! if (q.isApparentlyUnblocked()) ++rc; } } } }
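toString renames its submission counter from qs to ss because qs now names the queue array. The single pass relies on the slot convention used throughout these hunks: even indices hold external submission queues, and odd indices hold worker queues. The sketch below illustrates that split. It models the queue array as a hypothetical int[] of queue sizes, where a null slot counts as 0. ParityScan is a hypothetical name.

```java
/** Sketch of the even/odd split used by toString() and the count methods. */
final class ParityScan {
    private ParityScan() {}

    /** Returns {submissions, workerTasks}, named ss and qt as in the diff. */
    static long[] countByParity(int[] queueSizes) {
        long qt = 0L, ss = 0L;
        for (int i = 0; i < queueSizes.length; ++i) {
            if ((i & 1) == 0)
                ss += queueSizes[i];   // even index: external submission queue
            else
                qt += queueSizes[i];   // odd index: worker queue
        }
        return new long[] { ss, qt };
    }
}
```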
*** 2840,2850 **** ", size = " + tc + ", active = " + ac + ", running = " + rc + ", steals = " + st + ", tasks = " + qt + ! ", submissions = " + qs + "]"; } /** * Possibly initiates an orderly shutdown in which previously --- 3126,3136 ---- ", size = " + tc + ", active = " + ac + ", running = " + rc + ", steals = " + st + ", tasks = " + qt + ! ", submissions = " + ss + "]"; } /** * Possibly initiates an orderly shutdown in which previously
*** 2860,2869 **** --- 3146,3156 ---- * because it does not hold {@link * java.lang.RuntimePermission}{@code ("modifyThread")} */ public void shutdown() { checkPermission(); + if (this != common) tryTerminate(false, true); } /** * Possibly attempts to cancel and/or stop all tasks, and reject
*** 2883,2892 **** --- 3170,3180 ---- * because it does not hold {@link * java.lang.RuntimePermission}{@code ("modifyThread")} */ public List<Runnable> shutdownNow() { checkPermission(); + if (this != common) tryTerminate(true, true); return Collections.emptyList(); } /**
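With the added this != common guards, shutdown() and shutdownNow() leave the common pool running. This matches the commonPool() documentation, which says its run state is unaffected by such calls. The small check below uses only the public API. It assumes no SecurityManager is installed, since checkPermission still runs before the guard and could throw first. CommonPoolShutdown is a hypothetical name.

```java
import java.util.concurrent.ForkJoinPool;

final class CommonPoolShutdown {
    private CommonPoolShutdown() {}

    /** True if the common pool still reports running and executes work afterwards. */
    static boolean commonPoolSurvivesShutdown() {
        ForkJoinPool common = ForkJoinPool.commonPool();
        common.shutdown();     // documented to have no effect on the common pool
        common.shutdownNow();  // likewise
        return !common.isShutdown() && common.submit(() -> 1).join() == 1;
    }
}
```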
*** 2910,2921 **** * they do, they must abort them on interrupt.) * * @return {@code true} if terminating but not yet terminated */ public boolean isTerminating() { ! int md = mode; ! return (md & STOP) != 0 && (md & TERMINATED) == 0; } /** * Returns {@code true} if this pool has been shut down. * --- 3198,3208 ---- * they do, they must abort them on interrupt.) * * @return {@code true} if terminating but not yet terminated */ public boolean isTerminating() { ! return (mode & (STOP | TERMINATED)) == STOP; } /** * Returns {@code true} if this pool has been shut down. *
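isTerminating now uses a single mask comparison to test that STOP is set and TERMINATED is clear. The sketch below checks that the old and new forms agree. The bit values are assumptions, chosen only to be distinct single bits. The pool.mode < 0 tests in InvokeAnyRoot suggest STOP may be the sign bit, but its definition is not in this diff. TerminatingCheck is a hypothetical name.

```java
/** Compares the old and new isTerminating() bit tests under assumed flag values. */
final class TerminatingCheck {
    private TerminatingCheck() {}

    static final int STOP       = 1 << 31; // assumed value
    static final int TERMINATED = 1 << 19; // assumed value

    static boolean oldForm(int md) {
        return (md & STOP) != 0 && (md & TERMINATED) == 0;
    }

    static boolean newForm(int md) {
        return (md & (STOP | TERMINATED)) == STOP;
    }

    /** Checks all four STOP/TERMINATED combinations mixed with unrelated low bits. */
    static boolean equivalent() {
        int[] flagSets = { 0, STOP, TERMINATED, STOP | TERMINATED };
        for (int flags : flagSets)
            for (int low = 0; low < (1 << 16); low += 4099)
                if (oldForm(flags | low) != newForm(flags | low))
                    return false;
        return true;
    }
}
```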
*** 2939,2971 **** * {@code false} if the timeout elapsed before termination * @throws InterruptedException if interrupted while waiting */ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { ! if (Thread.interrupted()) ! throw new InterruptedException(); if (this == common) { ! awaitQuiescence(timeout, unit); ! return false; } ! long nanos = unit.toNanos(timeout); ! if (isTerminated()) ! return true; ! if (nanos <= 0L) ! return false; ! long deadline = System.nanoTime() + nanos; ! synchronized (this) { ! for (;;) { ! if (isTerminated()) ! return true; ! if (nanos <= 0L) ! return false; ! long millis = TimeUnit.NANOSECONDS.toMillis(nanos); ! wait(millis > 0L ? millis : 1L); ! nanos = deadline - System.nanoTime(); } } } /** * If called by a ForkJoinTask operating in this pool, equivalent * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise, --- 3226,3261 ---- * {@code false} if the timeout elapsed before termination * @throws InterruptedException if interrupted while waiting */ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { ! ReentrantLock lock; Condition cond; ! long nanos = unit.toNanos(timeout); ! boolean terminated = false; if (this == common) { ! Thread t; ForkJoinWorkerThread wt; int q; ! if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread && ! (wt = (ForkJoinWorkerThread)t).pool == this) ! q = helpQuiescePool(wt.workQueue, nanos, true); ! else ! q = externalHelpQuiescePool(nanos, true); ! if (q < 0) ! throw new InterruptedException(); } ! else if (!(terminated = ((mode & TERMINATED) != 0)) && ! (lock = registrationLock) != null) { ! lock.lock(); ! try { ! if ((cond = termination) == null) ! termination = cond = lock.newCondition(); ! while (!(terminated = ((mode & TERMINATED) != 0)) && nanos > 0L) ! nanos = cond.awaitNanos(nanos); ! } finally { ! 
lock.unlock(); } } + return terminated; } /** * If called by a ForkJoinTask operating in this pool, equivalent * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise,
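The new awaitTermination replaces synchronized/wait polling with a ReentrantLock and a lazily created Condition. It loops on awaitNanos until the pool has terminated or the remaining time runs out. The sketch below shows that waiting pattern. The signalling side (terminate) is hypothetical, because the code that signals termination is not part of this section. TerminationSignal is an illustrative name.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Sketch of a timed wait on a lazily created Condition. */
final class TerminationSignal {
    private final ReentrantLock lock = new ReentrantLock();
    private Condition termination;   // created on first wait, guarded by lock
    private boolean terminated;      // guarded by lock

    /** Hypothetical signalling side: mark terminated and wake all waiters. */
    void terminate() {
        lock.lock();
        try {
            terminated = true;
            if (termination != null)
                termination.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Waits up to the timeout; returns true if terminated. */
    boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            Condition cond = termination;
            if (cond == null)
                termination = cond = lock.newCondition();
            while (!terminated && nanos > 0L)
                nanos = cond.awaitNanos(nanos);  // returns the remaining time
            return terminated;
        } finally {
            lock.unlock();
        }
    }

    /** Demo: {timed-out wait, wait after terminate, wait across threads}. */
    static boolean[] demo() {
        TerminationSignal early = new TerminationSignal();
        TerminationSignal late = new TerminationSignal();
        try {
            boolean timedOut = early.await(10, TimeUnit.MILLISECONDS); // false
            early.terminate();
            boolean already = early.await(0, TimeUnit.NANOSECONDS);    // true
            new Thread(late::terminate).start();
            boolean crossThread = late.await(10, TimeUnit.SECONDS);    // true
            return new boolean[] { timedOut, already, crossThread };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return new boolean[] { true, false, false };
        }
    }
}
```

Looping on the returned remaining time, rather than waiting once, makes spurious wakeups harmless and keeps the overall deadline fixed.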
*** 2976,3014 **** * @param unit the time unit of the timeout argument * @return {@code true} if quiescent; {@code false} if the * timeout elapsed. */ public boolean awaitQuiescence(long timeout, TimeUnit unit) { long nanos = unit.toNanos(timeout); ! ForkJoinWorkerThread wt; ! Thread thread = Thread.currentThread(); ! if ((thread instanceof ForkJoinWorkerThread) && ! (wt = (ForkJoinWorkerThread)thread).pool == this) { ! helpQuiescePool(wt.workQueue); ! return true; ! } ! else { ! for (long startTime = System.nanoTime();;) { ! ForkJoinTask<?> t; ! if ((t = pollScan(false)) != null) ! t.doExec(); ! else if (isQuiescent()) ! return true; ! else if ((System.nanoTime() - startTime) > nanos) ! return false; else ! Thread.yield(); // cannot block ! } ! } ! } ! ! /** ! * Waits and/or attempts to assist performing tasks indefinitely ! * until the {@link #commonPool()} {@link #isQuiescent}. ! */ ! static void quiesceCommonPool() { ! common.awaitQuiescence(Long.MAX_VALUE, TimeUnit.NANOSECONDS); } /** * Interface for extending managed parallelism for tasks running * in {@link ForkJoinPool}s. --- 3266,3283 ---- * @param unit the time unit of the timeout argument * @return {@code true} if quiescent; {@code false} if the * timeout elapsed. */ public boolean awaitQuiescence(long timeout, TimeUnit unit) { + Thread t; ForkJoinWorkerThread wt; int q; long nanos = unit.toNanos(timeout); ! if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread && ! (wt = (ForkJoinWorkerThread)t).pool == this) ! q = helpQuiescePool(wt.workQueue, nanos, false); else ! q = externalHelpQuiescePool(nanos, false); ! return (q > 0); } /** * Interface for extending managed parallelism for tasks running * in {@link ForkJoinPool}s.
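awaitQuiescence now delegates to helpQuiescePool when the caller is a worker thread and to externalHelpQuiescePool otherwise. It returns true only on a positive result. Usage through the public API does not change. The sketch below assumes trivial tasks that finish well within the timeout. QuiescenceDemo is a hypothetical name.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class QuiescenceDemo {
    private QuiescenceDemo() {}

    /** Runs n trivial tasks, then waits for quiescence; returns the count, or -1 on timeout. */
    static int runAndAwait(int n) {
        ForkJoinPool pool = new ForkJoinPool(2);
        AtomicInteger done = new AtomicInteger();
        try {
            for (int i = 0; i < n; i++)
                pool.execute(done::incrementAndGet);
            // An external caller may also run queued tasks itself while waiting.
            boolean quiet = pool.awaitQuiescence(10, TimeUnit.SECONDS);
            return quiet ? done.get() : -1;
        } finally {
            pool.shutdown();
        }
    }
}
```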
*** 3016,3033 **** * <p>A {@code ManagedBlocker} provides two methods. Method * {@link #isReleasable} must return {@code true} if blocking is * not necessary. Method {@link #block} blocks the current thread * if necessary (perhaps internally invoking {@code isReleasable} * before actually blocking). These actions are performed by any ! * thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}. ! * The unusual methods in this API accommodate synchronizers that ! * may, but don't usually, block for long periods. Similarly, they ! * allow more efficient internal handling of cases in which ! * additional workers may be, but usually are not, needed to ! * ensure sufficient parallelism. Toward this end, ! * implementations of method {@code isReleasable} must be amenable ! * to repeated invocation. * * <p>For example, here is a ManagedBlocker based on a * ReentrantLock: * <pre> {@code * class ManagedLocker implements ManagedBlocker { --- 3285,3304 ---- * <p>A {@code ManagedBlocker} provides two methods. Method * {@link #isReleasable} must return {@code true} if blocking is * not necessary. Method {@link #block} blocks the current thread * if necessary (perhaps internally invoking {@code isReleasable} * before actually blocking). These actions are performed by any ! * thread invoking {@link ! * ForkJoinPool#managedBlock(ManagedBlocker)}. The unusual ! * methods in this API accommodate synchronizers that may, but ! * don't usually, block for long periods. Similarly, they allow ! * more efficient internal handling of cases in which additional ! * workers may be, but usually are not, needed to ensure ! * sufficient parallelism. Toward this end, implementations of ! * method {@code isReleasable} must be amenable to repeated ! * invocation. Neither method is invoked after a prior invocation ! * of {@code isReleasable} or {@code block} returns {@code true}. 
* * <p>For example, here is a ManagedBlocker based on a * ReentrantLock: * <pre> {@code * class ManagedLocker implements ManagedBlocker {
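The Javadoc above has two requirements: isReleasable must tolerate repeated calls, and neither method is invoked again once either returns true. The sketch below is a blocker that meets them by waiting on a CountDownLatch, a different synchronizer from the Javadoc's own ReentrantLock example. When managedBlock in this diff is called from an external thread, it simply loops on isReleasable and block (unmanagedBlock). When called from a pool worker, it may first try to compensate with a spare thread (compensatedBlock). LatchBlocker is a hypothetical name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;

/** A ManagedBlocker that waits for a CountDownLatch to reach zero. */
final class LatchBlocker implements ForkJoinPool.ManagedBlocker {
    private final CountDownLatch latch;

    LatchBlocker(CountDownLatch latch) { this.latch = latch; }

    @Override
    public boolean isReleasable() {
        return latch.getCount() == 0;   // cheap, non-blocking, safe to repeat
    }

    @Override
    public boolean block() throws InterruptedException {
        latch.await();                  // may block for a long time
        return true;                    // no further blocking is needed
    }

    /** Demo: block through managedBlock until another thread counts down. */
    static boolean awaitViaManagedBlock() {
        CountDownLatch latch = new CountDownLatch(1);
        new Thread(latch::countDown).start();
        try {
            ForkJoinPool.managedBlock(new LatchBlocker(latch));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return latch.getCount() == 0;
    }
}
```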
*** 3109,3191 **** * @param blocker the blocker task * @throws InterruptedException if {@code blocker.block()} did so */ public static void managedBlock(ManagedBlocker blocker) throws InterruptedException { if (blocker == null) throw new NullPointerException(); ! ForkJoinPool p; ! ForkJoinWorkerThread wt; ! WorkQueue w; ! Thread t = Thread.currentThread(); ! if ((t instanceof ForkJoinWorkerThread) && ! (p = (wt = (ForkJoinWorkerThread)t).pool) != null && ! (w = wt.workQueue) != null) { ! int block; ! while (!blocker.isReleasable()) { ! if ((block = p.tryCompensate(w)) != 0) { try { ! do {} while (!blocker.isReleasable() && ! !blocker.block()); } finally { ! CTL.getAndAdd(p, (block > 0) ? RC_UNIT : 0L); } break; } } } - else { - do {} while (!blocker.isReleasable() && - !blocker.block()); - } - } ! /** ! * If the given executor is a ForkJoinPool, poll and execute ! * AsynchronousCompletionTasks from worker's queue until none are ! * available or blocker is released. ! */ ! static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) { ! if (e instanceof ForkJoinPool) { ! WorkQueue w; ForkJoinWorkerThread wt; WorkQueue[] ws; int r, n; ! ForkJoinPool p = (ForkJoinPool)e; ! Thread thread = Thread.currentThread(); ! if (thread instanceof ForkJoinWorkerThread && ! (wt = (ForkJoinWorkerThread)thread).pool == p) ! w = wt.workQueue; ! else if ((r = ThreadLocalRandom.getProbe()) != 0 && ! (ws = p.workQueues) != null && (n = ws.length) > 0) ! w = ws[(n - 1) & r & SQMASK]; ! else ! w = null; ! if (w != null) ! w.helpAsyncBlocker(blocker); ! } } ! // AbstractExecutorService overrides. These rely on undocumented ! // fact that ForkJoinTask.adapt returns ForkJoinTasks that also ! // implement RunnableFuture. 
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new ForkJoinTask.AdaptedRunnable<T>(runnable, value); } protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new ForkJoinTask.AdaptedCallable<T>(callable); } - // VarHandle mechanics - private static final VarHandle CTL; - private static final VarHandle MODE; - static final VarHandle QA; - static { try { MethodHandles.Lookup l = MethodHandles.lookup(); CTL = l.findVarHandle(ForkJoinPool.class, "ctl", long.class); MODE = l.findVarHandle(ForkJoinPool.class, "mode", int.class); ! QA = MethodHandles.arrayElementVarHandle(ForkJoinTask[].class); } catch (ReflectiveOperationException e) { throw new ExceptionInInitializerError(e); } // Reduce the risk of rare disastrous classloading in first call to --- 3380,3447 ---- * @param blocker the blocker task * @throws InterruptedException if {@code blocker.block()} did so */ public static void managedBlock(ManagedBlocker blocker) throws InterruptedException { + Thread t; ForkJoinPool p; + if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread && + (p = ((ForkJoinWorkerThread)t).pool) != null) + p.compensatedBlock(blocker); + else + unmanagedBlock(blocker); + } + + /** ManagedBlock for ForkJoinWorkerThreads */ + private void compensatedBlock(ManagedBlocker blocker) + throws InterruptedException { if (blocker == null) throw new NullPointerException(); ! for (;;) { ! int comp; boolean done; ! long c = ctl; ! if (blocker.isReleasable()) ! break; ! if ((comp = tryCompensate(c)) >= 0) { ! long post = (comp == 0) ? 0L : RC_UNIT; try { ! done = blocker.block(); } finally { ! getAndAddCtl(post); } + if (done) break; } } } ! /** ManagedBlock for external threads */ ! private static void unmanagedBlock(ManagedBlocker blocker) ! throws InterruptedException { ! if (blocker == null) throw new NullPointerException(); ! do {} while (!blocker.isReleasable() && !blocker.block()); } ! 
// AbstractExecutorService.newTaskFor overrides rely on ! // undocumented fact that ForkJoinTask.adapt returns ForkJoinTasks ! // that also implement RunnableFuture. + @Override protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new ForkJoinTask.AdaptedRunnable<T>(runnable, value); } + @Override protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new ForkJoinTask.AdaptedCallable<T>(callable); } static { try { MethodHandles.Lookup l = MethodHandles.lookup(); CTL = l.findVarHandle(ForkJoinPool.class, "ctl", long.class); MODE = l.findVarHandle(ForkJoinPool.class, "mode", int.class); ! THREADIDS = l.findVarHandle(ForkJoinPool.class, "threadIds", int.class); ! POOLIDS = l.findStaticVarHandle(ForkJoinPool.class, "poolIds", int.class); } catch (ReflectiveOperationException e) { throw new ExceptionInInitializerError(e); } // Reduce the risk of rare disastrous classloading in first call to
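The static block now also obtains POOLIDS with findStaticVarHandle. The constructor uses it through getAndAddPoolIds(1) + 1 to number pools. The sketch below shows that idiom on a hypothetical class. The name prefix mirrors the workerNamePrefix string built in the constructor. The accessor body is an assumption, since getAndAddPoolIds itself is not shown in this section.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

/** Pool-id counter updated through a static-field VarHandle (hypothetical class). */
final class PoolIdCounter {
    private PoolIdCounter() {}

    private static volatile int poolIds;   // updated only through POOLIDS
    private static final VarHandle POOLIDS;
    static {
        try {
            POOLIDS = MethodHandles.lookup()
                .findStaticVarHandle(PoolIdCounter.class, "poolIds", int.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    /** Atomically adds x and returns the previous value (assumed accessor shape). */
    static int getAndAddPoolIds(int x) {
        return (int) POOLIDS.getAndAdd(x);
    }

    /** Builds "ForkJoinPool-<id>-worker-" from the next pool id, starting at 1. */
    static String nextWorkerNamePrefix() {
        String pid = Integer.toString(getAndAddPoolIds(1) + 1);
        return "ForkJoinPool-" + pid + "-worker-";
    }
}
```

The (int) cast on getAndAdd matters. VarHandle methods are signature-polymorphic, so the cast fixes the call-site return type to match the int field.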
*** 3202,3241 **** COMMON_MAX_SPARES = commonMaxSpares; defaultForkJoinWorkerThreadFactory = new DefaultForkJoinWorkerThreadFactory(); modifyThreadPermission = new RuntimePermission("modifyThread"); - common = AccessController.doPrivileged(new PrivilegedAction<>() { public ForkJoinPool run() { return new ForkJoinPool((byte)0); }}); COMMON_PARALLELISM = Math.max(common.mode & SMASK, 1); } - - /** - * Factory for innocuous worker threads. - */ - private static final class InnocuousForkJoinWorkerThreadFactory - implements ForkJoinWorkerThreadFactory { - - /** - * An ACC to restrict permissions for the factory itself. - * The constructed workers have no permissions set. - */ - private static final AccessControlContext ACC = contextWithPermissions( - modifyThreadPermission, - new RuntimePermission("enableContextClassLoaderOverride"), - new RuntimePermission("modifyThreadGroup"), - new RuntimePermission("getClassLoader"), - new RuntimePermission("setContextClassLoader")); - - public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { - return AccessController.doPrivileged( - new PrivilegedAction<>() { - public ForkJoinWorkerThread run() { - return new ForkJoinWorkerThread. - InnocuousForkJoinWorkerThread(pool); }}, - ACC); - } - } } --- 3458,3469 ----