 * maximum number of running threads to 32767. Attempts to create
 * pools with greater than the maximum number result in
 * {@code IllegalArgumentException}.
 *
 * <p>This implementation rejects submitted tasks (that is, by throwing
 * {@link RejectedExecutionException}) only when the pool is shut down
 * or internal resources have been exhausted.
 *
 * @since 1.7
 * @author Doug Lea
 */
public class ForkJoinPool extends AbstractExecutorService {

    /*
     * Implementation Overview
     *
     * This class and its nested classes provide the main
     * functionality and control for a set of worker threads:
     * Submissions from non-FJ threads enter into submission queues.
     * Workers take these tasks and typically split them into subtasks
     * that may be stolen by other workers. Preference rules give
     * first priority to processing tasks from their own queues (LIFO
     * or FIFO, depending on mode), then to randomized FIFO steals of
     * tasks in other queues. This framework began as a vehicle for
     * supporting tree-structured parallelism using work-stealing.
     * Over time, its scalability advantages led to extensions and
     * changes to better support more diverse usage contexts. Because
     * most internal methods and nested classes are interrelated,
     * their main rationale and descriptions are presented here;
     * individual methods and nested classes contain only brief
     * comments about details.
     *
     * WorkQueues
     * ==========
     *
     * Most operations occur within work-stealing queues (in nested
     * class WorkQueue). These are special forms of Deques that
     * support only three of the four possible end-operations -- push,
     * pop, and poll (aka steal), under the further constraints that
     * push and pop are called only from the owning thread (or, as
     * extended here, under a lock), while poll may be called from
     * other threads. (If you are unfamiliar with them, you probably
     * want to read Herlihy and Shavit's book "The Art of
     * Multiprocessor Programming", chapter 16 describing these in
     * more detail before proceeding.) The main work-stealing queue
     * design is roughly similar to those in the papers "Dynamic
     * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
     * (http://research.sun.com/scalable/pubs/index.html) and
     * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
     * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
     * The main differences ultimately stem from GC requirements that
     * we null out taken slots as soon as we can, to maintain as small
     * a footprint as possible even in programs generating huge
     * numbers of tasks. To accomplish this, we shift the CAS
     * arbitrating pop vs poll (steal) from being on the indices
     * ("base" and "top") to the slots themselves.
     *
     * Adding tasks then takes the form of a classic array push(task)
     * in a circular buffer:
     *    q.array[q.top++ % length] = task;
     *
     * (The actual code null-checks and size-checks the array, uses
     * masking, not mod, for indexing a power-of-two-sized array,
     * properly fences accesses, and possibly signals waiting workers
     * to start scanning -- see below.) Both a successful pop and
     * poll mainly entail a CAS of a slot from non-null to null.
     *
     * The pop operation (always performed by owner) is:
     *   if ((the task at slot top - 1 is not null) and
     *        (CAS slot to null))
     *           decrement top and return task;
     *
     * And the poll operation (usually by a stealer) is:
     *    if ((the task at base slot is not null) and
     *        (CAS slot to null))
     *           increment base and return task;
     *
     * There are several variants of each of these. In particular,
     * almost all uses of poll occur within scan operations that also
     * interleave contention tracking (with associated code sprawl).
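     *
     * A minimal hypothetical sketch of these rules follows. It is
     * not this class's code: SlotCasDeque and its members are
     * made-up names, capacity is fixed (no growArray), there is no
     * signalling or locking, and slots are CASed through an
     * AtomicReferenceArray rather than the VarHandle QA used below.
     *
     *   import java.util.concurrent.atomic.AtomicReferenceArray;
     *
     *   final class SlotCasDeque<T> {
     *       private final AtomicReferenceArray<T> slots;
     *       private final int mask;         // capacity - 1
     *       private volatile int base;      // next slot to poll
     *       private volatile int top;       // next slot to push
     *
     *       SlotCasDeque(int capacityPowerOfTwo) {
     *           slots = new AtomicReferenceArray<>(capacityPowerOfTwo);
     *           mask = capacityPowerOfTwo - 1;
     *       }
     *
     *       // Owner only; returns false when full (no resizing here).
     *       boolean push(T task) {
     *           int s = top;
     *           if (s - base > mask)
     *               return false;
     *           slots.set(s & mask, task);  // masking, not "% length"
     *           top = s + 1;
     *           return true;
     *       }
     *
     *       // Owner only: takes from the top (LIFO) end.
     *       T pop() {
     *           int s = top - 1;
     *           if (s - base < 0)
     *               return null;            // apparently empty
     *           T t = slots.get(s & mask);
     *           if (t != null && slots.compareAndSet(s & mask, t, null)) {
     *               top = s;
     *               return t;
     *           }
     *           return null;                // lost race with a stealer
     *       }
     *
     *       // Any thread: takes from the base (FIFO) end.
     *       T poll() {
     *           int b = base;
     *           if (b - top >= 0)
     *               return null;            // apparently empty
     *           T t = slots.get(b & mask);
     *           // Recheck base: narrows, but does not close, the window
     *           // in which a stale b could take a slot reused later.
     *           if (t != null && b == base &&
     *               slots.compareAndSet(b & mask, t, null)) {
     *               base = b + 1;
     *               return t;
     *           }
     *           return null;                // lost race; try elsewhere
     *       }
     *   }
     *
     * Because pop and poll CAS the same slot, at most one of them can
     * take the last remaining task; the indices only guide where to
     * look.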
     *
     * Memory ordering. See "Correct and Efficient Work-Stealing for
     * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
     * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
     * analysis of memory ordering requirements in work-stealing
     * algorithms similar to (but different from) the one used here.
     * Extracting tasks in array slots via (fully fenced) CAS provides
     * primary synchronization. The base and top indices imprecisely
     * guide where to extract from. We do not always require strict
     * orderings of array and index updates, so sometimes let them be
     * subject to compiler and processor reorderings. However, the
     * volatile "base" index also serves as a basis for memory
     * ordering: Slot accesses are preceded by a read of base,
     * ensuring happens-before ordering with respect to stealers (so
     * the slots themselves can be read via plain array reads). The
     * only other memory orderings relied on are maintained in the
     * course of signalling and activation (see below). A check that
     * base == top indicates (momentary) emptiness, but otherwise may
     * err on the side of possibly making the queue appear nonempty
     * when a push, pop, or poll has not fully committed, or making
     * it appear empty when an update of top has not yet been visibly
     * written. (Method isEmpty() checks the case of a partially
     * completed removal of the last element.) Because of this, the
     * poll operation, considered individually, is not wait-free. One
     * thief cannot successfully continue until another in-progress
     * one (or, if previously empty, a push) visibly completes.
     * However, in the aggregate, we ensure at least probabilistic
     * non-blockingness. If an attempted steal fails, a scanning
     * thief chooses a different random victim target to try next. So,
     * in order for one thief to progress, it suffices for any
     * in-progress poll or new push on any empty queue to
     * complete.
     *
     * This approach also enables support of a user mode in which
     * local task processing is in FIFO, not LIFO, order, simply by
     * using poll rather than pop. This can be useful in
     * message-passing frameworks in which tasks are never joined.
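     *
     * For illustration only (a hypothetical standalone program, not
     * part of this class; the name AsyncModeDemo is made up): the
     * public constructor's asyncMode argument selects this FIFO mode,
     * and getAsyncMode reports it.
     *
     *   import java.util.concurrent.ForkJoinPool;
     *   import java.util.concurrent.TimeUnit;
     *
     *   public class AsyncModeDemo {
     *       public static void main(String[] args) throws InterruptedException {
     *           ForkJoinPool pool = new ForkJoinPool(
     *               Runtime.getRuntime().availableProcessors(),
     *               ForkJoinPool.defaultForkJoinWorkerThreadFactory,
     *               null,   // no UncaughtExceptionHandler
     *               true);  // asyncMode: FIFO local processing
     *           System.out.println(pool.getAsyncMode()); // prints true
     *           // Fire-and-forget tasks, as in message-passing use.
     *           for (int i = 0; i < 3; i++) {
     *               int msg = i;
     *               pool.execute(() -> System.out.println("handled " + msg));
     *           }
     *           pool.shutdown();
     *           pool.awaitTermination(10, TimeUnit.SECONDS);
     *       }
     *   }
     *
     * The order of the "handled" lines is not specified, since any
     * worker may steal any of these tasks.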
     *
     * WorkQueues are also used in a similar way for tasks submitted
     * to the pool. We cannot mix these tasks in the same queues used
     * by workers. Instead, we randomly associate submission queues
     * with submitting threads, using a form of hashing. The
     * ThreadLocalRandom probe value serves as a hash code for
     * choosing existing queues, and may be randomly repositioned upon
     * contention with other submitters. In essence, submitters act
     * like workers except that they are restricted to executing local
     * tasks that they submitted. Insertion of tasks in shared mode
     * requires a lock, but we use only a simple spinlock (using field
     * phase), because submitters encountering a busy queue move to a
     * different position to use or create other queues -- they block
     * only when creating and registering new queues. Because it is
     * used only as a spinlock, unlocking requires only a "releasing"
     * store (using setRelease).
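     *
     * As a hypothetical standalone sketch of that locking idiom (the
     * class SpinLockedQueue and its members are made-up names; only
     * the VarHandle calls mirror what this file uses), acquiring is a
     * CAS of an int field from 0 to 1 and releasing is a setRelease
     * store back to 0:
     *
     *   import java.lang.invoke.MethodHandles;
     *   import java.lang.invoke.VarHandle;
     *
     *   final class SpinLockedQueue {
     *       private volatile int phase;        // 0: unlocked, 1: locked
     *       private static final VarHandle PHASE;
     *       static {
     *           try {
     *               PHASE = MethodHandles.lookup().findVarHandle(
     *                   SpinLockedQueue.class, "phase", int.class);
     *           } catch (ReflectiveOperationException e) {
     *               throw new ExceptionInInitializerError(e);
     *           }
     *       }
     *
     *       // One attempt only: a busy submitter moves to another
     *       // queue instead of spinning here.
     *       boolean tryLock() {
     *           return PHASE.compareAndSet(this, 0, 1);
     *       }
     *
     *       // Only the lock holder writes here, so a releasing store
     *       // suffices; no CAS is needed to unlock.
     *       void unlock() {
     *           PHASE.setRelease(this, 0);
     *       }
     *   }
     *
     * A caller would pair these as
     * "if (q.tryLock()) { try { ... } finally { q.unlock(); } }".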
     *
     * Management
     * ==========
     *
     * The main throughput advantages of work-stealing stem from
     * decentralized control -- workers mostly take tasks from
     * themselves or each other, at rates that can exceed a billion
     * per second. The pool itself creates, activates (enables
     * scanning for and running tasks), deactivates, blocks, and
     * terminates threads, all with minimal central information.
     * There are only a few properties that we can globally track or
     * maintain, so we pack them into a small number of variables,
     * often maintaining atomicity without blocking or locking.
     * Nearly all essentially atomic control state is held in a few
     * volatile variables that are by far most often read (not
     * written) as status and consistency checks. We pack as much
     * information into them as we can.
     *
     * Field "ctl" contains 64 bits holding information needed to
     * atomically decide to add, enqueue (on an event queue), and
     * dequeue (and release)-activate workers. To enable this
     * packing, we restrict maximum parallelism to (1<<15)-1 (which is
     * far in excess of normal operating range) to allow ids, counts,
     * and their negations (used for thresholding) to fit into 16-bit
     * subfields.
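     *
     * A hypothetical sketch of this kind of packing (class PackedCtl
     * and its four generic subfields are illustrative; the actual ctl
     * subfields and their shifts are defined elsewhere in this file):
     * four signed 16-bit values share one long, so a single CAS can
     * update them together, and a count stored as
     * (count - parallelism) is negative exactly when the count is
     * below target. With both values in [0, (1<<15)-1], that
     * difference always fits in a short:
     *
     *   final class PackedCtl {
     *       static final long MASK16 = 0xffffL;
     *
     *       static long pack(int f3, int f2, int f1, int f0) {
     *           return ((f3 & MASK16) << 48) | ((f2 & MASK16) << 32) |
     *                  ((f1 & MASK16) << 16) |  (f0 & MASK16);
     *       }
     *
     *       // Casting to short keeps 16 bits and restores the sign.
     *       static int field(long ctl, int k) {  // k in 0..3
     *           return (short) (ctl >>> (16 * k));
     *       }
     *
     *       // Field 3 holds the sign bit of the whole long, so testing
     *       // it for a negative value is simply ctl < 0.
     *       static boolean topFieldNegative(long ctl) {
     *           return ctl < 0L;
     *       }
     *   }
     *
     * For example, pack(1 - 4, 2, 0, 7) yields a negative long whose
     * field 3 decodes back to -3 and whose field 2 decodes to 2.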
     *
     * Field "mode" holds configuration parameters as well as lifetime
     * status, atomically and monotonically setting SHUTDOWN, STOP,
     * and finally TERMINATED bits.
     *
     * Field "workQueues" holds references to WorkQueues. It is
     * updated (only during worker creation and termination) under
     * lock (using field workerNamePrefix as lock), but is otherwise
     * concurrently readable, and accessed directly. We also ensure
     * that uses of the array reference itself never become too stale
     * in case of resizing. To simplify index-based operations, the
     * array size is always a power of two, and all readers must
     * tolerate null slots. Worker queues are at odd indices. Shared
     * (submission) queues are at even indices, up to a maximum of 64
     * slots, to limit growth even if the array needs to expand to add
     * more workers. Grouping them together in this way simplifies and
     * speeds up task scanning.
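     *
     * A hypothetical sketch of that layout (class QueueSlots and its
     * methods are made up; SQMASK repeats the value of the constant
     * of that name declared below). Masking with SQMASK clears bit 0,
     * giving an even index, and caps shared indices at 126, so at
     * most 64 shared slots are ever used; setting bit 0 gives workers
     * odd indices:
     *
     *   final class QueueSlots {
     *       static final int SQMASK = 0x007e;
     *
     *       // n is the (power-of-two, at least 2) queue array length.
     *       static int sharedIndex(int probe, int n) {
     *           return (n - 1) & probe & SQMASK;
     *       }
     *
     *       // Illustrative only: one way a worker seed could map
     *       // to an odd slot.
     *       static int workerIndex(int seed, int n) {
     *           return ((seed << 1) | 1) & (n - 1);
     *       }
     *   }
     *
     * For instance, sharedIndex(0x1235, 16) is 4 and
     * workerIndex(2, 16) is 5.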
     *
     * All worker thread creation is on-demand, triggered by task
     * submissions, replacement of terminated workers, and/or
     * compensation for blocked workers. However, all other support
     * code is set up to work with other policies. To ensure that we
     * do not hold on to worker references that would prevent GC, all
     * accesses to workQueues are via indices into the workQueues
     * array (which is one source of some of the messy code
     * constructions here). In essence, the workQueues array serves as
     * a weak reference mechanism. Thus for example the stack top
     * subfield of ctl stores indices, not references.
     *
     * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we
     * cannot let workers spin indefinitely scanning for tasks when
     * none can be found immediately, and we cannot start/resume
     * workers unless there appear to be tasks available. On the
     * other hand, we must quickly prod them into action when new
     * tasks are submitted or generated. In many usages, ramp-up time
     * is the main limiting factor in overall performance, which is
     * compounded at program start-up by JIT compilation and
     * ...
     * exception is propagated, generally to some external caller.
     * Worker index assignment avoids the bias in scanning that would
     * occur if entries were sequentially packed starting at the front
     * of the workQueues array. We treat the array as a simple
     * power-of-two hash table, expanding as needed. The seedIndex
     * increment ensures no collisions until a resize is needed or a
     * worker is deregistered and replaced, and thereafter keeps the
     * probability of collision low. We cannot use
     * ThreadLocalRandom.getProbe() for similar purposes here because
     * the thread has not started yet, but do so for creating
     * submission queues for existing external threads (see
     * externalPush).
     *
     * WorkQueue field "phase" is used by both workers and the pool to
     * manage and track whether a worker is UNSIGNALLED (possibly
     * blocked waiting for a signal). When a worker is enqueued, its
     * phase field is set. Note that phase field updates lag queue CAS
     * releases, so usage requires care -- seeing a negative phase
     * does not guarantee that the worker is available. When queued,
     * the lower 16 bits of phase must hold its pool index. So we
     * place the index there upon initialization (see registerWorker)
     * and otherwise keep it there or restore it when necessary.
     *
     * The ctl field also serves as the basis for memory
     * synchronization surrounding activation. This uses a more
     * efficient version of a Dekker-like rule under which task
     * producers and consumers sync with each other by both
     * writing/CASing ctl (even if to its current value). Applied
     * literally, that rule would be extremely costly, so we relax it
     * in several ways: (1) Producers only signal when their queue is
     * empty. Other workers propagate this signal (in method scan)
     * when they find tasks; to further reduce flailing, each worker
     * signals only one other per activation. (2) Workers only enqueue
     * after scanning (see below) and not finding any tasks. (3)
     * Rather than CASing ctl to its current value in the common case
     * where no action is required, we reduce write contention by
     * equivalently prefacing signalWork when called by an external
     * task producer using a memory access with full-volatile
     * semantics or a "fullFence".
     *
     * Almost always, too many signals are issued. A task producer
     * cannot in general tell if some existing worker is in the midst
     * of finishing one task (or already scanning) and ready to take
     * another without being signalled. So the producer might instead
     * activate a different worker that does not find any work, and
     * then deactivates. This scarcely matters in steady-state
     * computations involving all workers, but can create contention
     * and bookkeeping bottlenecks during ramp-up, ramp-down, and small
     * computations involving only a few workers.
     *
     * Scanning. Method runWorker performs top-level scanning for
     * tasks. Each scan traverses and tries to poll from each queue
     * starting at a random index and circularly stepping. Scans are
     * not performed in ideal random permutation order, to reduce
     * cacheline contention. The pseudorandom generator need not have
     * high-quality statistical properties in the long term, but just
     * within computations; we use Marsaglia XorShifts (often via
     * ThreadLocalRandom.nextSecondarySeed), which are cheap and
     * suffice. Scanning also employs contention reduction: When
     * scanning workers fail to extract an apparently existing task,
     * they soon restart at a different pseudorandom index. This
     * improves throughput when many threads are trying to take tasks
     * from few queues, which can be common in some usages. Scans do
     * not otherwise explicitly take into account core affinities,
     * loads, cache localities, etc. However, they do exploit temporal
     * locality (which usually approximates these) by preferring to
     * re-poll (at most #workers times) from the same queue after a
     * successful poll before trying others.
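     *
     * A hypothetical sketch of a randomized circular scan (class
     * RandomScan is made up, and it polls java.util.Queue instances
     * rather than WorkQueues): a 32-bit Marsaglia xorshift picks the
     * starting index, and the scan then steps through every slot once,
     * returning the first task it manages to poll:
     *
     *   import java.util.List;
     *   import java.util.Queue;
     *
     *   final class RandomScan<T> {
     *       private int seed;                   // xorshift state, never 0
     *
     *       RandomScan(int seed) { this.seed = (seed == 0) ? 1 : seed; }
     *
     *       private int nextSeed() {            // xorshift 13, 17, 5
     *           int r = seed;
     *           r ^= r << 13; r ^= r >>> 17; r ^= r << 5;
     *           return seed = r;
     *       }
     *
     *       // queues.size() is assumed to be a power of two; null
     *       // entries are tolerated, as for the workQueues array.
     *       T scan(List<? extends Queue<T>> queues) {
     *           int n = queues.size(), mask = n - 1;
     *           int start = nextSeed() & mask;
     *           for (int k = 0; k < n; ++k) {
     *               Queue<T> q = queues.get((start + k) & mask);
     *               T t;
     *               if (q != null && (t = q.poll()) != null)
     *                   return t;
     *           }
     *           return null;                    // nothing found this pass
     *       }
     *   }
     *
     * Each call to scan starts from a fresh pseudorandom index, so
     * retrying after a failed pass already spreads contending
     * scanners across different queues.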
     *
     * Trimming workers. To release resources after periods of lack of
     * use, a worker starting to wait when the pool is quiescent will
     * time out and terminate (see method scan) if the pool has
     * remained quiescent for the period given by field keepAlive.
     *
     * Shutdown and Termination. A call to shutdownNow invokes
     * tryTerminate to atomically set a mode bit. The calling
     * thread, as well as every other worker thereafter terminating,
     * helps terminate others by cancelling their unprocessed tasks
     * and waking them up, doing so repeatedly until stable. Calls to
     * non-abrupt shutdown() preface this by checking whether
     * termination should commence by sweeping through queues (until
     * stable) to ensure lack of in-flight submissions and workers
     * about to process them before triggering the "STOP" phase of
     * termination.
     *
     * Joining Tasks
     * =============
     *
     * Any of several actions may be taken when one worker is waiting
     * to join a task stolen (or always held) by another. Because we
     * are multiplexing many tasks onto a pool of workers, we can't
     * always just let them block (as in Thread.join). We also cannot
     * ...
     * actively joined task. Thus, the joiner executes a task that
     * would be on its own local deque if the to-be-joined task had
     * not been stolen. This is a conservative variant of the approach
     * described in Wagner & Calder "Leapfrogging: a portable
     * technique for implementing efficient futures" SIGPLAN Notices,
     * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
     * mainly in that we only record queue ids, not full dependency
     * links. This requires a linear scan of the workQueues array to
     * locate stealers, but isolates cost to when it is needed, rather
     * than adding to per-task overhead. Searches can fail to locate
     * stealers when GC stalls and the like delay recording sources.
     * Further, even when accurately identified, stealers might not
     * ever produce a task that the joiner can in turn help with. So,
     * compensation is tried upon failure to find tasks to run.
     *
     * Compensation does not by default aim to keep exactly the target
     * parallelism number of unblocked threads running at any given
     * time. Some previous versions of this class employed immediate
     * compensations for any blocked join. However, in practice, the
     * vast majority of blockages are transient byproducts of GC and
     * other JVM or OS activities that are made worse by replacement.
     * Rather than impose arbitrary policies, we allow users to
     * override the default of only adding threads upon apparent
     * starvation. The compensation mechanism may also be bounded.
     * Bounds for the commonPool (see COMMON_MAX_SPARES) better enable
     * JVMs to cope with programming errors and abuse before running
     * out of resources to do so.
     *
     * Common Pool
     * ===========
     *
     * The static common pool always exists after static
     * initialization. Since it (or any other created pool) need
     * never be used, we minimize initial construction overhead and
     * footprint to the setup of about a dozen fields.
     *
     * When external threads submit to the common pool, they can
     * perform subtask processing (see externalHelpComplete and
     * related methods) upon joins. This caller-helps policy makes it
     * sensible to set common pool parallelism level to one (or more)
     * less than the total number of available cores, or even zero for
     * pure caller-runs. We do not need to record whether external
     * submissions are to the common pool -- if not, external help
     * methods return quickly. These submitters would otherwise be
     * blocked waiting for completion, so the extra effort (with
     * liberally sprinkled task status checks) in inapplicable cases
     * amounts to an odd form of limited spin-wait before blocking in
     * ForkJoinTask.join.
     *
     * As a more appropriate default in managed environments, unless
     * overridden by system properties, we use workers of subclass
     * InnocuousForkJoinWorkerThread when there is a SecurityManager
     * present. These workers have no permissions set, do not belong
     * to any user-defined ThreadGroup, and erase all ThreadLocals
     * after executing any top-level task (see
     * WorkQueue.afterTopLevelExec). The associated mechanics (mainly
     * in ForkJoinWorkerThread) may be JVM-dependent and must access
     * particular Thread class fields to achieve this effect.
     *
     * Style notes
     * ===========
     *
     * Memory ordering relies mainly on VarHandles. This can be
     * awkward and ugly, but also reflects the need to control
     * outcomes across the unusual cases that arise in very racy code
     * with very few invariants. All fields are read into locals
     * before use, and null-checked if they are references. This is
     * usually done in a "C"-like style of listing declarations at the
     * heads of methods or blocks, and using inline assignments on
     * first encounter. Nearly all explicit checks lead to
     * bypass/return, not exception throws, because they may
     * legitimately arise due to cancellation/revocation during
     * shutdown.
     *
     * There is a lot of representation-level coupling among classes
     * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The
     * fields of WorkQueue maintain data structures managed by
     * ForkJoinPool, so are directly accessed. There is little point
     * trying to reduce this, since any associated future changes in
     * representations will need to be accompanied by algorithmic
     * changes anyway. Several methods intrinsically sprawl because
     * they must accumulate sets of consistent reads of fields held in
     * local variables. There are also other coding oddities
     * (including several unnecessary-looking hoisted null checks)
     * that help some methods perform reasonably even when interpreted
     * (not compiled).
     *
     * The order of declarations in this file is (with a few exceptions):
     * (1) Static utility functions
     * (2) Nested (static) classes
     * (3) Static fields
     * (4) Fields, along with constants used when unpacking some of them
     * (5) Internal control methods
     * (6) Callbacks and other support for ForkJoinTask methods
     * (7) Exported methods
     * (8) Static block initializing statics in minimally dependent order
     */

    // Static utilities

    /**
     * If there is a security manager, makes sure caller has
     * permission to modify threads.
     */
    private static void checkPermission() {
        SecurityManager security = System.getSecurityManager();
        // ...
    static final int SWIDTH       = 16;            // width of short
    static final int SMASK        = 0xffff;        // short bits == max index
    static final int MAX_CAP      = 0x7fff;        // max #workers - 1
    static final int SQMASK       = 0x007e;        // max 64 (even) slots

    // Masks and units for WorkQueue.phase and ctl sp subfield
    static final int UNSIGNALLED  = 1 << 31;       // must be negative
    static final int SS_SEQ       = 1 << 16;       // version count
    static final int QLOCK        = 1;             // must be 1

    // Mode bits and sentinels, some also used in WorkQueue id and source fields
    static final int OWNED        = 1;             // queue has owner thread
    static final int FIFO         = 1 << 16;       // fifo queue or access mode
    static final int SHUTDOWN     = 1 << 18;
    static final int TERMINATED   = 1 << 19;
    static final int STOP         = 1 << 31;       // must be negative
    static final int QUIET        = 1 << 30;       // not scanning or working
    static final int DORMANT      = QUIET | UNSIGNALLED;

    /**
     * The maximum number of local polls from the same queue before
     * checking others. This is a safeguard against infinitely unfair
     * looping under unbounded user task recursion, and must be larger
     * than plausible cases of intentional bounded task recursion.
     */
    static final int POLL_LIMIT = 1 << 10;

    /**
     * Queues supporting work-stealing as well as external task
     * submission. See above for descriptions and algorithms.
     * Performance on most platforms is very sensitive to placement of
     * instances of both WorkQueues and their arrays -- we absolutely
     * do not want multiple WorkQueue instances or multiple queue
     * arrays sharing cache lines. The @Contended annotation alerts
     * JVMs to try to keep instances apart.
     */
    @jdk.internal.vm.annotation.Contended
    static final class WorkQueue {

        /**
         * Capacity of work-stealing queue array upon initialization.
         * Must be a power of two; at least 4, but should be larger to
         * reduce or eliminate cacheline sharing among queues.
         * Currently, it is much larger, as a partial workaround for
         * the fact that JVMs often place arrays in locations that
         * share GC bookkeeping (especially cardmarks) such that
         * per-write accesses encounter serious memory contention.
         */
        static final int INITIAL_QUEUE_CAPACITY = 1 << 13;

        /**
         * Maximum size for queue arrays. Must be a power of two less
         * than or equal to 1 << (31 - width of array entry) to ensure
         * lack of wraparound of index calculations, but defined to a
         * value a bit less than this to help users trap runaway
         * programs before saturating systems.
         */
        static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M

        // Instance fields
        volatile int phase;        // versioned, negative: queued, 1: locked
        int stackPred;             // pool stack (ctl) predecessor link
        int nsteals;               // number of steals
        int id;                    // index, mode, tag
        volatile int source;       // source queue id, or sentinel
        volatile int base;         // index of next slot for poll
        int top;                   // index of next slot for push
        ForkJoinTask<?>[] array;   // the elements (initially unallocated)
        final ForkJoinPool pool;   // the containing pool (may be null)
        final ForkJoinWorkerThread owner; // owning thread or null if shared

        WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
            this.pool = pool;
            this.owner = owner;
            // Place indices in the center of array (that is not yet allocated)
            base = top = INITIAL_QUEUE_CAPACITY >>> 1;
        }

        /**
         * Returns an exportable index (used by ForkJoinWorkerThread).
         */
        final int getPoolIndex() {
            return (id & 0xffff) >>> 1; // ignore odd/even tag bit
        }

        /**
         * Returns the approximate number of tasks in the queue.
         */
        final int queueSize() {
            int n = base - top;       // read base first
            return (n >= 0) ? 0 : -n; // ignore transient negative
        }

        /**
         * Provides a more accurate estimate of whether this queue has
         * any tasks than does queueSize, by checking whether a
         * near-empty queue has at least one unclaimed task.
         */
        final boolean isEmpty() {
            ForkJoinTask<?>[] a; int n, al, b;
            return ((n = (b = base) - top) >= 0 || // possibly one task
                    (n == -1 && ((a = array) == null ||
                                 (al = a.length) == 0 ||
                                 a[(al - 1) & b] == null)));
        }

        /**
         * Pushes a task. Call only by owner in unshared queues.
         *
         * @param task the task. Caller must ensure non-null.
         * @throws RejectedExecutionException if array cannot be resized
         */
        final void push(ForkJoinTask<?> task) {
            int s = top; ForkJoinTask<?>[] a; int al, d;
            if ((a = array) != null && (al = a.length) > 0) {
                int index = (al - 1) & s;
                ForkJoinPool p = pool;
                top = s + 1;
                QA.setRelease(a, index, task);
                if ((d = base - s) == 0 && p != null) {
                    VarHandle.fullFence();
                    p.signalWork();
                }
                else if (d + al == 1)
                    growArray();
            }
        }

        /**
         * Initializes or doubles the capacity of array. Call either
         * by owner or with lock held -- it is OK for base, but not
         * top, to move while resizings are in progress.
         */
        final ForkJoinTask<?>[] growArray() {
            ForkJoinTask<?>[] oldA = array;
            int oldSize = oldA != null ? oldA.length : 0;
            int size = oldSize > 0 ? oldSize << 1 : INITIAL_QUEUE_CAPACITY;
            if (size < INITIAL_QUEUE_CAPACITY || size > MAXIMUM_QUEUE_CAPACITY)
                throw new RejectedExecutionException("Queue capacity exceeded");
            int oldMask, t, b;
            ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
            if (oldA != null && (oldMask = oldSize - 1) > 0 &&
                (t = top) - (b = base) > 0) {
                int mask = size - 1;
                do { // emulate poll from old array, push to new array
                    int index = b & oldMask;
                    ForkJoinTask<?> x = (ForkJoinTask<?>)
                        QA.getAcquire(oldA, index);
                    if (x != null &&
                        QA.compareAndSet(oldA, index, x, null))
                        a[b & mask] = x;
                } while (++b != t);
                VarHandle.releaseFence();
            }
            return a;
        }

        /**
         * Takes next task, if one exists, in LIFO order. Call only
         * by owner in unshared queues.
         */
        final ForkJoinTask<?> pop() {
            int b = base, s = top, al, i; ForkJoinTask<?>[] a;
            if ((a = array) != null && b != s && (al = a.length) > 0) {
                int index = (al - 1) & --s;
                ForkJoinTask<?> t = (ForkJoinTask<?>)
                    QA.get(a, index);
                if (t != null &&
                    QA.compareAndSet(a, index, t, null)) {
                    top = s;
                    VarHandle.releaseFence();
                    return t;
                }
            }
            return null;
        }

        /**
         * Takes next task, if one exists, in FIFO order.
         */
        final ForkJoinTask<?> poll() {
            for (;;) {
                int b = base, s = top, d, al; ForkJoinTask<?>[] a;
                if ((a = array) != null && (d = b - s) < 0 &&
                    (al = a.length) > 0) {
                    int index = (al - 1) & b;
                    ForkJoinTask<?> t = (ForkJoinTask<?>)
                        QA.getAcquire(a, index);
                    if (b++ == base) {
                        if (t != null) {
                            if (QA.compareAndSet(a, index, t, null)) {
                                base = b;
                                return t;
                            }
                        }
                        else if (d == -1)
                            break; // now empty
                    }
                }
                else
                    break;
            }
            return null;
        }

        /**
         * Takes next task, if one exists, in order specified by mode.
         */
        final ForkJoinTask<?> nextLocalTask() {
            return ((id & FIFO) != 0) ? poll() : pop();
        }

        /**
         * Returns next task, if one exists, in order specified by mode.
         */
        final ForkJoinTask<?> peek() {
            int al; ForkJoinTask<?>[] a;
            return ((a = array) != null && (al = a.length) > 0) ?
                a[(al - 1) &
                  ((id & FIFO) != 0 ? base : top - 1)] : null;
        }

        /**
         * Pops the given task only if it is at the current top.
         */
        final boolean tryUnpush(ForkJoinTask<?> task) {
            int b = base, s = top, al; ForkJoinTask<?>[] a;
            if ((a = array) != null && b != s && (al = a.length) > 0) {
                int index = (al - 1) & --s;
                if (QA.compareAndSet(a, index, task, null)) {
                    top = s;
                    VarHandle.releaseFence();
                    return true;
                }
            }
            return false;
        }

        /**
         * Removes and cancels all known tasks, ignoring any exceptions.
         */
        final void cancelAll() {
            for (ForkJoinTask<?> t; (t = poll()) != null; )
                ForkJoinTask.cancelIgnoringExceptions(t);
        }

        // Specialized execution methods

        /**
         * Pops and executes up to limit consecutive tasks or until empty.
         *
         * @param limit max runs, or zero for no limit
         */
        final void localPopAndExec(int limit) {
            for (;;) {
                int b = base, s = top, al; ForkJoinTask<?>[] a;
                if ((a = array) != null && b != s && (al = a.length) > 0) {
                    int index = (al - 1) & --s;
                    ForkJoinTask<?> t = (ForkJoinTask<?>)
                        QA.getAndSet(a, index, null);
                    if (t != null) {
                        top = s;
                        VarHandle.releaseFence();
                        t.doExec();
                        if (limit != 0 && --limit == 0)
                            break;
                    }
                    else
                        break;
                }
                else
                    break;
            }
        }

        /**
         * Polls and executes up to limit consecutive tasks or until empty.
         *
         * @param limit max runs, or zero for no limit
         */
        final void localPollAndExec(int limit) {
            for (int polls = 0;;) {
                int b = base, s = top, d, al; ForkJoinTask<?>[] a;
                if ((a = array) != null && (d = b - s) < 0 &&
                    (al = a.length) > 0) {
                    int index = (al - 1) & b++;
                    ForkJoinTask<?> t = (ForkJoinTask<?>)
                        QA.getAndSet(a, index, null);
                    if (t != null) {
                        base = b;
                        t.doExec();
                        if (limit != 0 && ++polls == limit)
                            break;
                    }
                    else if (d == -1)
                        break;     // now empty
                    else
                        polls = 0; // stolen; reset
                }
                else
                    break;
            }
        }

        /**
         * If present, removes task from queue and executes it.
         */
994 final void tryRemoveAndExec(ForkJoinTask<?> task) {
995 ForkJoinTask<?>[] wa; int s, wal;
996 if (base - (s = top) < 0 && // traverse from top
997 (wa = array) != null && (wal = wa.length) > 0) {
998 for (int m = wal - 1, ns = s - 1, i = ns; ; --i) {
999 int index = i & m;
1000 ForkJoinTask<?> t = (ForkJoinTask<?>)
1001 QA.get(wa, index);
1002 if (t == null)
1003 break;
1004 else if (t == task) {
1005 if (QA.compareAndSet(wa, index, t, null)) {
1006 top = ns; // safely shift down
1007 for (int j = i; j != ns; ++j) {
1008 ForkJoinTask<?> f;
1009 int pindex = (j + 1) & m;
1010 f = (ForkJoinTask<?>)QA.get(wa, pindex);
1011 QA.setVolatile(wa, pindex, null);
1012 int jindex = j & m;
1013 QA.setRelease(wa, jindex, f);
1014 }
1015 VarHandle.releaseFence();
1016 t.doExec();
1017 }
1018 break;
1019 }
1020 }
1021 }
1022 }

        /**
         * Tries to pop and run tasks within the target's computation
         * from this queue until done, not found, or limit exceeded.
         *
         * @param task root of CountedCompleter computation
         * @param limit max runs, or zero for no limit
         * @return task status on exit
         */
        final int localHelpCC(CountedCompleter<?> task, int limit) {
            int status = 0;
            if (task != null && (status = task.status) >= 0) {
                for (;;) {
                    boolean help = false;
                    int b = base, s = top, al; ForkJoinTask<?>[] a;
                    if ((a = array) != null && b != s && (al = a.length) > 0) {
                        int index = (al - 1) & (s - 1);
                        ForkJoinTask<?> o = (ForkJoinTask<?>)
                            QA.get(a, index);
                        if (o instanceof CountedCompleter) {
                            CountedCompleter<?> t = (CountedCompleter<?>)o;
                            for (CountedCompleter<?> f = t;;) {
                                if (f != task) {
                                    if ((f = f.completer) == null) // try parent
                                        break;
                                }
                                else {
                                    if (QA.compareAndSet(a, index, t, null)) {
                                        top = s - 1;
                                        VarHandle.releaseFence();
                                        t.doExec();
                                        help = true;
                                    }
                                    break;
                                }
                            }
                        }
                    }
                    if ((status = task.status) < 0 || !help ||
                        (limit != 0 && --limit == 0))
                        break;
                }
            }
            return status;
        }
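
        /*
         * Illustrative sketch, not part of this class: the ancestry test
         * that localHelpCC and sharedHelpCC apply before running a queued
         * CountedCompleter. A candidate belongs to the target computation
         * if the target is the candidate itself or one of its completers.
         * Node is a hypothetical stand-in for CountedCompleter and its
         * completer field.
         *
```java
class CompleterChainSketch {
    static final class Node {
        final Node completer;               // parent, or null at the root
        Node(Node completer) { this.completer = completer; }
    }

    // Walks up the completer links starting at t itself.
    static boolean inComputation(Node t, Node root) {
        for (Node f = t; f != null; f = f.completer) {
            if (f == root)
                return true;
        }
        return false;
    }

    public static void main(String[] args) {
        Node root = new Node(null);
        Node grandchild = new Node(new Node(root));
        System.out.println(inComputation(grandchild, root));      // true
        System.out.println(inComputation(new Node(null), root));  // false
    }
}
```
         */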

        // Operations on shared queues

        /**
         * Tries to lock shared queue by CASing phase field.
         */
        final boolean tryLockSharedQueue() {
            return PHASE.compareAndSet(this, 0, QLOCK);
        }

        /**
         * Shared version of tryUnpush.
         */
        final boolean trySharedUnpush(ForkJoinTask<?> task) {
            boolean popped = false;
            int s = top - 1, al; ForkJoinTask<?>[] a;
            if ((a = array) != null && (al = a.length) > 0) {
                int index = (al - 1) & s;
                ForkJoinTask<?> t = (ForkJoinTask<?>) QA.get(a, index);
                if (t == task &&
                    PHASE.compareAndSet(this, 0, QLOCK)) {
                    if (top == s + 1 && array == a &&
                        QA.compareAndSet(a, index, task, null)) {
                        popped = true;
                        top = s;
                    }
                    PHASE.setRelease(this, 0);
                }
            }
            return popped;
        }
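
        /*
         * Illustrative sketch, not part of this class: the locking idiom of
         * tryLockSharedQueue and trySharedUnpush, with an AtomicInteger
         * standing in for the phase field and its VarHandle. The cheap
         * unlocked check comes first, the state is re-checked once the lock
         * is held, and the lock is released with a release-mode store.
         * SharedUnpushSketch is hypothetical, QLOCK = 1 is an assumed value,
         * and the plain array here does not wrap around.
         *
```java
import java.util.concurrent.atomic.AtomicInteger;

class SharedUnpushSketch {
    static final int QLOCK = 1;                        // assumed lock value
    final AtomicInteger phase = new AtomicInteger();   // 0 means unlocked
    final Object[] array = new Object[16];
    int top;                                           // next free slot

    // Pushes under the lock, spinning until it is acquired.
    void push(Object x) {
        while (!phase.compareAndSet(0, QLOCK))
            Thread.onSpinWait();
        try {
            array[top++] = x;
        } finally {
            phase.setRelease(0);
        }
    }

    // Removes x only if it is still the most recently pushed element.
    boolean tryUnpush(Object x) {
        int s = top - 1;
        if (s < 0 || array[s] != x || !phase.compareAndSet(0, QLOCK))
            return false;                              // not on top, or lock busy
        boolean popped = false;
        try {
            if (top == s + 1 && array[s] == x) {       // re-check under the lock
                array[s] = null;
                top = s;
                popped = true;
            }
        } finally {
            phase.setRelease(0);                       // unlock
        }
        return popped;
    }
}
```
         */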

        /**
         * Shared version of localHelpCC.
         */
        final int sharedHelpCC(CountedCompleter<?> task, int limit) {
            int status = 0;
            if (task != null && (status = task.status) >= 0) {
                for (;;) {
                    boolean help = false;
                    int b = base, s = top, al; ForkJoinTask<?>[] a;
                    if ((a = array) != null && b != s && (al = a.length) > 0) {
                        int index = (al - 1) & (s - 1);
                        ForkJoinTask<?> o = (ForkJoinTask<?>)
                            QA.get(a, index);
                        if (o instanceof CountedCompleter) {
                            CountedCompleter<?> t = (CountedCompleter<?>)o;
                            for (CountedCompleter<?> f = t;;) {
                                if (f != task) {
                                    if ((f = f.completer) == null)
                                        break;
                                }
                                else {
                                    if (PHASE.compareAndSet(this, 0, QLOCK)) {
                                        if (top == s && array == a &&
                                            QA.compareAndSet(a, index, t, null)) {
                                            help = true;
                                            top = s - 1;
                                        }
                                        PHASE.setRelease(this, 0);
                                        if (help)
                                            t.doExec();
                                    }
                                    break;
                                }
                            }
                        }
                    }
                    if ((status = task.status) < 0 || !help ||
                        (limit != 0 && --limit == 0))
                        break;
                }
            }
            return status;
        }

        /**
         * Returns true if owned and not known to be blocked.
         */
        final boolean isApparentlyUnblocked() {
            Thread wt; Thread.State s;
            return ((wt = owner) != null &&
                    (s = wt.getState()) != Thread.State.BLOCKED &&
                    s != Thread.State.WAITING &&
                    s != Thread.State.TIMED_WAITING);
        }

        // VarHandle mechanics.
        private static final VarHandle PHASE;
        static {
            try {
                MethodHandles.Lookup l = MethodHandles.lookup();
                PHASE = l.findVarHandle(WorkQueue.class, "phase", int.class);
            } catch (ReflectiveOperationException e) {
                throw new Error(e);
            }
        }
    }

    // static fields (initialized in static initializer below)

    /**
     * Creates a new ForkJoinWorkerThread. This factory is used unless
     * overridden in ForkJoinPool constructors.
     */
    public static final ForkJoinWorkerThreadFactory
        defaultForkJoinWorkerThreadFactory;

    /**
     * Permission required for callers of methods that may start or
     * kill threads.
     */
    static final RuntimePermission modifyThreadPermission;

    /**
     * ...
                       (TC_MASK & (c + TC_UNIT)));
            if (ctl == c && CTL.compareAndSet(this, c, nc)) {
                createWorker();
                break;
            }
        } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
    }

    /**
     * Callback from ForkJoinWorkerThread constructor to establish and
     * record its WorkQueue.
     *
     * @param wt the worker thread
     * @return the worker's queue
     */
    final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
        UncaughtExceptionHandler handler;
        wt.setDaemon(true);                         // configure thread
        if ((handler = ueh) != null)
            wt.setUncaughtExceptionHandler(handler);
        WorkQueue w = new WorkQueue(this, wt);
        int tid = 0;                                // for thread name
        int fifo = mode & FIFO;
        String prefix = workerNamePrefix;
        if (prefix != null) {
            synchronized (prefix) {
                WorkQueue[] ws = workQueues; int n;
                int s = indexSeed += SEED_INCREMENT;
                if (ws != null && (n = ws.length) > 1) {
                    int m = n - 1;
                    tid = s & m;
                    int i = m & ((s << 1) | 1);     // odd-numbered indices
                    for (int probes = n >>> 1;;) {  // find empty slot
                        WorkQueue q;
                        if ((q = ws[i]) == null || q.phase == QUIET)
                            break;
                        else if (--probes == 0) {
                            i = n | 1;              // resize below
                            break;
                        }
                        else
                            i = (i + 2) & m;
                    }

                    int id = i | fifo | (s & ~(SMASK | FIFO | DORMANT));
                    w.phase = w.id = id;            // now publishable

                    if (i < n)
                        ws[i] = w;
                    else {                          // expand array
                        int an = n << 1;
                        WorkQueue[] as = new WorkQueue[an];
                        as[i] = w;
                        int am = an - 1;
                        for (int j = 0; j < n; ++j) {
                            WorkQueue v;                // copy external queue
                            if ((v = ws[j]) != null)    // position may change
                                as[v.id & am & SQMASK] = v;
                            if (++j >= n)
                                break;
                            as[j] = ws[j];              // copy worker
                        }
                        workQueues = as;
                    }
                }
            }
            wt.setName(prefix.concat(Integer.toString(tid)));
        }
        return w;
    }
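
    /*
     * Illustrative sketch, not part of this class: the slot search in
     * registerWorker. Starting from an odd index derived from a seed, it
     * probes every other slot of a power-of-two table (odd indices only)
     * and reports after n/2 failed probes that the table must grow.
     * OddSlotSketch is hypothetical, and only null slots count as free
     * here (reuse of QUIET slots is not modeled).
     *
```java
class OddSlotSketch {
    // Returns a free odd index, or -1 if every odd slot is taken.
    static int findOddSlot(Object[] ws, int seed) {
        int n = ws.length, m = n - 1;          // n: power of two, at least 2
        int i = m & ((seed << 1) | 1);         // always odd
        for (int probes = n >>> 1; probes > 0; --probes) {
            if (ws[i] == null)
                return i;
            i = (i + 2) & m;                   // next odd index, wrapping
        }
        return -1;                             // caller would resize
    }

    public static void main(String[] args) {
        Object[] ws = new Object[8];
        ws[3] = "w";                           // slot 3 occupied
        System.out.println(findOddSlot(ws, 1)); // starts at 3, prints 5
    }
}
```
     */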

    /**
     * Final callback from terminating worker, as well as upon failure
     * to construct or start a worker. Removes record of worker from
     * array, and adjusts counts. If pool is shutting down, tries to
     * complete termination.
     *
     * @param wt the worker thread, or null if construction failed
     * @param ex the exception causing failure, or null if none
     */
    final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
        WorkQueue w = null;
        int phase = 0;
        if (wt != null && (w = wt.workQueue) != null) {
            Object lock = workerNamePrefix;
            long ns = (long)w.nsteals & 0xffffffffL;
            int idx = w.id & SMASK;
            if (lock != null) {
                WorkQueue[] ws;                     // remove index from array
                synchronized (lock) {
                    if ((ws = workQueues) != null && ws.length > idx &&
                        ws[idx] == w)
                        ws[idx] = null;
                    stealCount += ns;
                }
            }
            phase = w.phase;
        }
        if (phase != QUIET) {                       // else pre-adjusted
            long c;                                 // decrement counts
            do {} while (!CTL.weakCompareAndSet
                         (this, c = ctl, ((RC_MASK & (c - RC_UNIT)) |
                                          (TC_MASK & (c - TC_UNIT)) |
                                          (SP_MASK & c))));
        }
        if (w != null)
            w.cancelAll();                          // cancel remaining tasks

        if (!tryTerminate(false, false) &&          // possibly replace worker
            w != null && w.array != null)           // avoid repeated failures
            signalWork();

        if (ex == null)                             // help clean on way out
            // ...
            if ((c = ctl) >= 0L)                    // enough workers
                break;
            else if ((sp = (int)c) == 0) {          // no idle workers
                if ((c & ADD_WORKER) != 0L)         // too few workers
                    tryAddWorker(c);
                break;
            }
            else if ((ws = workQueues) == null)
                break;                              // unstarted/terminated
            else if (ws.length <= (i = sp & SMASK))
                break;                              // terminated
            else if ((v = ws[i]) == null)
                break;                              // terminating
            else {
                int np = sp & ~UNSIGNALLED;
                int vp = v.phase;
                long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + RC_UNIT));
                Thread vt = v.owner;
                if (sp == vp && CTL.compareAndSet(this, c, nc)) {
                    v.phase = np;
                    if (v.source < 0)
                        LockSupport.unpark(vt);
                    break;
                }
            }
        }
    }
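
    /*
     * Illustrative sketch, not part of this class: the packing of the
     * 64-bit ctl word as deregisterWorker and signalWork manipulate it,
     * with the active count (RC) in bits 48-63, the total count (TC) in
     * bits 32-47, and the idle-stack top (SP) in bits 0-31. The counts are
     * signed 16-bit offsets from the target parallelism (compare
     * getActiveThreadCount), so they may be negative. CtlSketch is
     * hypothetical and its constant values are assumptions chosen to fit
     * that layout. decrementCounts copies the masked arithmetic used in
     * deregisterWorker: each field is computed separately, so a borrow out
     * of one field never corrupts its neighbor.
     *
```java
class CtlSketch {
    static final int  RC_SHIFT = 48, TC_SHIFT = 32;
    static final long RC_UNIT = 1L << RC_SHIFT, TC_UNIT = 1L << TC_SHIFT;
    static final long RC_MASK = 0xffffL << RC_SHIFT;
    static final long TC_MASK = 0xffffL << TC_SHIFT;
    static final long SP_MASK = 0xffffffffL;

    // Packs three signed fields into one long.
    static long pack(int rc, int tc, int sp) {
        return (((long)rc << RC_SHIFT) & RC_MASK) |
               (((long)tc << TC_SHIFT) & TC_MASK) |
               (sp & SP_MASK);
    }

    static int rc(long c) { return (int)(c >> RC_SHIFT); }     // arithmetic shift keeps sign
    static int tc(long c) { return (short)(c >>> TC_SHIFT); }  // low 16 bits, sign-extended
    static int sp(long c) { return (int)c; }

    // Same expression as in deregisterWorker: decrement RC and TC, keep SP.
    static long decrementCounts(long c) {
        return (RC_MASK & (c - RC_UNIT)) |
               (TC_MASK & (c - TC_UNIT)) |
               (SP_MASK & c);
    }

    public static void main(String[] args) {
        long c = decrementCounts(pack(1, 0, 5));
        System.out.println(rc(c) + " " + tc(c) + " " + sp(c));  // 0 -1 5
    }
}
```
     */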

    /**
     * Tries to decrement counts (sometimes implicitly) and possibly
     * arrange for a compensating worker in preparation for blocking.
     * If not all core workers exist yet, creates one; otherwise, if
     * any workers are unreleased (possibly including the caller),
     * releases one; otherwise, if fewer than the minimum allowed
     * number of workers are running, checks that they are all active
     * and, if so, creates an extra worker unless the pool is over the
     * maximum limit and the policy is to saturate. Most of these steps
     * can fail due to interference, in which case 0 is returned so
     * the caller will retry. A negative return value indicates that
     * the caller does not need to re-adjust counts when later
     * unblocked.
     *
     * @param w caller
     * @return 1: block then adjust, -1: block without adjust, 0: retry
     */
    private int tryCompensate(WorkQueue w) {
        int t, n, sp;
        long c = ctl;
        WorkQueue[] ws = workQueues;
        if ((t = (short)(c >>> TC_SHIFT)) >= 0) {
            if (ws == null || (n = ws.length) <= 0 || w == null)
                return 0;                           // disabled
            else if ((sp = (int)c) != 0) {          // replace or release
                WorkQueue v = ws[sp & (n - 1)];
                int wp = w.phase;
                long uc = UC_MASK & ((wp < 0) ? c + RC_UNIT : c);
                int np = sp & ~UNSIGNALLED;
                if (v != null) {
                    int vp = v.phase;
                    Thread vt = v.owner;
                    long nc = ((long)v.stackPred & SP_MASK) | uc;
                    if (vp == sp && CTL.compareAndSet(this, c, nc)) {
                        v.phase = np;
                        if (v.source < 0)
                            LockSupport.unpark(vt);
                        return (wp < 0) ? -1 : 1;
                    }
                }
                return 0;
            }
            else if ((int)(c >> RC_SHIFT) -         // reduce parallelism
                     (short)(bounds & SMASK) > 0) {
                long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c));
                return CTL.compareAndSet(this, c, nc) ? 1 : 0;
            }
            else {                                  // validate
                int md = mode, pc = md & SMASK, tc = pc + t, bc = 0;
                boolean unstable = false;
                for (int i = 1; i < n; i += 2) {
                    WorkQueue q; Thread wt; Thread.State ts;
                    if ((q = ws[i]) != null) {
                        if (q.source == 0) {
                            unstable = true;
                            break;
                            // ...
                else if (bc < pc) {                 // lagging
                    Thread.yield();                 // for retry spins
                    return 0;
                }
                else
                    throw new RejectedExecutionException(
                        "Thread limit exceeded replacing blocked worker");
            }
        }

        long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); // expand pool
        return CTL.compareAndSet(this, c, nc) && createWorker() ? 1 : 0;
    }

    /**
     * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
     * See above for explanation.
     */
    final void runWorker(WorkQueue w) {
        WorkQueue[] ws;
        w.growArray();                              // allocate queue
        int r = w.id ^ ThreadLocalRandom.nextSecondarySeed();
        if (r == 0)                                 // initial nonzero seed
            r = 1;
        int lastSignalId = 0;                       // avoid unneeded signals
        while ((ws = workQueues) != null) {
            boolean nonempty = false;               // scan
            for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
                WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
                if ((i = r & m) >= 0 && i < n &&    // always true
                    (q = ws[i]) != null && (b = q.base) - q.top < 0 &&
                    (a = q.array) != null && (al = a.length) > 0) {
                    int qid = q.id;                 // (never zero)
                    int index = (al - 1) & b;
                    ForkJoinTask<?> t = (ForkJoinTask<?>)
                        QA.getAcquire(a, index);
                    if (t != null && b++ == q.base &&
                        QA.compareAndSet(a, index, t, null)) {
                        if ((q.base = b) - q.top < 0 && qid != lastSignalId)
                            signalWork();           // propagate signal
                        w.source = lastSignalId = qid;
                        t.doExec();
                        if ((w.id & FIFO) != 0)     // run remaining locals
                            w.localPollAndExec(POLL_LIMIT);
                        else
                            w.localPopAndExec(POLL_LIMIT);
                        ForkJoinWorkerThread thread = w.owner;
                        ++w.nsteals;
                        w.source = 0;               // now idle
                        if (thread != null)
                            thread.afterTopLevelExec();
                    }
                    nonempty = true;
                }
                else if (nonempty)
                    break;
                else
                    ++r;
            }

            if (nonempty) {                         // move (xorshift)
                r ^= r << 13; r ^= r >>> 17; r ^= r << 5;
            }
            else {
                int phase;
                lastSignalId = 0;                   // clear for next scan
                if ((phase = w.phase) >= 0) {       // enqueue
                    int np = w.phase = (phase + SS_SEQ) | UNSIGNALLED;
                    long c, nc;
                    do {
                        w.stackPred = (int)(c = ctl);
                        nc = ((c - RC_UNIT) & UC_MASK) | (SP_MASK & np);
                    } while (!CTL.weakCompareAndSet(this, c, nc));
                }
                else {                              // already queued
                    int pred = w.stackPred;
                    w.source = DORMANT;             // enable signal
                    for (int steps = 0;;) {
                        int md, rc; long c;
                        if (w.phase >= 0) {
                            w.source = 0;
                            break;
                        }
                        else if ((md = mode) < 0)   // shutting down
                            return;
                        else if ((rc = ((md & SMASK) +  // possibly quiescent
                                        (int)((c = ctl) >> RC_SHIFT))) <= 0 &&
                                 (md & SHUTDOWN) != 0 &&
                                 tryTerminate(false, false))
                            return;                 // help terminate
                        else if ((++steps & 1) == 0)
                            Thread.interrupted();   // clear between parks
                        else if (rc <= 0 && pred != 0 && phase == (int)c) {
                            long d = keepAlive + System.currentTimeMillis();
                            LockSupport.parkUntil(this, d);
                            if (ctl == c &&
                                d - System.currentTimeMillis() <= TIMEOUT_SLOP) {
                                long nc = ((UC_MASK & (c - TC_UNIT)) |
                                           (SP_MASK & pred));
                                if (CTL.compareAndSet(this, c, nc)) {
                                    w.phase = QUIET;
                                    return;         // drop on timeout
                                }
                            }
                        }
                        else
                            LockSupport.park(this);
                    }
                }
            }
        }
    }
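
    /*
     * Illustrative sketch, not part of this class: the scan-origin update
     * in runWorker. After a scan that found work, the seed r is advanced
     * with a 32-bit xorshift step (shifts 13, 17, 5). Each such step is
     * invertible and maps 0 to 0, so a nonzero seed stays nonzero, which
     * is presumably why runWorker forces a zero seed to 1 before looping.
     * XorShiftSketch is hypothetical.
     *
```java
class XorShiftSketch {
    static int next(int r) {
        r ^= r << 13; r ^= r >>> 17; r ^= r << 5;
        return r;
    }

    public static void main(String[] args) {
        int r = 1, m = 15;                     // table of 16 queues
        for (int k = 0; k < 5; k++) {
            r = next(r);
            System.out.println("start scan at index " + (r & m));
        }
    }
}
```
     *
     * For example, next(1) is 270369: 1 ^ (1 << 13) = 8193, the 17-bit
     * right shift contributes nothing, and 8193 ^ (8193 << 5) = 270369.
     */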

    /**
     * Helps and/or blocks until the given task is done or the deadline
     * passes. First tries locally helping, then scans other queues for
     * a task produced by one of w's stealers; compensating and blocking
     * if none are found (rescanning if tryCompensate fails).
     *
     * @param w caller
     * @param task the task
     * @param deadline for timed waits, if nonzero
     * @return task status on exit
     */
    final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
        int s = 0;
        if (w != null && task != null &&
            (!(task instanceof CountedCompleter) ||
             (s = w.localHelpCC((CountedCompleter<?>)task, 0)) >= 0)) {
            w.tryRemoveAndExec(task);
            int src = w.source, id = w.id;
            s = task.status;
            while (s >= 0) {
                WorkQueue[] ws;
                boolean nonempty = false;
                int r = ThreadLocalRandom.nextSecondarySeed() | 1; // odd indices
                if ((ws = workQueues) != null) {    // scan for matching id
                    for (int n = ws.length, m = n - 1, j = -n; j < n; j += 2) {
                        WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
                        if ((i = (r + j) & m) >= 0 && i < n &&
                            (q = ws[i]) != null && q.source == id &&
                            (b = q.base) - q.top < 0 &&
                            (a = q.array) != null && (al = a.length) > 0) {
                            int qid = q.id;
                            int index = (al - 1) & b;
                            ForkJoinTask<?> t = (ForkJoinTask<?>)
                                QA.getAcquire(a, index);
                            if (t != null && b++ == q.base && id == q.source &&
                                QA.compareAndSet(a, index, t, null)) {
                                q.base = b;
                                w.source = qid;
                                t.doExec();
                                w.source = src;
                            }
                            nonempty = true;
                            break;
                        }
                    }
                }
                if ((s = task.status) < 0)
                    break;
                else if (!nonempty) {
                    long ms, ns; int block;
                    if (deadline == 0L)
                        ms = 0L;                    // untimed
                    else if ((ns = deadline - System.nanoTime()) <= 0L)
                        break;                      // timeout
                    else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
                        ms = 1L;                    // avoid 0 for timed wait
                    if ((block = tryCompensate(w)) != 0) {
                        task.internalWait(ms);
                        CTL.getAndAdd(this, (block > 0) ? RC_UNIT : 0L);
                    }
                    s = task.status;
                }
            }
        }
        return s;
    }
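
    /*
     * Illustrative sketch, not part of this class: the conversion in
     * awaitJoin from an absolute System.nanoTime() deadline to a wait in
     * milliseconds. A zero deadline means untimed, a deadline already
     * passed means time out, and a positive sub-millisecond remainder is
     * rounded up to 1 so that a timed wait is never turned into an untimed
     * wait(0). DeadlineSketch and its TIMED_OUT sentinel are hypothetical.
     *
```java
import java.util.concurrent.TimeUnit;

class DeadlineSketch {
    static final long TIMED_OUT = -1L;         // hypothetical sentinel

    static long waitMillis(long deadline, long now) {
        if (deadline == 0L)
            return 0L;                         // untimed
        long ns = deadline - now;
        if (ns <= 0L)
            return TIMED_OUT;                  // timeout
        long ms = TimeUnit.NANOSECONDS.toMillis(ns);
        return (ms <= 0L) ? 1L : ms;           // avoid 0 for timed wait
    }

    public static void main(String[] args) {
        System.out.println(waitMillis(1_250_000L, 1_000_000L)); // 1
        System.out.println(waitMillis(6_000_000L, 1_000_000L)); // 5
    }
}
```
     */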

    /**
     * Runs tasks until {@code isQuiescent()}. Rather than blocking
     * when tasks cannot be found, rescans until all others cannot
     * find tasks either.
     */
    final void helpQuiescePool(WorkQueue w) {
        int prevSrc = w.source, fifo = w.id & FIFO;
        for (int source = prevSrc, released = -1;;) { // -1 until known
            WorkQueue[] ws;
            if (fifo != 0)
                w.localPollAndExec(0);
            else
                w.localPopAndExec(0);
            if (released == -1 && w.phase >= 0)
                released = 1;
            boolean quiet = true, empty = true;
            int r = ThreadLocalRandom.nextSecondarySeed();
            if ((ws = workQueues) != null) {
                for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
                    WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
                    if ((i = (r - j) & m) >= 0 && i < n && (q = ws[i]) != null) {
                        if ((b = q.base) - q.top < 0 &&
                            (a = q.array) != null && (al = a.length) > 0) {
                            int qid = q.id;
                            if (released == 0) {    // increment
                                released = 1;
                                CTL.getAndAdd(this, RC_UNIT);
                            }
                            int index = (al - 1) & b;
                            ForkJoinTask<?> t = (ForkJoinTask<?>)
                                QA.getAcquire(a, index);
                            if (t != null && b++ == q.base &&
                                QA.compareAndSet(a, index, t, null)) {
                                q.base = b;
                                w.source = source = q.id;
                                t.doExec();
                                w.source = source = prevSrc;
                            }
                            quiet = empty = false;
                            break;
                        }
                        else if ((q.source & QUIET) == 0)
                            quiet = false;
                    }
                }
            }
            if (quiet) {
                if (released == 0)
                    CTL.getAndAdd(this, RC_UNIT);
                w.source = prevSrc;
                break;
            }
            else if (empty) {
                if (source != QUIET)
                    w.source = source = QUIET;
                if (released == 1) {                // decrement
                    released = 0;
                    CTL.getAndAdd(this, RC_MASK & -RC_UNIT);
                }
            }
        }
    }
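
    /*
     * Illustrative usage, not part of this class: the public
     * ForkJoinTask.helpQuiesce() method, which is expected to reach
     * helpQuiescePool when called from a worker thread. A task forks
     * subtasks without joining them and then helps until the pool is
     * quiescent, so that, once it returns, the forked subtasks are
     * expected to have run. HelpQuiesceDemo, Root, and Leaf are
     * hypothetical names, and the pool size of 2 is arbitrary.
     *
```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicInteger;

class HelpQuiesceDemo {
    static final class Leaf extends RecursiveAction {
        final AtomicInteger counter;
        Leaf(AtomicInteger counter) { this.counter = counter; }
        @Override protected void compute() { counter.incrementAndGet(); }
    }

    static final class Root extends RecursiveAction {
        final AtomicInteger counter; final int n;
        Root(AtomicInteger counter, int n) { this.counter = counter; this.n = n; }
        @Override protected void compute() {
            for (int i = 0; i < n; i++)
                new Leaf(counter).fork();     // forked, never joined
            ForkJoinTask.helpQuiesce();       // run tasks until quiescent
        }
    }

    static int run(int n) {
        AtomicInteger counter = new AtomicInteger();
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            pool.invoke(new Root(counter, n));
        } finally {
            pool.shutdown();
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(run(100));
    }
}
```
     */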

    /**
     * Scans for and returns a polled task, if available.
     * Used only for untracked polls.
     *
     * @param submissionsOnly if true, only scan submission queues
     */
    private ForkJoinTask<?> pollScan(boolean submissionsOnly) {
        WorkQueue[] ws; int n;
        rescan: while ((mode & STOP) == 0 && (ws = workQueues) != null &&
                      (n = ws.length) > 0) {
            int m = n - 1;
            int r = ThreadLocalRandom.nextSecondarySeed();
            int h = r >>> 16;
            int origin, step;
            if (submissionsOnly) {
                origin = (r & ~1) & m;              // even indices and steps
                step = (h & ~1) | 2;
            }
            else {
                origin = r & m;
                step = h | 1;
            }
            for (int k = origin, oldSum = 0, checkSum = 0;;) {
                WorkQueue q; int b, al; ForkJoinTask<?>[] a;
                if ((q = ws[k]) != null) {
                    checkSum += b = q.base;
                    if (b - q.top < 0 &&
                        (a = q.array) != null && (al = a.length) > 0) {
                        int index = (al - 1) & b;
                        ForkJoinTask<?> t = (ForkJoinTask<?>)
                            QA.getAcquire(a, index);
                        if (t != null && b++ == q.base &&
                            QA.compareAndSet(a, index, t, null)) {
                            q.base = b;
                            return t;
                        }
                        else
                            break; // restart
                    }
                }
                if ((k = (k + step) & m) == origin) {
                    if (oldSum == (oldSum = checkSum))
                        break rescan;
                    checkSum = 0;
                }
            }
        }
        return null;
    }
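
    /*
     * Illustrative sketch, not part of this class: pollScan's traversal
     * order. For a power-of-two table size n, an odd step is coprime to n,
     * so stepping with (k + step) & m visits every index exactly once
     * before returning to the origin. For submission-only scans, an even
     * origin and a step of the form 2 * odd visit exactly the n/2 even
     * indices. StepCoverageSketch is hypothetical and checks only the
     * visiting order, not the checksum-based termination.
     *
```java
import java.util.BitSet;

class StepCoverageSketch {
    // Returns the set of indices visited in one full cycle from origin.
    static BitSet visit(int n, int origin, int step) {
        int m = n - 1;
        BitSet seen = new BitSet(n);
        int k = origin;
        do {
            seen.set(k);
            k = (k + step) & m;
        } while (k != origin);
        return seen;
    }

    // Origin and step as chosen for a full scan.
    static BitSet all(int n, int r) {
        return visit(n, r & (n - 1), (r >>> 16) | 1);
    }

    // Origin and step as chosen for a submissions-only scan.
    static BitSet evens(int n, int r) {
        return visit(n, (r & ~1) & (n - 1), ((r >>> 16) & ~1) | 2);
    }

    public static void main(String[] args) {
        System.out.println(all(16, 12345).cardinality());   // 16
        System.out.println(evens(16, 12345));
    }
}
```
     */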

    /**
     * Gets and removes a local or stolen task for the given worker.
     *
     * @return a task, if available
     */
    final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
        ForkJoinTask<?> t;
        if (w != null &&
            (t = (w.id & FIFO) != 0 ? w.poll() : w.pop()) != null)
            return t;
        else
            return pollScan(false);
    }

    // External operations

    /**
     * Adds the given task to the submission queue selected by the
     * submitter's probe value, creating that queue if it is null and
     * moving to a different queue if it is contended.
     *
     * @param task the task. Caller must ensure non-null.
     */
    final void externalPush(ForkJoinTask<?> task) {
        int r;                                      // initialize caller's probe
        if ((r = ThreadLocalRandom.getProbe()) == 0) {
            ThreadLocalRandom.localInit();
            r = ThreadLocalRandom.getProbe();
        }
        for (;;) {
            int md = mode, n;
            WorkQueue[] ws = workQueues;
            if ((md & SHUTDOWN) != 0 || ws == null || (n = ws.length) <= 0)
                throw new RejectedExecutionException();
            else {
                WorkQueue q;
                boolean push = false, grow = false;
                if ((q = ws[(n - 1) & r & SQMASK]) == null) {
                    Object lock = workerNamePrefix;
                    int qid = (r | QUIET) & ~(FIFO | OWNED);
                    q = new WorkQueue(this, null);
                    q.id = qid;
                    q.source = QUIET;
                    q.phase = QLOCK;                // lock queue
                    if (lock != null) {
                        synchronized (lock) {       // lock pool to install
                            int i;
                            if ((ws = workQueues) != null &&
                                (n = ws.length) > 0 &&
                                ws[i = qid & (n - 1) & SQMASK] == null) {
                                ws[i] = q;
                                push = grow = true;
                            }
                        }
                    }
                }
                else if (q.tryLockSharedQueue()) {
                    int b = q.base, s = q.top, al, d; ForkJoinTask<?>[] a;
                    if ((a = q.array) != null && (al = a.length) > 0 &&
                        al - 1 + (d = b - s) > 0) {
                        a[(al - 1) & s] = task;
                        q.top = s + 1;              // relaxed writes OK here
                        q.phase = 0;
                        if (d < 0 && q.base - s < -1)
                            break;                  // no signal needed
                    }
                    else
                        grow = true;
                    push = true;
                }
                if (push) {
                    if (grow) {
                        try {
                            q.growArray();
                            int s = q.top, al; ForkJoinTask<?>[] a;
                            if ((a = q.array) != null && (al = a.length) > 0) {
                                a[(al - 1) & s] = task;
                                q.top = s + 1;
                            }
                        } finally {
                            q.phase = 0;
                        }
                    }
                    signalWork();
                    break;
                }
                else                                // move if busy
                    r = ThreadLocalRandom.advanceProbe(r);
            }
        }
    }
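
    /*
     * Illustrative sketch, not part of this class: how an external
     * submitter's probe value selects a submission queue, as in
     * externalPush, commonSubmitterQueue, and tryExternalUnpush.
     * SQMASK = 0x007e is an assumed value: with bit 0 clear it selects
     * only even (submission) slots, and it limits the choice to at most
     * 64 distinct slots. SubmitSlotSketch is hypothetical.
     *
```java
class SubmitSlotSketch {
    static final int SQMASK = 0x007e;           // assumed value

    // n: power-of-two length of the queue table.
    static int slot(int r, int n) {
        return (n - 1) & r & SQMASK;
    }

    public static void main(String[] args) {
        System.out.println(slot(0x12345, 16));  // 0x12345 & 15 & 0x7e = 4
    }
}
```
     */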

    /**
     * Pushes a possibly-external submission.
     */
    private <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) {
        Thread t; ForkJoinWorkerThread w; WorkQueue q;
        if (task == null)
            throw new NullPointerException();
        if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
            (w = (ForkJoinWorkerThread)t).pool == this &&
            (q = w.workQueue) != null)
            q.push(task);
        else
            externalPush(task);
        return task;
    }

    /**
     * Returns the common pool's submission queue for the calling
     * external thread, or null if none.
     */
    static WorkQueue commonSubmitterQueue() {
        ForkJoinPool p = common;
        int r = ThreadLocalRandom.getProbe();
        WorkQueue[] ws; int n;
        return (p != null && (ws = p.workQueues) != null &&
                (n = ws.length) > 0) ?
            ws[(n - 1) & r & SQMASK] : null;
    }

    /**
     * Performs tryUnpush for an external submitter.
     */
    final boolean tryExternalUnpush(ForkJoinTask<?> task) {
        int r = ThreadLocalRandom.getProbe();
        WorkQueue[] ws; WorkQueue w; int n;
        return ((ws = workQueues) != null &&
                (n = ws.length) > 0 &&
                (w = ws[(n - 1) & r & SQMASK]) != null &&
                w.trySharedUnpush(task));
    }

    /**
     * Performs helpComplete for an external submitter.
     */
    final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) {
        int r = ThreadLocalRandom.getProbe();
        WorkQueue[] ws; WorkQueue w; int n;
        return ((ws = workQueues) != null && (n = ws.length) > 0 &&
                (w = ws[(n - 1) & r & SQMASK]) != null) ?
            w.sharedHelpCC(task, maxTasks) : 0;
    }

    /**
     * Tries to steal and run tasks within the target's computation.
     * The maxTasks argument supports external usages; internal calls
     * use zero, allowing unbounded steps (external calls trap
     * non-positive values).
     *
     * @param w caller
     * @param task root of CountedCompleter computation
     * @param maxTasks if non-zero, the maximum number of other tasks to run
     * @return task status on exit
     */
    final int helpComplete(WorkQueue w, CountedCompleter<?> task,
                           int maxTasks) {
        return (w == null) ? 0 : w.localHelpCC(task, maxTasks);
    }
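
    /*
     * Illustrative usage, not part of this class: a CountedCompleter
     * computation of the kind that helpComplete and localHelpCC serve.
     * Each split forks its right half and computes its left half
     * directly; whichever child finishes second runs onCompletion in the
     * parent. SumTask and THRESHOLD are hypothetical; setPendingCount,
     * tryComplete, onCompletion, and getRawResult are the public
     * CountedCompleter API.
     *
```java
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.ForkJoinPool;

class SumTask extends CountedCompleter<Long> {
    static final int THRESHOLD = 64;
    final long[] array; final int lo, hi;
    SumTask left, right;
    long result;

    SumTask(CountedCompleter<?> parent, long[] array, int lo, int hi) {
        super(parent);
        this.array = array; this.lo = lo; this.hi = hi;
    }

    @Override public void compute() {
        if (hi - lo <= THRESHOLD) {
            for (int i = lo; i < hi; ++i)
                result += array[i];
            tryComplete();                // leaf done; may complete ancestors
        } else {
            int mid = (lo + hi) >>> 1;
            left = new SumTask(this, array, lo, mid);
            right = new SumTask(this, array, mid, hi);
            setPendingCount(1);           // first child to finish decrements,
            right.fork();                 // the second completes this task
            left.compute();
        }
    }

    @Override public void onCompletion(CountedCompleter<?> caller) {
        if (left != null)                 // internal node: combine children
            result = left.result + right.result;
    }

    @Override public Long getRawResult() { return result; }

    static long sum(long[] a) {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            return pool.invoke(new SumTask(null, a, 0, a.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] a = new long[1000];
        for (int i = 0; i < a.length; i++)
            a[i] = i + 1;
        System.out.println(sum(a));       // 500500
    }
}
```
     */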

    /**
     * Returns a cheap heuristic guide for task partitioning when
     * programmers, frameworks, tools, or languages have little or no
     * idea about task granularity. In essence, by offering this
     * method, we ask users only about tradeoffs in overhead vs
     * expected throughput and its variance, rather than how finely to
     * partition tasks.
     *
     * In a steady state strict (tree-structured) computation, each
     * thread makes available for stealing enough tasks for other
     * threads to remain active. Inductively, if all threads play by
     * the same rules, each thread should make available only a
     * constant number of tasks.
     *
     * The minimum useful constant is just 1. But using a value of 1
     * would require immediate replenishment upon each steal to
     * maintain enough tasks, which is infeasible. Further,
     * partitionings/granularities of offered tasks should minimize
     * ...
     */
    private boolean tryTerminate(boolean now, boolean enable) {
        int md; // 3 phases: try to set SHUTDOWN, then STOP, then TERMINATED

        while (((md = mode) & SHUTDOWN) == 0) {
            if (!enable || this == common)          // cannot shutdown
                return false;
            else
                MODE.compareAndSet(this, md, md | SHUTDOWN);
        }

        while (((md = mode) & STOP) == 0) {         // try to initiate termination
            if (!now) {                             // check if quiescent & empty
                for (long oldSum = 0L;;) {          // repeat until stable
                    boolean running = false;
                    long checkSum = ctl;
                    WorkQueue[] ws = workQueues;
                    if ((md & SMASK) + (int)(checkSum >> RC_SHIFT) > 0)
                        running = true;
                    else if (ws != null) {
                        WorkQueue w; int b;
                        for (int i = 0; i < ws.length; ++i) {
                            if ((w = ws[i]) != null) {
                                checkSum += (b = w.base) + w.id;
                                if (b != w.top ||
                                    ((i & 1) == 1 && w.source >= 0)) {
                                    running = true;
                                    break;
                                }
                            }
                        }
                    }
                    if (((md = mode) & STOP) != 0)
                        break;                      // already triggered
                    else if (running)
                        return false;
                    else if (workQueues == ws && oldSum == (oldSum = checkSum))
                        break;
                }
            }
            if ((md & STOP) == 0)
                MODE.compareAndSet(this, md, md | STOP);
        }

        while (((md = mode) & TERMINATED) == 0) {   // help terminate others
            for (long oldSum = 0L;;) {              // repeat until stable
                WorkQueue[] ws; WorkQueue w;
                long checkSum = ctl;
                if ((ws = workQueues) != null) {
                    for (int i = 0; i < ws.length; ++i) {
                        if ((w = ws[i]) != null) {
                            ForkJoinWorkerThread wt = w.owner;
                            w.cancelAll();          // clear queues
                            if (wt != null) {
                                try {               // unblock join or park
                                    wt.interrupt();
                                } catch (Throwable ignore) {
                                }
                            }
                            checkSum += w.base + w.id;
                        }
                    }
                }
                if (((md = mode) & TERMINATED) != 0 ||
                    (workQueues == ws && oldSum == (oldSum = checkSum)))
                    break;
            }
            if ((md & TERMINATED) != 0)
                break;
            else if ((md & SMASK) + (short)(ctl >>> TC_SHIFT) > 0)
                break;
            else if (MODE.compareAndSet(this, md, md | TERMINATED)) {
                synchronized (this) {
                    notifyAll();                    // for awaitTermination
                }
                break;
            }
        }
        return true;
    }
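
    /*
     * Illustrative sketch, not part of this class: the monotonic
     * run-state word that tryTerminate advances. Bits are only ever added,
     * with CAS, in the order SHUTDOWN, STOP, TERMINATED, and each loop
     * re-reads the word, so a concurrent setter only causes a retry.
     * RunStateSketch is hypothetical and the bit values are assumptions;
     * STOP is taken to be the sign bit so that a mode < 0 test, like the
     * one in runWorker, detects it. The quiescence checks and worker
     * interruption of the real method are omitted.
     *
```java
import java.util.concurrent.atomic.AtomicInteger;

class RunStateSketch {
    static final int SHUTDOWN   = 1 << 18;     // assumed
    static final int TERMINATED = 1 << 19;     // assumed
    static final int STOP       = 1 << 31;     // assumed: sign bit
    final AtomicInteger mode = new AtomicInteger();

    // Sets bit if it is not already set; returns the resulting mode.
    int advance(int bit) {
        int md;
        while (((md = mode.get()) & bit) == 0)
            mode.compareAndSet(md, md | bit);
        return mode.get();
    }

    // Follows the phase order only.
    boolean terminate() {
        advance(SHUTDOWN);
        advance(STOP);
        return (advance(TERMINATED) & TERMINATED) != 0;
    }

    public static void main(String[] args) {
        RunStateSketch s = new RunStateSketch();
        System.out.println(s.terminate() + " " + (s.mode.get() < 0)); // true true
    }
}
```
     */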

    // ...

    /**
     * Returns {@code true} if this pool uses local first-in-first-out
     * scheduling mode for forked tasks that are never joined.
     *
     * @return {@code true} if this pool uses async mode
     */
    public boolean getAsyncMode() {
        return (mode & FIFO) != 0;
    }

    /**
     * Returns an estimate of the number of worker threads that are
     * not blocked waiting to join tasks or for other managed
     * synchronization. This method may overestimate the
     * number of running threads.
     *
     * @return the number of worker threads
     */
    public int getRunningThreadCount() {
        int rc = 0;
        WorkQueue[] ws; WorkQueue w;
        if ((ws = workQueues) != null) {
            for (int i = 1; i < ws.length; i += 2) {
                if ((w = ws[i]) != null && w.isApparentlyUnblocked())
                    ++rc;
            }
        }
        return rc;
    }

    /**
     * Returns an estimate of the number of threads that are currently
     * stealing or executing tasks. This method may overestimate the
     * number of active threads.
     *
     * @return the number of active threads
     */
    public int getActiveThreadCount() {
        int r = (mode & SMASK) + (int)(ctl >> RC_SHIFT);
        return (r <= 0) ? 0 : r; // suppress momentarily negative values
    }

    /**
     * ...
     * idleness of all threads, but will eventually become true if
     * threads remain inactive.
     *
     * @return {@code true} if all threads are currently idle
     */
    public boolean isQuiescent() {
        for (;;) {
            long c = ctl;
            int md = mode, pc = md & SMASK;
            int tc = pc + (short)(c >>> TC_SHIFT);
            int rc = pc + (int)(c >> RC_SHIFT);
            if ((md & (STOP | TERMINATED)) != 0)
                return true;
            else if (rc > 0)
                return false;
            else {
                WorkQueue[] ws; WorkQueue v;
                if ((ws = workQueues) != null) {
                    for (int i = 1; i < ws.length; i += 2) {
                        if ((v = ws[i]) != null) {
                            if ((v.source & QUIET) == 0)
                                return false;
                            --tc;
                        }
                    }
                }
                if (tc == 0 && ctl == c)
                    return true;
            }
        }
    }

    /**
     * Returns an estimate of the total number of tasks stolen from
     * one thread's work queue by another. The reported value
     * underestimates the actual total number of steals when the pool
     * is not quiescent. This value may be useful for monitoring and
     * tuning fork/join programs: in general, steal counts should be
     * high enough to keep threads busy, but low enough to avoid
     * overhead and contention across threads.
     *
     * ...
        if ((ws = workQueues) != null) {
            for (int i = 1; i < ws.length; i += 2) {
                if ((w = ws[i]) != null)
                    count += (long)w.nsteals & 0xffffffffL;
            }
        }
        return count;
    }

    /**
     * Returns an estimate of the total number of tasks currently held
     * in queues by worker threads (but not including tasks submitted
     * to the pool that have not begun executing). This value is only
     * an approximation, obtained by iterating across all threads in
     * the pool. This method may be useful for tuning task
     * granularities.
     *
     * @return the number of queued tasks
     */
    public long getQueuedTaskCount() {
        long count = 0;
        WorkQueue[] ws; WorkQueue w;
        if ((ws = workQueues) != null) {
            for (int i = 1; i < ws.length; i += 2) {
                if ((w = ws[i]) != null)
                    count += w.queueSize();
            }
        }
        return count;
    }

    /**
     * Returns an estimate of the number of tasks submitted to this
     * pool that have not yet begun executing. This method may take
     * time proportional to the number of submissions.
     *
     * @return the number of queued submissions
     */
    public int getQueuedSubmissionCount() {
        int count = 0;
        WorkQueue[] ws; WorkQueue w;
        if ((ws = workQueues) != null) {
            for (int i = 0; i < ws.length; i += 2) {
                if ((w = ws[i]) != null)
                    count += w.queueSize();
            }
        }
        return count;
    }
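
    /*
     * Illustrative sketch, not part of this class: the slot convention
     * the monitoring methods rely on, in which even indices of workQueues
     * hold submission queues and odd indices hold worker queues, so a
     * single pass can split queued tasks into submissions and
     * worker-held tasks (as toString does). QueueParitySketch is
     * hypothetical and models each queue only by its size, with null for
     * an empty slot.
     *
```java
class QueueParitySketch {
    // Returns {queuedSubmissions, queuedTasks}.
    static long[] countByParity(Integer[] sizes) {
        long qs = 0L, qt = 0L;
        for (int i = 0; i < sizes.length; ++i) {
            Integer size = sizes[i];
            if (size == null)
                continue;                      // no queue in this slot
            if ((i & 1) == 0)
                qs += size;                    // even: submission queue
            else
                qt += size;                    // odd: worker queue
        }
        return new long[] { qs, qt };
    }

    public static void main(String[] args) {
        long[] r = countByParity(new Integer[] {3, 1, null, 4, 2, null, 7});
        System.out.println(r[0] + " submissions, " + r[1] + " worker tasks");
    }
}
```
     */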
2756
2757 /**
2758 * Returns {@code true} if there are any tasks submitted to this
2759 * pool that have not yet begun executing.
2760 *
2761 * @return {@code true} if there are any queued submissions
2762 */
2763 public boolean hasQueuedSubmissions() {
2764 WorkQueue[] ws; WorkQueue w;
2765 if ((ws = workQueues) != null) {
2766 for (int i = 0; i < ws.length; i += 2) {
2767 if ((w = ws[i]) != null && !w.isEmpty())
2768 return true;
2769 }
2770 }
2771 return false;
2772 }
2773
2774 /**
2775 * Removes and returns the next unexecuted submission if one is
2776 * available. This method may be useful in extensions to this
2777 * class that re-assign work in systems with multiple pools.
2778 *
2779 * @return the next submission, or {@code null} if none
2780 */
2781 protected ForkJoinTask<?> pollSubmission() {
2782 return pollScan(true);
2783 }
2784
2785 /**
2786 * Removes all available unexecuted submitted and forked tasks
2787 * from scheduling queues and adds them to the given collection,
2788 * without altering their execution status. These may include
2789 * artificially generated or wrapped tasks. This method is
2790 * designed to be invoked only when the pool is known to be
2791 * quiescent. Invocations at other times may not remove all
2792 * tasks. A failure encountered while attempting to add elements
2793 * to collection {@code c} may result in elements being in
2794 * neither, either or both collections when the associated
2795 * exception is thrown. The behavior of this operation is
2796 * undefined if the specified collection is modified while the
2797 * operation is in progress.
2798 *
2799 * @param c the collection to transfer elements into
2800 * @return the number of elements transferred
2801 */
2802 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
2803 int count = 0;
2804 WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t;
2805 if ((ws = workQueues) != null) {
2806 for (int i = 0; i < ws.length; ++i) {
2807 if ((w = ws[i]) != null) {
2808 while ((t = w.poll()) != null) {
2809 c.add(t);
2810 ++count;
2811 }
2812 }
2813 }
2814 }
2815 return count;
2816 }
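Because drainTasksTo is protected, a caller needs a subclass to reach it. Below is a minimal sketch that assumes, as the method comment advises, that the pool is quiescent or that its workers are known to be busy elsewhere. The class `DrainablePool` and method `drainUnstarted` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

// Hypothetical subclass exposing the protected drainTasksTo hook.
class DrainablePool extends ForkJoinPool {
    DrainablePool(int parallelism) {
        super(parallelism);
    }

    // Removes tasks that have not started yet. The result may contain
    // wrapper tasks (for example, adapters around submitted Runnables).
    List<ForkJoinTask<?>> drainUnstarted() {
        List<ForkJoinTask<?>> drained = new ArrayList<>();
        drainTasksTo(drained);
        return drained;
    }
}
```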
2817
2818 /**
2819 * Returns a string identifying this pool, as well as its state,
2820 * including indications of run state, parallelism level, and
2821 * worker and task counts.
2822 *
2823 * @return a string identifying this pool, as well as its state
2824 */
2825 public String toString() {
2826 // Use a single pass through workQueues to collect counts
2827 long qt = 0L, qs = 0L; int rc = 0;
2828 long st = stealCount;
2829 WorkQueue[] ws; WorkQueue w;
2830 if ((ws = workQueues) != null) {
2831 for (int i = 0; i < ws.length; ++i) {
2832 if ((w = ws[i]) != null) {
2833 int size = w.queueSize();
2834 if ((i & 1) == 0)
2835 qs += size;
2836 else {
2837 qt += size;
2838 st += (long)w.nsteals & 0xffffffffL;
2839 if (w.isApparentlyUnblocked())
2840 ++rc;
2841 }
2842 }
2843 }
2844 }
2845
2846 int md = mode;
2847 int pc = (md & SMASK);
2848 long c = ctl;
2849 int tc = pc + (short)(c >>> TC_SHIFT);
2850 int ac = pc + (int)(c >> RC_SHIFT);
2851 if (ac < 0) // ignore transient negative
2852 ac = 0;
2853 String level = ((md & TERMINATED) != 0 ? "Terminated" :
2854 (md & STOP) != 0 ? "Terminating" :
2855 (md & SHUTDOWN) != 0 ? "Shutting down" :
2856 "Running");
2857 return super.toString() +
2858 "[" + level +
2859 ", parallelism = " + pc +
2860 ", size = " + tc +
2861 ", active = " + ac +
2862 ", running = " + rc +
2863 ", steals = " + st +
2864 ", tasks = " + qt +
2865 ", submissions = " + qs +
2866 "]";
2867 }
2868
3114 * {@code blocker.block()} until either method returns {@code true}.
3115 * Every call to {@code blocker.block()} is preceded by a call to
3116 * {@code blocker.isReleasable()} that returned {@code false}.
3117 *
3118 * <p>If not running in a ForkJoinPool, this method is
3119 * behaviorally equivalent to
3120 * <pre> {@code
3121 * while (!blocker.isReleasable())
3122 * if (blocker.block())
3123 * break;}</pre>
3124 *
3125 * If running in a ForkJoinPool, the pool may first be expanded to
3126 * ensure sufficient parallelism available during the call to
3127 * {@code blocker.block()}.
3128 *
3129 * @param blocker the blocker task
3130 * @throws InterruptedException if {@code blocker.block()} did so
3131 */
3132 public static void managedBlock(ManagedBlocker blocker)
3133 throws InterruptedException {
3134 ForkJoinPool p;
3135 ForkJoinWorkerThread wt;
3136 WorkQueue w;
3137 Thread t = Thread.currentThread();
3138 if ((t instanceof ForkJoinWorkerThread) &&
3139 (p = (wt = (ForkJoinWorkerThread)t).pool) != null &&
3140 (w = wt.workQueue) != null) {
3141 int block;
3142 while (!blocker.isReleasable()) {
3143 if ((block = p.tryCompensate(w)) != 0) {
3144 try {
3145 do {} while (!blocker.isReleasable() &&
3146 !blocker.block());
3147 } finally {
3148 CTL.getAndAdd(p, (block > 0) ? RC_UNIT : 0L);
3149 }
3150 break;
3151 }
3152 }
3153 }
3154 else {
3155 do {} while (!blocker.isReleasable() &&
3156 !blocker.block());
3157 }
3158 }
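managedBlock depends on the caller supplying a ManagedBlocker whose isReleasable() is a cheap check and whose block() does the possibly blocking wait. Below is a minimal sketch that wraps a CountDownLatch. The class `LatchBlocker` and helper `awaitManaged` are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;

// Hypothetical ManagedBlocker over a CountDownLatch. When it is used from a
// pool worker, the pool gets a chance to compensate before block() runs.
final class LatchBlocker implements ForkJoinPool.ManagedBlocker {
    private final CountDownLatch latch;

    LatchBlocker(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public boolean isReleasable() {
        return latch.getCount() == 0; // already released: no need to block
    }

    @Override
    public boolean block() throws InterruptedException {
        latch.await(); // may block until the latch reaches zero
        return true;   // no further blocking is necessary
    }

    // Waits through managedBlock; returns false if interrupted.
    static boolean awaitManaged(CountDownLatch latch) {
        try {
            ForkJoinPool.managedBlock(new LatchBlocker(latch));
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Consider a task running in, say, `new ForkJoinPool(1)`. If it calls `LatchBlocker.awaitManaged(latch)` rather than `latch.await()`, the pool may start a spare worker, so a sibling task that counts the latch down can still run. Outside a pool, the call reduces to the plain loop shown in the javadoc.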
3159
3160 /**
3161 * If the given executor is a ForkJoinPool, polls and executes
3162 * AsynchronousCompletionTasks from the caller's worker or submission
3163 * queue until none are available or the blocker is released.
3164 */
3165 static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) {
3166 if (blocker != null && (e instanceof ForkJoinPool)) {
3167 WorkQueue w; ForkJoinWorkerThread wt; WorkQueue[] ws; int r, n;
3168 ForkJoinPool p = (ForkJoinPool)e;
3169 Thread thread = Thread.currentThread();
3170 if (thread instanceof ForkJoinWorkerThread &&
3171 (wt = (ForkJoinWorkerThread)thread).pool == p)
3172 w = wt.workQueue;
3173 else if ((r = ThreadLocalRandom.getProbe()) != 0 &&
3174 (ws = p.workQueues) != null && (n = ws.length) > 0)
3175 w = ws[(n - 1) & r & SQMASK];
3176 else
3177 w = null;
3178 if (w != null) {
3179 for (;;) {
3180 int b = w.base, s = w.top, d, al; ForkJoinTask<?>[] a;
3181 if ((a = w.array) != null && (d = b - s) < 0 &&
3182 (al = a.length) > 0) {
3183 int index = (al - 1) & b;
3184 ForkJoinTask<?> t = (ForkJoinTask<?>)
3185 QA.getAcquire(a, index);
3186 if (blocker.isReleasable())
3187 break;
3188 else if (b++ == w.base) {
3189 if (t == null) {
3190 if (d == -1)
3191 break;
3192 }
3193 else if (!(t instanceof CompletableFuture.
3194 AsynchronousCompletionTask))
3195 break;
3196 else if (QA.compareAndSet(a, index, t, null)) {
3197 w.base = b;
3198 t.doExec();
3199 }
3200 }
3201 }
3202 else
3203 break;
3204 }
3205 }
3206 }
3207 }
3208
3209 // AbstractExecutorService overrides. These rely on the undocumented
3210 // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
3211 // implement RunnableFuture.
3212
3213 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
3214 return new ForkJoinTask.AdaptedRunnable<T>(runnable, value);
3215 }
3216
3217 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
3218 return new ForkJoinTask.AdaptedCallable<T>(callable);
3219 }
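ExecutorCompletionService is one public path that reaches these overrides. In OpenJDK, when it is handed an AbstractExecutorService, it wraps each Callable via newTaskFor. Below is a minimal sketch that exercises that path on a ForkJoinPool. The class `CompletionDemo` and method `sumSquares` are hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ForkJoinPool;

final class CompletionDemo {
    // Submits n Callables through an ExecutorCompletionService and sums the
    // results in completion order. On a ForkJoinPool, each wrapper created by
    // newTaskFor is a ForkJoinTask adapter that also implements RunnableFuture.
    static int sumSquares(ForkJoinPool pool, int n) {
        ExecutorCompletionService<Integer> ecs = new ExecutorCompletionService<>(pool);
        for (int i = 1; i <= n; i++) {
            final int k = i;
            ecs.submit(() -> k * k);
        }
        int sum = 0;
        try {
            for (int i = 0; i < n; i++)
                sum += ecs.take().get(); // take() waits for the next completed task
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
        return sum;
    }
}
```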
3220
3221 // VarHandle mechanics
3222 private static final VarHandle CTL;
3223 private static final VarHandle MODE;
3224 private static final VarHandle QA;
3225
3226 static {
3227 try {
3228 MethodHandles.Lookup l = MethodHandles.lookup();
3229 CTL = l.findVarHandle(ForkJoinPool.class, "ctl", long.class);
3230 MODE = l.findVarHandle(ForkJoinPool.class, "mode", int.class);
3231 QA = MethodHandles.arrayElementVarHandle(ForkJoinTask[].class);
3232 } catch (ReflectiveOperationException e) {
3233 throw new Error(e);
3234 }
3235
3236 // Reduce the risk of rare disastrous classloading in first call to
3237 // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
3238 Class<?> ensureLoaded = LockSupport.class;
3239
3240 int commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
3241 try {
3242 String p = System.getProperty
3243 ("java.util.concurrent.ForkJoinPool.common.maximumSpares");
3244 if (p != null)
3245 commonMaxSpares = Integer.parseInt(p);
3246 } catch (Exception ignore) {}
3247 COMMON_MAX_SPARES = commonMaxSpares;
3248
3249 defaultForkJoinWorkerThreadFactory =
3250 new DefaultForkJoinWorkerThreadFactory();
3251 modifyThreadPermission = new RuntimePermission("modifyThread");
3252
3253 common = AccessController.doPrivileged(new PrivilegedAction<>() {
167 * maximum number of running threads to 32767. Attempts to create
168 * pools with greater than the maximum number result in
169 * {@code IllegalArgumentException}.
170 *
171 * <p>This implementation rejects submitted tasks (that is, by throwing
172 * {@link RejectedExecutionException}) only when the pool is shut down
173 * or internal resources have been exhausted.
174 *
175 * @since 1.7
176 * @author Doug Lea
177 */
178 public class ForkJoinPool extends AbstractExecutorService {
179
180 /*
181 * Implementation Overview
182 *
183 * This class and its nested classes provide the main
184 * functionality and control for a set of worker threads:
185 * Submissions from non-FJ threads enter into submission queues.
186 * Workers take these tasks and typically split them into subtasks
187 * that may be stolen by other workers. Work-stealing based on
188 * randomized scans generally leads to better throughput than
189 * "work dealing" in which producers assign tasks to idle threads,
190 * in part because threads that have finished other tasks before
191 * the signalled thread wakes up (which can be a long time) can
192 * take the task instead. Preference rules give first priority to
193 * processing tasks from their own queues (LIFO or FIFO, depending
194 * on mode), then to randomized FIFO steals of tasks in other
195 * queues. This framework began as a vehicle for supporting
196 * tree-structured parallelism using work-stealing. Over time,
197 * its scalability advantages led to extensions and changes to
198 * better support more diverse usage contexts. Because most
199 * internal methods and nested classes are interrelated, their
200 * main rationale and descriptions are presented here; individual
201 * methods and nested classes contain only brief comments about
202 * details.
203 *
204 * WorkQueues
205 * ==========
206 *
207 * Most operations occur within work-stealing queues (in nested
208 * class WorkQueue). These are special forms of Deques that
209 * support only three of the four possible end-operations -- push,
210 * pop, and poll (aka steal), under the further constraints that
211 * push and pop are called only from the owning thread (or, as
212 * extended here, under a lock), while poll may be called from
213 * other threads. (If you are unfamiliar with them, you probably
214 * want to read Herlihy and Shavit's book "The Art of
215 * Multiprocessor Programming", chapter 16, which describes these in
216 * more detail before proceeding.) The main work-stealing queue
217 * design is roughly similar to those in the papers "Dynamic
218 * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
219 * (http://research.sun.com/scalable/pubs/index.html) and
220 * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
221 * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
222 * The main differences ultimately stem from GC requirements that
223 * we null out taken slots as soon as we can, to maintain as small
224 * a footprint as possible even in programs generating huge
225 * numbers of tasks. To accomplish this, we shift the CAS
226 * arbitrating pop vs poll (steal) from being on the indices
227 * ("base" and "top") to the slots themselves.
228 *
229 * Adding tasks then takes the form of a classic array push(task)
230 * in a circular buffer:
231 * q.array[q.top++ % length] = task;
232 *
233 * (The actual code needs to null-check and size-check the array,
234 * uses masking, not mod, for indexing a power-of-two-sized array,
235 * adds a release fence for publication, and possibly signals
236 * waiting workers to start scanning -- see below.) Both a
237 * successful pop and poll mainly entail a CAS of a slot from
238 * non-null to null.
239 *
240 * The pop operation (always performed by owner) is:
241 * if ((the task at top slot is not null) and
242 * (CAS slot to null))
243 * decrement top and return task;
244 *
245 * And the poll operation (usually by a stealer) is:
246 * if ((the task at base slot is not null) and
247 * (CAS slot to null))
248 * increment base and return task;
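The push, pop, and poll outlines above can be sketched in Java as a fixed-capacity deque. The CAS that arbitrates pop against poll is on the slot, and indices are adjusted afterwards. This is a simplified sketch under several assumptions that do not come from the pool's code: there is no resizing, both indices are volatile instead of the finer-grained VarHandle modes, and each task object is pushed at most once, which avoids ABA on slot contents. The class name `SlotCasDeque` is hypothetical.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

// Fixed-capacity work-stealing deque sketch with slot-based CAS arbitration.
final class SlotCasDeque<T> {
    private final AtomicReferenceArray<T> array;
    private final int mask;
    private volatile int base; // next slot to poll (advanced by stealers)
    private volatile int top;  // next slot to push (written only by owner)

    SlotCasDeque(int capacity) {
        if (capacity < 2 || Integer.bitCount(capacity) != 1)
            throw new IllegalArgumentException("capacity must be a power of two >= 2");
        array = new AtomicReferenceArray<>(capacity);
        mask = capacity - 1;
    }

    // Owner only. Returns false instead of growing when full.
    boolean push(T task) {
        int s = top;
        if (s - base >= mask)
            return false;
        array.set(s & mask, task); // masking, not mod, on a power-of-two array
        top = s + 1;               // volatile write publishes the slot
        return true;
    }

    // Owner only: LIFO removal at top.
    T pop() {
        int s = top - 1;
        if (s - base < 0)
            return null;           // apparently empty
        int i = s & mask;
        T t = array.get(i);
        if (t != null && array.compareAndSet(i, t, null)) {
            top = s;               // won the slot, so retract top
            return t;
        }
        return null;               // a stealer took the last task
    }

    // Any thread: FIFO removal at base (a steal).
    T poll() {
        for (;;) {
            int b = base;
            if (top - b <= 0)
                return null;       // apparently empty
            int i = b & mask;
            T t = array.get(i);
            if (b != base)
                continue;          // another stealer advanced base; retry
            if (t == null)
                continue;          // a take of this slot has not fully committed
            if (array.compareAndSet(i, t, null)) {
                base = b + 1;      // only the CAS winner advances base
                return t;
            }
        }
    }
}
```

The `t == null` retry in poll is the non-wait-free window described later in this overview: one thief waits until another in-progress take becomes visible.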
249 *
250 * There are several variants of each of these. Most uses occur
251 * within operations that also interleave contention or emptiness
252 * tracking or inspection of elements before extracting them, so
253 * must interleave these with the above code. When performed by
254 * owner, getAndSet is used instead of CAS (see for example method
255 * nextLocalTask), which is usually more efficient and is possible
256 * because the top index cannot independently change during the
257 * operation.
258 *
259 * Memory ordering. See "Correct and Efficient Work-Stealing for
260 * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
261 * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
262 * analysis of memory ordering requirements in work-stealing
263 * algorithms similar to (but different than) the one used here.
264 * Extracting tasks in array slots via (fully fenced) CAS provides
265 * primary synchronization. The base and top indices imprecisely
266 * guide where to extract from. We do not usually require strict
267 * orderings of array and index updates. Many index accesses use
268 * plain mode, with ordering constrained by surrounding context
269 * (usually with respect to element CASes or the two WorkQueue
270 * volatile fields source and phase). When not otherwise already
271 * constrained, reads of "base" by queue owners use acquire-mode,
272 * and some externally callable methods preface accesses with
273 * acquire fences. Additionally, to ensure that index update
274 * writes are not coalesced or postponed in loops etc, "opaque"
275 * mode is used in a few cases where timely writes are not
276 * otherwise ensured. The "locked" versions of push- and pop-
277 * based methods for shared queues differ from owned versions
278 * because locking already forces some of the ordering.
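The access modes named above (plain, opaque, acquire/release, and fully fenced CAS) correspond to VarHandle methods, which this class obtains in its VarHandle mechanics block. Below is a minimal sketch of those modes on a single int index. The class `IndexHolder` and its methods are hypothetical, and each comment only indicates the kind of use the text describes.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Hypothetical holder of a plain int index, accessed via a VarHandle.
final class IndexHolder {
    int base; // declared plain; ordering comes from the access mode used

    private static final VarHandle BASE;
    static {
        try {
            BASE = MethodHandles.lookup()
                .findVarHandle(IndexHolder.class, "base", int.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    // Acquire-mode read, as used for owner reads of "base".
    int readAcquire() { return (int) BASE.getAcquire(this); }

    // Releasing store, as used when unlocking.
    void writeRelease(int v) { BASE.setRelease(this, v); }

    // Opaque store: not coalesced or postponed, but not ordering others.
    void writeOpaque(int v) { BASE.setOpaque(this, v); }

    // Fully fenced compare-and-set.
    boolean cas(int expected, int v) { return BASE.compareAndSet(this, expected, v); }
}
```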
279 *
280 * Because indices and slot contents cannot always be consistent,
281 * a check that base == top indicates (momentary) emptiness, but
282 * otherwise may err on the side of possibly making the queue
283 * appear nonempty when a push, pop, or poll has not fully
284 * committed, or making it appear empty when an update of top has
285 * not yet been visibly written. (Method isEmpty() checks the
286 * case of a partially completed removal of the last element.)
287 * Because of this, the poll operation, considered individually,
288 * is not wait-free. One thief cannot successfully continue until
289 * another in-progress one (or, if previously empty, a push)
290 * visibly completes. This can stall threads when required to
291 * consume from a given queue (see method poll()). However, in
292 * the aggregate, we ensure at least probabilistic
293 * non-blockingness. If an attempted steal fails, a scanning
294 * thief chooses a different random victim target to try next. So,
295 * in order for one thief to progress, it suffices for any
296 * in-progress poll or new push on any empty queue to complete.
297 *
298 * This approach also enables support of a user mode in which
299 * local task processing is in FIFO, not LIFO order, simply by
300 * using poll rather than pop. This can be useful in
301 * message-passing frameworks in which tasks are never joined.
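The FIFO user mode is selected through the asyncMode constructor argument. Below is a minimal sketch of building such a pool for never-joined, event-style tasks. The class `AsyncModePools` and method `newEventPool` are hypothetical names, and the constructor and getAsyncMode() are the public API.

```java
import java.util.concurrent.ForkJoinPool;

final class AsyncModePools {
    // Pool whose workers process their own local tasks in FIFO order
    // (asyncMode = true), suited to tasks that are never joined.
    static ForkJoinPool newEventPool(int parallelism) {
        return new ForkJoinPool(parallelism,
                                ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                                null,   // no UncaughtExceptionHandler
                                true);  // asyncMode: local FIFO (poll, not pop)
    }
}
```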
302 *
303 * WorkQueues are also used in a similar way for tasks submitted
304 * to the pool. We cannot mix these tasks in the same queues used
305 * by workers. Instead, we randomly associate submission queues
306 * with submitting threads, using a form of hashing. The
307 * ThreadLocalRandom probe value serves as a hash code for
308 * choosing existing queues, and may be randomly repositioned upon
309 * contention with other submitters. In essence, submitters act
310 * like workers except that they are restricted to executing local
311 * tasks that they submitted. Insertion of tasks in shared mode
312 * requires a lock but we use only a simple spinlock (using field
313 * phase), because submitters encountering a busy queue move to a
314 * different position to use or create other queues -- they block
315 * only when creating and registering new queues. Because it is
316 * used only as a spinlock, unlocking requires only a "releasing"
317 * store (using setRelease) unless otherwise signalling.
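The shared-queue spinlock described above amounts to a CAS of phase from 0 to 1 (compare tryLockPhase further below), followed by a releasing store of 0 to unlock. Below is a minimal sketch that makes a submitter move on to another slot when a lock is busy. The class `SharedQueueLock` and method `lockAny` are hypothetical. Trying the slots sequentially also simplifies things: per the text, the real submitter instead rehashes to a different random position.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Hypothetical spinlock in the style of a shared WorkQueue's phase field.
final class SharedQueueLock {
    volatile int phase; // 0: unlocked, 1: locked

    private static final VarHandle PHASE;
    static {
        try {
            PHASE = MethodHandles.lookup()
                .findVarHandle(SharedQueueLock.class, "phase", int.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    boolean tryLock() { return PHASE.compareAndSet(this, 0, 1); }

    void unlock() { PHASE.setRelease(this, 0); } // releasing store only

    // Tries each slot once, starting at start; returns the index locked, or -1.
    // A busy slot is skipped rather than waited on.
    static int lockAny(SharedQueueLock[] locks, int start) {
        int n = locks.length;
        for (int k = 0; k < n; k++) {
            int i = (start + k) % n;
            if (locks[i].tryLock())
                return i;
        }
        return -1;
    }
}
```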
318 *
319 * Management
320 * ==========
321 *
322 * The main throughput advantages of work-stealing stem from
323 * decentralized control -- workers mostly take tasks from
324 * themselves or each other, at rates that can exceed a billion
325 * per second. The pool itself creates, activates (enables
326 * scanning for and running tasks), deactivates, blocks, and
327 * terminates threads, all with minimal central information.
328 * There are only a few properties that we can globally track or
329 * maintain, so we pack them into a small number of variables,
330 * often maintaining atomicity without blocking or locking.
331 * Nearly all essentially atomic control state is held in a few
332 * volatile variables that are by far most often read (not
333 * written) as status and consistency checks. We pack as much
334 * information into them as we can.
335 *
336 * Field "ctl" contains 64 bits holding information needed to
337 * atomically decide to add, enqueue (on an event queue), and
338 * dequeue and release workers. To enable this packing, we
339 * restrict maximum parallelism to (1<<15)-1 (which is far in
340 * excess of normal operating range) to allow ids, counts, and
341 * their negations (used for thresholding) to fit into 16bit
342 * subfields.
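toString() above decodes ctl as `pc + (int)(c >> RC_SHIFT)` for active workers and `pc + (short)(c >>> TC_SHIFT)` for total workers. This implies that each count is stored as its offset from parallelism in a signed 16-bit subfield. Below is a minimal sketch of that packing. The shift values 48 and 32, and the use of the low 32 bits for the idle-worker stack top, are assumptions made for illustration because those constants are not shown in this section. The class `CtlSketch` is hypothetical.

```java
// Hypothetical packing of a ctl-like word. Assumed layout:
// bits 48-63 = active - parallelism, bits 32-47 = total - parallelism,
// bits 0-31  = idle stack top (sp).
final class CtlSketch {
    static final int RC_SHIFT = 48;
    static final int TC_SHIFT = 32;
    static final long RC_MASK = 0xffffL << RC_SHIFT;
    static final long TC_MASK = 0xffffL << TC_SHIFT;
    static final long SP_MASK = 0xffffffffL;

    static long pack(int parallelism, int active, int total, int sp) {
        long rc = ((long) (active - parallelism) << RC_SHIFT) & RC_MASK;
        long tc = ((long) (total - parallelism) << TC_SHIFT) & TC_MASK;
        return rc | tc | (sp & SP_MASK);
    }

    // Arithmetic shift keeps the sign of the top 16-bit field.
    static int active(long c, int parallelism) {
        return parallelism + (int) (c >> RC_SHIFT);
    }

    // The (short) cast keeps just the signed 16-bit total field.
    static int total(long c, int parallelism) {
        return parallelism + (short) (c >>> TC_SHIFT);
    }

    static int sp(long c) {
        return (int) c;
    }
}
```

With this encoding, "fewer active workers than the target" shows up as a negative top field, and so as a negative ctl value, which is the kind of thresholding the negated counts enable.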
343 *
344 * Field "mode" holds configuration parameters as well as lifetime
345 * status, atomically and monotonically setting SHUTDOWN, STOP,
346 * and finally TERMINATED bits.
347 *
348 * Field "workQueues" holds references to WorkQueues. It is
349 * updated (only during worker creation and termination) under
350 * lock (using field workerNamePrefix as lock), but is otherwise
351 * concurrently readable, and accessed directly. We also ensure
352 * that uses of the array reference itself never become too stale
353 * in case of resizing, by arranging that (re-)reads are separated
354 * by at least one acquiring read access. To simplify index-based
355 * operations, the array size is always a power of two, and all
356 * readers must tolerate null slots. Worker queues are at odd
357 * indices. Shared (submission) queues are at even indices, up to
358 * a maximum of 64 slots, to limit growth even if the array needs
359 * to expand to add more workers. Grouping them together in this
360 * way simplifies and speeds up task scanning.
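The odd/even layout can be seen in the index expression that helpAsyncBlocker uses for external threads, `(n - 1) & r & SQMASK`. Here SQMASK = 0x007e, as declared in this file. Below is a minimal sketch of that arithmetic. The class `QueueIndexing` and its method names are hypothetical.

```java
// Hypothetical helpers for a power-of-two workQueues array of length n.
final class QueueIndexing {
    static final int SQMASK = 0x007e; // even values 0..126: at most 64 shared slots

    // Even slot for an external submitter whose hash (probe) is r.
    static int submissionIndex(int r, int n) {
        return (n - 1) & r & SQMASK; // low bit cleared, capped at 126
    }

    // Worker queues live at odd indices.
    static boolean isWorkerIndex(int i) {
        return (i & 1) != 0;
    }
}
```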
361 *
362 * All worker thread creation is on-demand, triggered by task
363 * submissions, replacement of terminated workers, and/or
364 * compensation for blocked workers. However, all other support
365 * code is set up to work with other policies. To ensure that we
366 * do not hold on to worker references that would prevent GC, all
367 * accesses to workQueues are via indices into the workQueues
368 * array (which is one source of some of the messy code
369 * constructions here). In essence, the workQueues array serves as
370 * a weak reference mechanism. Thus for example the stack top
371 * subfield of ctl stores indices, not references.
372 *
373 * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we
374 * cannot let workers spin indefinitely scanning for tasks when
375 * none can be found immediately, and we cannot start/resume
376 * workers unless there appear to be tasks available. On the
377 * other hand, we must quickly prod them into action when new
378 * tasks are submitted or generated. In many usages, ramp-up time
379 * is the main limiting factor in overall performance, which is
380 * compounded at program start-up by JIT compilation and
418 * exception is propagated, generally to some external caller.
419 * Worker index assignment avoids the bias in scanning that would
420 * occur if entries were sequentially packed starting at the front
421 * of the workQueues array. We treat the array as a simple
422 * power-of-two hash table, expanding as needed. The seedIndex
423 * increment ensures no collisions until a resize is needed or a
424 * worker is deregistered and replaced, and thereafter keeps
425 * probability of collision low. We cannot use
426 * ThreadLocalRandom.getProbe() for similar purposes here because
427 * the thread has not started yet, but do so for creating
428 * submission queues for existing external threads (see
429 * externalPush).
430 *
431 * WorkQueue field "phase" is used by both workers and the pool to
432 * manage and track whether a worker is UNSIGNALLED (possibly
433 * blocked waiting for a signal). When a worker is enqueued its
434 * phase field is set. Note that phase field updates lag queue CAS
435 * releases so usage requires care -- seeing a negative phase does
436 * not guarantee that the worker is available. When queued, the
437 * lower 16 bits of phase must hold its pool index. So we
438 * place the index there upon initialization and otherwise keep it
439 * there or restore it when necessary.
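The phase encoding can be sketched with the constants declared in this file (SMASK, UNSIGNALLED, SS_SEQ). The pool index sits in the low 16 bits, the sign bit marks a queued worker, and each enqueue bumps a version count, so successive enqueues of the same worker produce different values. These transitions are a simplified illustration and not the pool's exact code. The class `PhaseSketch` is hypothetical.

```java
// Hypothetical phase transitions using this file's constants.
final class PhaseSketch {
    static final int SMASK = 0xffff;
    static final int UNSIGNALLED = 1 << 31; // must be negative
    static final int SS_SEQ = 1 << 16;      // version count unit

    // Worker about to wait: bump version, mark UNSIGNALLED.
    static int enqueue(int phase) { return (phase + SS_SEQ) | UNSIGNALLED; }

    // Worker signalled: clear the sign bit, keep index and version.
    static int signal(int phase) { return phase & ~UNSIGNALLED; }

    static int index(int phase) { return phase & SMASK; }

    static int version(int phase) { return (phase & ~UNSIGNALLED) >>> 16; }

    static boolean isQueued(int phase) { return phase < 0; }
}
```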
440 *
441 * The ctl field also serves as the basis for memory
442 * synchronization surrounding activation. This uses a more
443 * efficient version of a Dekker-like rule in which task producers
444 * and consumers sync with each other by both writing/CASing ctl
445 * (even if to its current value). Done literally, this would be extremely
446 * costly, so we relax it in several ways: (1) Producers only signal when
447 * their queue is possibly empty at some point during a push
448 * operation (which requires conservatively checking size zero or
449 * one to cover races). (2) Other workers propagate this signal
450 * when they find tasks in a queue with size greater than one. (3)
451 * Workers only enqueue after scanning (see below) and not finding
452 * any tasks. (4) Rather than CASing ctl to its current value in
453 * the common case where no action is required, we reduce write
454 * contention by equivalently prefacing signalWork when called by
455 * an external task producer using a memory access with
456 * full-volatile semantics or a "fullFence".
457 *
458 * Almost always, too many signals are issued, in part because a
459 * task producer cannot tell if some existing worker is in the
460 * midst of finishing one task (or already scanning) and ready to
461 * take another without being signalled. So the producer might
462 * instead activate a different worker that does not find any
463 * work, and then inactivates. This scarcely matters in
464 * steady-state computations involving all workers, but can create
465 * contention and bookkeeping bottlenecks during ramp-up,
466 * ramp-down, and small computations involving only a few workers.
467 *
468 * Scanning. Method scan (from runWorker) performs top-level
469 * scanning for tasks. (Similar scans appear in helpQuiesce and
470 * pollScan.) Each scan traverses and tries to poll from each
471 * queue starting at a random index. Scans are not performed in
472 * ideal random permutation order, to reduce cacheline
473 * contention. The pseudorandom generator need not have
474 * high-quality statistical properties in the long term, but just
475 * within computations; we use Marsaglia XorShifts (often via
476 * ThreadLocalRandom.nextSecondarySeed), which are cheap and
477 * suffice. Scanning also includes contention reduction: When
478 * scanning workers fail to extract an apparently existing task,
479 * they soon restart at a different pseudorandom index. This form
480 * of backoff improves throughput when many threads are trying to
481 * take tasks from few queues, which can be common in some usages.
482 * Scans do not otherwise explicitly take into account core
483 * affinities, loads, cache localities, etc. However, they do
484 * exploit temporal locality (which usually approximates these) by
485 * preferring to re-poll from the same queue after a successful
486 * poll before trying others (see method topLevelExec). However
487 * this preference is bounded (see TOP_BOUND_SHIFT) as a safeguard
488 * against infinitely unfair looping under unbounded user task
489 * recursion, and also to reduce long-term contention when many
490 * threads poll few queues holding many small tasks. The bound is
491 * high enough to avoid much impact on locality and scheduling
492 * overhead.
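Below is a minimal sketch of the scan shape described above. It uses a Marsaglia xorshift step for the pseudorandom start and then a full sweep of a power-of-two array, stopping at the first successful poll. The plain linear walk is a simplification chosen here; the class `ScanSketch` and the `tryPoll` predicate are hypothetical stand-ins for the pool's queues.

```java
import java.util.function.IntPredicate;

final class ScanSketch {
    // One Marsaglia xorshift step (13, 17, 5); a nonzero seed stays nonzero.
    static int xorShift(int r) {
        r ^= r << 13;
        r ^= r >>> 17;
        r ^= r << 5;
        return r;
    }

    // Visits each of the n slots once, starting at r & (n - 1), and returns
    // the first index for which tryPoll succeeds, or -1 after a full sweep.
    static int scan(int r, int n, IntPredicate tryPoll) {
        for (int k = 0, i = r & (n - 1); k < n; k++, i = (i + 1) & (n - 1)) {
            if (tryPoll.test(i))
                return i;
        }
        return -1;
    }
}
```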
493 *
494 * Trimming workers. To release resources after periods of lack of
495 * use, a worker starting to wait when the pool is quiescent will
496 * time out and terminate (see method runWorker) if the pool has
497 * remained quiescent for the period given by field keepAlive.
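The keepAlive period can be set through the public constructor that Java 9 and later provide, which also exposes corePoolSize, maximumPoolSize, minimumRunnable, and a saturation predicate. Below is a minimal sketch that configures a short keep-alive and otherwise chooses values close to the defaults. The class `TrimmingPools` is hypothetical, and the exact timing of trimming is not guaranteed.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

final class TrimmingPools {
    // Pool whose idle workers may time out once the pool has been quiescent
    // for keepAliveMillis.
    static ForkJoinPool newTrimmingPool(int parallelism, long keepAliveMillis) {
        return new ForkJoinPool(
            parallelism,
            ForkJoinPool.defaultForkJoinWorkerThreadFactory,
            null,              // UncaughtExceptionHandler
            false,             // asyncMode: LIFO local processing
            0,                 // corePoolSize: 0 behaves like the default
            Integer.MAX_VALUE, // maximumPoolSize: capped at the implementation limit
            1,                 // minimumRunnable
            null,              // saturate: null means reject when saturated
            keepAliveMillis,
            TimeUnit.MILLISECONDS);
    }
}
```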
498 *
499 * Shutdown and Termination. A call to shutdownNow invokes
500 * tryTerminate to atomically set the STOP bit of field mode. The calling
501 * thread, as well as every other worker thereafter terminating,
502 * helps terminate others by cancelling their unprocessed tasks,
503 * and waking them up, doing so repeatedly until stable. Calls to
504 * non-abrupt shutdown() preface this by checking whether
505 * termination should commence by sweeping through queues (until
506 * stable) to ensure lack of in-flight submissions and workers
507 * about to process them before triggering the "STOP" phase of
508 * termination.
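From the caller's side, the two paths described above correspond to shutdown() (non-abrupt: previously submitted tasks still run) and shutdownNow() (abrupt: unprocessed tasks are cancelled). Below is a minimal sketch of an orderly shutdown with a fallback. The class `ShutdownDemo` and the grace-period policy are hypothetical choices.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

final class ShutdownDemo {
    // Stops accepting new tasks, waits for queued and running tasks, and
    // falls back to shutdownNow if the grace period ends. Returns true if
    // the pool terminated.
    static boolean shutdownGracefully(ForkJoinPool pool, long timeoutMillis) {
        pool.shutdown(); // non-abrupt: in-flight submissions still run
        try {
            if (pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS))
                return true;
            pool.shutdownNow(); // abrupt: cancels unprocessed tasks
            return pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```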
509 *
510 * Joining Tasks
511 * =============
512 *
513 * Any of several actions may be taken when one worker is waiting
514 * to join a task stolen (or always held) by another. Because we
515 * are multiplexing many tasks on to a pool of workers, we can't
516 * always just let them block (as in Thread.join). We also cannot
544 * actively joined task. Thus, the joiner executes a task that
545 * would be on its own local deque if the to-be-joined task had
546 * not been stolen. This is a conservative variant of the approach
547 * described in Wagner & Calder "Leapfrogging: a portable
548 * technique for implementing efficient futures" SIGPLAN Notices,
549 * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
550 * mainly in that we only record queue ids, not full dependency
551 * links. This requires a linear scan of the workQueues array to
552 * locate stealers, but isolates cost to when it is needed, rather
553 * than adding to per-task overhead. Searches can fail to locate
554 * stealers when GC stalls and the like delay recording sources.
555 * Further, even when accurately identified, stealers might not
556 * ever produce a task that the joiner can in turn help with. So,
557 * compensation is tried upon failure to find tasks to run.
558 *
559 * Compensation does not by default aim to keep exactly the target
560 * parallelism number of unblocked threads running at any given
561 * time. Some previous versions of this class employed immediate
562 * compensations for any blocked join. However, in practice, the
563 * vast majority of blockages are transient byproducts of GC and
564 * other JVM or OS activities that are made worse by replacement
565 * when they cause longer-term oversubscription. Rather than
566 * impose arbitrary policies, we allow users to override the
567 * default of only adding threads upon apparent starvation. The
568 * compensation mechanism may also be bounded. Bounds for the
569 * commonPool (see COMMON_MAX_SPARES) better enable JVMs to cope
570 * with programming errors and abuse before running out of
571 * resources to do so.
572 *
573 * Common Pool
574 * ===========
575 *
576 * The static common pool always exists after static
577 * initialization. Since it (or any other created pool) need
578 * never be used, we minimize initial construction overhead and
579 * footprint to the setup of about a dozen fields.
580 *
581 * When external threads submit to the common pool, they can
582 * perform subtask processing (see externalHelpComplete and
583 * related methods) upon joins. This caller-helps policy makes it
584 * sensible to set common pool parallelism level to one (or more)
585 * less than the total number of available cores, or even zero for
586 * pure caller-runs. We do not need to record whether external
587 * submissions are to the common pool -- if not, external help
588 * methods return quickly. These submitters would otherwise be
589 * blocked waiting for completion, so the extra effort (with
590 * liberally sprinkled task status checks) in inapplicable cases
591 * amounts to an odd form of limited spin-wait before blocking in
592 * ForkJoinTask.join.
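Below is a minimal sketch of the external-submitter case: a divide-and-conquer task invoked from a non-worker thread on the common pool. According to the caller-helps policy above, that thread may process subtasks itself while it waits. The class `SumTask` and its threshold are illustrative choices.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Divide-and-conquer sum over a half-open range [lo, hi).
final class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // illustrative cutoff
    private final long[] values;
    private final int lo, hi;

    SumTask(long[] values, int lo, int hi) {
        this.values = values;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Long compute() {
        if (hi - lo <= THRESHOLD) {
            long sum = 0;
            for (int i = lo; i < hi; i++)
                sum += values[i];
            return sum;
        }
        int mid = (lo + hi) >>> 1;
        SumTask left = new SumTask(values, lo, mid);
        left.fork();                                   // queue left half
        long right = new SumTask(values, mid, hi).compute();
        return right + left.join();                    // run or wait for left half
    }

    // invoke() from a non-worker thread submits to the common pool and waits.
    static long sum(long[] values) {
        return ForkJoinPool.commonPool().invoke(new SumTask(values, 0, values.length));
    }
}
```

The common pool's parallelism can be read with `ForkJoinPool.getCommonPoolParallelism()` and set at startup through the `java.util.concurrent.ForkJoinPool.common.parallelism` system property.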
593 *
594 * As a more appropriate default in managed environments, unless
595 * overridden by system properties, we use workers of subclass
596 * InnocuousForkJoinWorkerThread when there is a SecurityManager
597 * present. These workers have no permissions set, do not belong
598 * to any user-defined ThreadGroup, and erase all ThreadLocals
599 * after executing any top-level task (see
600 * WorkQueue.afterTopLevelExec). The associated mechanics (mainly
601 * in ForkJoinWorkerThread) may be JVM-dependent and must access
602 * particular Thread class fields to achieve this effect.
603 *
604 * Memory placement
605 * ================
606 *
607 * Performance can be very sensitive to placement of instances of
608 * ForkJoinPool and WorkQueues and their queue arrays. To reduce
609 * false-sharing impact, the @Contended annotation isolates
610 * adjacent WorkQueue instances, as well as the ForkJoinPool.ctl
611 * field. WorkQueue arrays are allocated (by their threads) with
612 * larger initial sizes than most ever need, mostly to reduce
613 * false sharing with current garbage collectors that use cardmark
614 * tables.
615 *
616 * Style notes
617 * ===========
618 *
619 * Memory ordering relies mainly on VarHandles. This can be
620 * awkward and ugly, but also reflects the need to control
621 * outcomes across the unusual cases that arise in very racy code
622 * with very few invariants. All fields are read into locals
623 * before use, and null-checked if they are references. Array
624 * accesses using masked indices include checks (that are always
625 * true) that the array length is non-zero to avoid compilers
626 * inserting more expensive traps. This is usually done in a
627 * "C"-like style of listing declarations at the heads of methods
628 * or blocks, and using inline assignments on first encounter.
629 * Nearly all explicit checks lead to bypass/return, not exception
630 * throws, because they may legitimately arise due to
631 * cancellation/revocation during shutdown.
632 *
633 * There is a lot of representation-level coupling among classes
634 * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The
635 * fields of WorkQueue maintain data structures managed by
636 * ForkJoinPool, so are directly accessed. There is little point
637 * trying to reduce this, since any associated future changes in
638 * representations will need to be accompanied by algorithmic
639 * changes anyway. Several methods intrinsically sprawl because
640 * they must accumulate sets of consistent reads of fields held in
641 * local variables. Some others are artificially broken up to
642 * reduce producer/consumer imbalances due to dynamic compilation.
643 * There are also other coding oddities (including several
644 * unnecessary-looking hoisted null checks) that help some methods
645 * perform reasonably even when interpreted (not compiled).
646 *
647 * The order of declarations in this file is (with a few exceptions):
648 * (1) Static utility functions
649 * (2) Nested (static) classes
650 * (3) Static fields
651 * (4) Fields, along with constants used when unpacking some of them
652 * (5) Internal control methods
653 * (6) Callbacks and other support for ForkJoinTask methods
654 * (7) Exported methods
655 * (8) Static block initializing statics in minimally dependent order
656 */
657
658 // Static utilities
659
660 /**
661 * If there is a security manager, makes sure caller has
662 * permission to modify threads.
663 */
664 private static void checkPermission() {
665 SecurityManager security = System.getSecurityManager();
729 static final int SWIDTH = 16; // width of short
730 static final int SMASK = 0xffff; // short bits == max index
731 static final int MAX_CAP = 0x7fff; // max #workers - 1
732 static final int SQMASK = 0x007e; // max 64 (even) slots
733
734 // Masks and units for WorkQueue.phase and ctl sp subfield
735 static final int UNSIGNALLED = 1 << 31; // must be negative
736 static final int SS_SEQ = 1 << 16; // version count
737 static final int QLOCK = 1; // must be 1
738
739 // Mode bits and sentinels, some also used in WorkQueue id and source fields
740 static final int OWNED = 1; // queue has owner thread
741 static final int FIFO = 1 << 16; // fifo queue or access mode
742 static final int SHUTDOWN = 1 << 18;
743 static final int TERMINATED = 1 << 19;
744 static final int STOP = 1 << 31; // must be negative
745 static final int QUIET = 1 << 30; // not scanning or working
746 static final int DORMANT = QUIET | UNSIGNALLED;
747
748 /**
749 * Initial capacity of work-stealing queue array.
750 * Must be a power of two, at least 2.
751 */
752 static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
753
754 /**
755 * Maximum capacity for queue arrays. Must be a power of two less
756 * than or equal to 1 << (31 - width of array entry) to ensure
757 * lack of wraparound of index calculations, but is defined as a
758 * value somewhat less than this to help users trap runaway programs
759 * before saturating systems.
760 */
761 static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
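Queue indices `base` and `top` are plain `int` counters that are never reset; a slot is found with `(cap - 1) & index`. Because `cap` is a power of two, this equals the index modulo `cap` even after a counter overflows past `Integer.MAX_VALUE`, and `top - base` still gives the size as long as it stays below `cap`. The hypothetical `MaskDemo` below is a minimal sketch of that arithmetic, not code from this file.

```java
final class MaskDemo {
    // Slot for a logical index in a power-of-two array of length cap.
    static int slot(int index, int cap) {
        return (cap - 1) & index;
    }

    // Number of queued elements; int subtraction stays correct across overflow.
    static int size(int base, int top) {
        return top - base;
    }
}
```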
762
763 /**
764 * The maximum number of top-level polls per worker before
765 * checking other queues, expressed as a bit shift that, in effect,
766 * multiplies by pool size; the result is used as a mask on a random
767 * value, so the average bound is about poolSize*(1<<TOP_BOUND_SHIFT).
768 * See above for rationale.
769 */
770 static final int TOP_BOUND_SHIFT = 10;
771
772 /**
773 * Queues supporting work-stealing as well as external task
774 * submission. See above for descriptions and algorithms.
775 */
776 @jdk.internal.vm.annotation.Contended
777 static final class WorkQueue {
778 volatile int source; // source queue id, or sentinel
779 int id; // pool index, mode, tag
780 int base; // index of next slot for poll
781 int top; // index of next slot for push
782 volatile int phase; // versioned, negative: queued, 1: locked
783 int stackPred; // pool stack (ctl) predecessor link
784 int nsteals; // number of steals
785 ForkJoinTask<?>[] array; // the queued tasks; power of 2 size
786 final ForkJoinPool pool; // the containing pool (may be null)
787 final ForkJoinWorkerThread owner; // owning thread or null if shared
788
789 WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
790 this.pool = pool;
791 this.owner = owner;
792 // Place indices in the center of array (that is not yet allocated)
793 base = top = INITIAL_QUEUE_CAPACITY >>> 1;
794 }
795
796 /**
797 * Tries to lock shared queue by CASing phase field.
798 */
799 final boolean tryLockPhase() {
800 return PHASE.compareAndSet(this, 0, 1);
801 }
802
803 final void releasePhaseLock() {
804 PHASE.setRelease(this, 0);
805 }
806
807 /**
808 * Returns an exportable index (used by ForkJoinWorkerThread).
809 */
810 final int getPoolIndex() {
811 return (id & 0xffff) >>> 1; // ignore odd/even tag bit
812 }
813
814 /**
815 * Returns the approximate number of tasks in the queue.
816 */
817 final int queueSize() {
818 int n = (int)BASE.getAcquire(this) - top;
819 return (n >= 0) ? 0 : -n; // ignore transient negative
820 }
821
822 /**
823 * Provides a more accurate estimate of whether this queue has
824 * any tasks than does queueSize, by checking whether a
825 * near-empty queue has at least one unclaimed task.
826 */
827 final boolean isEmpty() {
828 ForkJoinTask<?>[] a; int n, cap, b;
829 VarHandle.acquireFence(); // needed by external callers
830 return ((n = (b = base) - top) >= 0 || // possibly one task
831 (n == -1 && ((a = array) == null ||
832 (cap = a.length) == 0 ||
833 a[(cap - 1) & b] == null)));
834 }
835
836 /**
837 * Pushes a task. Call only by owner in unshared queues.
838 *
839 * @param task the task. Caller must ensure non-null.
840 * @throws RejectedExecutionException if array cannot be resized
841 */
842 final void push(ForkJoinTask<?> task) {
843 ForkJoinTask<?>[] a;
844 int s = top, d, cap, m;
845 ForkJoinPool p = pool;
846 if ((a = array) != null && (cap = a.length) > 0) {
847 QA.setRelease(a, (m = cap - 1) & s, task);
848 top = s + 1;
849 if (((d = s - (int)BASE.getAcquire(this)) & ~1) == 0 &&
850 p != null) { // size 0 or 1
851 VarHandle.fullFence();
852 p.signalWork();
853 }
854 else if (d == m)
855 growArray(false);
856 }
857 }
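In `push`, `d` is the queue size before the new task was added. The test `(d & ~1) == 0` clears the low bit, so it holds exactly when `d` is 0 or 1 (the queue was empty or nearly so, and idle workers may need a signal), while `d == m` means the push has just filled the last free slot. The hypothetical helper below is a sketch that classifies the outcome in the same order; it ignores the `p != null` check and the memory fences of the real method.

```java
final class PushDecisionDemo {
    enum Action { SIGNAL, GROW, NONE }

    // d: queue size before the push; cap: power-of-two array length.
    static Action afterPush(int d, int cap) {
        if ((d & ~1) == 0)          // d is 0 or 1
            return Action.SIGNAL;
        else if (d == cap - 1)      // array is now completely full
            return Action.GROW;
        else
            return Action.NONE;
    }
}
```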
858
859 /**
860 * Version of push for shared queues. Call only with phase lock held.
861 * @return true if the caller should signal work
862 */
863 final boolean lockedPush(ForkJoinTask<?> task) {
864 ForkJoinTask<?>[] a;
865 boolean signal = false;
866 int s = top, b = base, cap, d;
867 if ((a = array) != null && (cap = a.length) > 0) {
868 a[(cap - 1) & s] = task;
869 top = s + 1;
870 if (b - s + cap - 1 == 0)
871 growArray(true);
872 else {
873 phase = 0; // full volatile unlock
874 if (((s - base) & ~1) == 0) // size 0 or 1
875 signal = true;
876 }
877 }
878 return signal;
879 }
880
881 /**
882 * Doubles the capacity of array. Call either by owner or with
883 * lock held -- it is OK for base, but not top, to move while
884 * resizings are in progress.
885 */
886 final void growArray(boolean locked) {
887 ForkJoinTask<?>[] newA = null;
888 try {
889 ForkJoinTask<?>[] oldA; int oldSize, newSize;
890 if ((oldA = array) != null && (oldSize = oldA.length) > 0 &&
891 (newSize = oldSize << 1) <= MAXIMUM_QUEUE_CAPACITY &&
892 newSize > 0) {
893 try {
894 newA = new ForkJoinTask<?>[newSize];
895 } catch (OutOfMemoryError ex) {
896 }
897 if (newA != null) { // poll from old array, push to new
898 int oldMask = oldSize - 1, newMask = newSize - 1;
899 for (int s = top - 1, k = oldMask; k >= 0; --k) {
900 ForkJoinTask<?> x = (ForkJoinTask<?>)
901 QA.getAndSet(oldA, s & oldMask, null);
902 if (x != null)
903 newA[s-- & newMask] = x;
904 else
905 break;
906 }
907 array = newA;
908 VarHandle.releaseFence();
909 }
910 }
911 } finally {
912 if (locked)
913 phase = 0;
914 }
915 if (newA == null)
916 throw new RejectedExecutionException("Queue capacity exceeded");
917 }
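`growArray` walks from the top of the old array downward and re-masks each logical index with the new mask, so every task keeps its logical index and only its physical slot changes. The single-threaded sketch below (hypothetical `GrowDemo`) shows that remapping; unlike the real method it uses plain reads instead of `QA.getAndSet`, bounds the walk by `base` rather than stopping at the first null slot, and assumes no concurrent stealers.

```java
final class GrowDemo {
    // Doubles a power-of-two ring buffer holding logical indices [base, top).
    static Object[] grow(Object[] oldA, int base, int top) {
        int oldMask = oldA.length - 1;
        Object[] newA = new Object[oldA.length << 1];
        int newMask = newA.length - 1;
        // Walk from top - 1 down to base; each task keeps its logical index.
        for (int s = top - 1; s - base >= 0; --s)
            newA[s & newMask] = oldA[s & oldMask];
        return newA;
    }
}
```

For example, with capacity 4 and logical indices 6 through 9 stored in slots 2, 3, 0, 1, the doubled array holds them in slots 6, 7, 0, 1.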
918
919 /**
920 * Takes next task, if one exists, in FIFO order.
921 */
922 final ForkJoinTask<?> poll() {
923 int b, k, cap; ForkJoinTask<?>[] a;
924 while ((a = array) != null && (cap = a.length) > 0 &&
925 top - (b = base) > 0) {
926 ForkJoinTask<?> t = (ForkJoinTask<?>)
927 QA.getAcquire(a, k = (cap - 1) & b);
928 if (base == b++) {
929 if (t == null)
930 Thread.yield(); // await index advance
931 else if (QA.compareAndSet(a, k, t, null)) {
932 BASE.setOpaque(this, b);
933 return t;
934 }
935 }
936 }
937 return null;
938 }
939
940 /**
941 * Takes next task, if one exists, in order specified by mode.
942 */
943 final ForkJoinTask<?> nextLocalTask() {
944 ForkJoinTask<?> t = null;
945 int md = id, b, s, d, cap; ForkJoinTask<?>[] a;
946 if ((a = array) != null && (cap = a.length) > 0 &&
947 (d = (s = top) - (b = base)) > 0) {
948 if ((md & FIFO) == 0 || d == 1) {
949 if ((t = (ForkJoinTask<?>)
950 QA.getAndSet(a, (cap - 1) & --s, null)) != null)
951 TOP.setOpaque(this, s);
952 }
953 else if ((t = (ForkJoinTask<?>)
954 QA.getAndSet(a, (cap - 1) & b++, null)) != null) {
955 BASE.setOpaque(this, b);
956 }
957 else // on contention in FIFO mode, use regular poll
958 t = poll();
959 }
960 return t;
961 }
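Owners take local tasks from `top` (LIFO) by default, or from `base` (FIFO) when the queue id carries the FIFO bit. A queue holding a single task is always taken from the top. The hypothetical `LocalOrderDemo` below is a single-threaded sketch of that choice, with no CAS, no stealing, and a fixed capacity.

```java
final class LocalOrderDemo {
    final Object[] array = new Object[16];       // power of two
    int base = 8, top = 8;                       // start mid-array, like WorkQueue
    final boolean fifo;

    LocalOrderDemo(boolean fifo) { this.fifo = fifo; }

    void push(Object x) {
        array[(array.length - 1) & top++] = x;   // no resizing in this sketch
    }

    // Mirrors nextLocalTask's mode choice: LIFO, or FIFO unless one task left.
    Object nextLocal() {
        int d = top - base, m = array.length - 1;
        if (d <= 0)
            return null;
        int k = (!fifo || d == 1) ? (m & --top) : (m & base++);
        Object t = array[k];
        array[k] = null;
        return t;
    }
}
```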
962
963 /**
964 * Returns next task, if one exists, in order specified by mode.
965 */
966 final ForkJoinTask<?> peek() {
967 int cap; ForkJoinTask<?>[] a;
968 return ((a = array) != null && (cap = a.length) > 0) ?
969 a[(cap - 1) & ((id & FIFO) != 0 ? base : top - 1)] : null;
970 }
971
972 /**
973 * Pops the given task only if it is at the current top.
974 */
975 final boolean tryUnpush(ForkJoinTask<?> task) {
976 boolean popped = false;
977 int s, cap; ForkJoinTask<?>[] a;
978 if ((a = array) != null && (cap = a.length) > 0 &&
979 (s = top) != base &&
980 (popped = QA.compareAndSet(a, (cap - 1) & --s, task, null)))
981 TOP.setOpaque(this, s);
982 return popped;
983 }
984
985 /**
986 * Shared version of tryUnpush.
987 */
988 final boolean tryLockedUnpush(ForkJoinTask<?> task) {
989 boolean popped = false;
990 int s = top - 1, k, cap; ForkJoinTask<?>[] a;
991 if ((a = array) != null && (cap = a.length) > 0 &&
992 a[k = (cap - 1) & s] == task && tryLockPhase()) {
993 if (top == s + 1 && array == a &&
994 (popped = QA.compareAndSet(a, k, task, null)))
995 top = s;
996 releasePhaseLock();
997 }
998 return popped;
999 }
1000
1001 /**
1002 * Removes and cancels all known tasks, ignoring any exceptions.
1003 */
1004 final void cancelAll() {
1005 for (ForkJoinTask<?> t; (t = poll()) != null; )
1006 ForkJoinTask.cancelIgnoringExceptions(t);
1007 }
1008
1009 // Specialized execution methods
1010
1011 /**
1012 * Runs the given (stolen) task if non-null, as well as
1013 * remaining local tasks and others available from the given
1014 * queue, up to bound n (to avoid infinite unfairness).
1015 */
1016 final void topLevelExec(ForkJoinTask<?> t, WorkQueue q, int n) {
1017 if (t != null && q != null) { // hoist checks
1018 int nstolen = 1;
1019 for (;;) {
1020 t.doExec();
1021 if (n-- < 0)
1022 break;
1023 else if ((t = nextLocalTask()) == null) {
1024 if ((t = q.poll()) == null)
1025 break;
1026 else
1027 ++nstolen;
1028 }
1029 }
1030 ForkJoinWorkerThread thread = owner;
1031 nsteals += nstolen;
1032 source = 0;
1033 if (thread != null)
1034 thread.afterTopLevelExec();
1035 }
1036 }
1037
1038 /**
1039 * If present, removes task from queue and executes it.
1040 */
1041 final void tryRemoveAndExec(ForkJoinTask<?> task) {
1042 ForkJoinTask<?>[] a; int s, cap;
1043 if ((a = array) != null && (cap = a.length) > 0 &&
1044 (s = top) - base > 0) { // traverse from top
1045 for (int m = cap - 1, ns = s - 1, i = ns; ; --i) {
1046 int index = i & m;
1047 ForkJoinTask<?> t = (ForkJoinTask<?>)QA.get(a, index);
1048 if (t == null)
1049 break;
1050 else if (t == task) {
1051 if (QA.compareAndSet(a, index, t, null)) {
1052 top = ns; // safely shift down
1053 for (int j = i; j != ns; ++j) {
1054 ForkJoinTask<?> f;
1055 int pindex = (j + 1) & m;
1056 f = (ForkJoinTask<?>)QA.get(a, pindex);
1057 QA.setVolatile(a, pindex, null);
1058 int jindex = j & m;
1059 QA.setRelease(a, jindex, f);
1060 }
1061 VarHandle.releaseFence();
1062 t.doExec();
1063 }
1064 break;
1065 }
1066 }
1067 }
1068 }
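After removing `task` from logical index `i`, `tryRemoveAndExec` decrements `top` and slides every task above `i` down by one slot so the queue stays contiguous. The hypothetical `RemoveShiftDemo` below is a single-threaded sketch of that compaction step; it omits the CAS, the fences, and running the removed task, and (unlike the original loop) also bounds the search by `base`.

```java
final class RemoveShiftDemo {
    // Removes target (matched by identity, searching from top - 1 down to base)
    // and shifts the elements above it down. Returns the new top, or top if absent.
    static int removeAndShift(Object[] a, int base, int top, Object target) {
        int m = a.length - 1;
        for (int i = top - 1; i - base >= 0; --i) {
            Object t = a[i & m];
            if (t == null)
                break;
            if (t == target) {
                a[i & m] = null;                     // take the task out
                int ns = top - 1;
                for (int j = i; j != ns; ++j) {      // slide tasks above it down
                    a[j & m] = a[(j + 1) & m];
                    a[(j + 1) & m] = null;
                }
                return ns;
            }
        }
        return top;
    }
}
```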
1069
1070 /**
1071 * Tries to pop and run tasks within the target's computation
1072 * until done, not found, or limit exceeded.
1073 *
1074 * @param task root of CountedCompleter computation
1075 * @param limit max runs, or zero for no limit
1076 * @param shared true if must lock to extract task
1077 * @return task status on exit
1078 */
1079 final int helpCC(CountedCompleter<?> task, int limit, boolean shared) {
1080 int status = 0;
1081 if (task != null && (status = task.status) >= 0) {
1082 int s, k, cap; ForkJoinTask<?>[] a;
1083 while ((a = array) != null && (cap = a.length) > 0 &&
1084 (s = top) - base > 0) {
1085 CountedCompleter<?> v = null;
1086 ForkJoinTask<?> o = a[k = (cap - 1) & (s - 1)];
1087 if (o instanceof CountedCompleter) {
1088 CountedCompleter<?> t = (CountedCompleter<?>)o;
1089 for (CountedCompleter<?> f = t;;) {
1090 if (f != task) {
1091 if ((f = f.completer) == null)
1092 break;
1093 }
1094 else if (shared) {
1095 if (tryLockPhase()) {
1096 if (top == s && array == a &&
1097 QA.compareAndSet(a, k, t, null)) {
1098 top = s - 1;
1099 v = t;
1100 }
1101 releasePhaseLock();
1102 }
1103 break;
1104 }
1105 else {
1106 if (QA.compareAndSet(a, k, t, null)) {
1107 top = s - 1;
1108 v = t;
1109 }
1110 break;
1111 }
1112 }
1113 }
1114 if (v != null)
1115 v.doExec();
1116 if ((status = task.status) < 0 || v == null ||
1117 (limit != 0 && --limit == 0))
1118 break;
1119 }
1120 }
1121 return status;
1122 }
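`helpCC` only removes the top task if following its `completer` links eventually reaches the target `task`, that is, if the candidate belongs to the target's computation. In the sketch below, a hypothetical `Node` class stands in for `CountedCompleter` and keeps only the parent link.

```java
final class CompleterChainDemo {
    static final class Node {
        final Node completer;                    // parent link, null at the root
        Node(Node completer) { this.completer = completer; }
    }

    // True if target is candidate itself or one of its transitive completers.
    static boolean withinComputation(Node candidate, Node target) {
        for (Node f = candidate; f != null; f = f.completer) {
            if (f == target)
                return true;
        }
        return false;
    }
}
```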
1123
1124 /**
1125 * Tries to poll and run AsynchronousCompletionTasks until
1126 * none are found or the blocker is released.
1127 *
1128 * @param blocker the blocker
1129 */
1130 final void helpAsyncBlocker(ManagedBlocker blocker) {
1131 if (blocker != null) {
1132 int b, k, cap; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
1133 while ((a = array) != null && (cap = a.length) > 0 &&
1134 top - (b = base) > 0) {
1135 t = (ForkJoinTask<?>)QA.getAcquire(a, k = (cap - 1) & b);
1136 if (blocker.isReleasable())
1137 break;
1138 else if (base == b++ && t != null) {
1139 if (!(t instanceof CompletableFuture.
1140 AsynchronousCompletionTask))
1141 break;
1142 else if (QA.compareAndSet(a, k, t, null)) {
1143 BASE.setOpaque(this, b);
1144 t.doExec();
1145 }
1146 }
1147 }
1148 }
1149 }
1150
1151 /**
1152 * Returns true if owned and not known to be blocked.
1153 */
1154 final boolean isApparentlyUnblocked() {
1155 Thread wt; Thread.State s;
1156 return ((wt = owner) != null &&
1157 (s = wt.getState()) != Thread.State.BLOCKED &&
1158 s != Thread.State.WAITING &&
1159 s != Thread.State.TIMED_WAITING);
1160 }
1161
1162 // VarHandle mechanics.
1163 static final VarHandle PHASE;
1164 static final VarHandle BASE;
1165 static final VarHandle TOP;
1166 static {
1167 try {
1168 MethodHandles.Lookup l = MethodHandles.lookup();
1169 PHASE = l.findVarHandle(WorkQueue.class, "phase", int.class);
1170 BASE = l.findVarHandle(WorkQueue.class, "base", int.class);
1171 TOP = l.findVarHandle(WorkQueue.class, "top", int.class);
1172 } catch (ReflectiveOperationException e) {
1173 throw new ExceptionInInitializerError(e);
1174 }
1175 }
1176 }
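`tryLockPhase` and `releasePhaseLock` use the `PHASE` VarHandle as a minimal spin lock: a CAS from 0 to 1 acquires it, and a release-mode store of 0 releases it. The standalone sketch below uses hypothetical class and method names with the same pattern; `MethodHandles.Lookup.findVarHandle`, `VarHandle.compareAndSet`, and `VarHandle.setRelease` are standard `java.lang.invoke` API since Java 9.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

final class PhaseLockDemo {
    volatile int phase;                          // 0: unlocked, 1: locked

    private static final VarHandle PHASE;
    static {
        try {
            PHASE = MethodHandles.lookup()
                .findVarHandle(PhaseLockDemo.class, "phase", int.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    // Acquire: succeeds only if currently unlocked.
    boolean tryLock() {
        return PHASE.compareAndSet(this, 0, 1);
    }

    // Release: a release-mode store publishes writes made while locked.
    void unlock() {
        PHASE.setRelease(this, 0);
    }
}
```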
1177
1178 // static fields (initialized in static initializer below)
1179
1180 /**
1181 * Creates a new ForkJoinWorkerThread. This factory is used unless
1182 * overridden in ForkJoinPool constructors.
1183 */
1184 public static final ForkJoinWorkerThreadFactory
1185 defaultForkJoinWorkerThreadFactory;
1186
1187 /**
1188 * Permission required for callers of methods that may start or
1189 * kill threads.
1190 */
1191 static final RuntimePermission modifyThreadPermission;
1192
1193 /**
1350 (TC_MASK & (c + TC_UNIT)));
1351 if (ctl == c && CTL.compareAndSet(this, c, nc)) {
1352 createWorker();
1353 break;
1354 }
1355 } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
1356 }
1357
1358 /**
1359 * Callback from ForkJoinWorkerThread constructor to establish and
1360 * record its WorkQueue.
1361 *
1362 * @param wt the worker thread
1363 * @return the worker's queue
1364 */
1365 final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
1366 UncaughtExceptionHandler handler;
1367 wt.setDaemon(true); // configure thread
1368 if ((handler = ueh) != null)
1369 wt.setUncaughtExceptionHandler(handler);
1370 int tid = 0; // for thread name
1371 int idbits = mode & FIFO;
1372 String prefix = workerNamePrefix;
1373 WorkQueue w = new WorkQueue(this, wt);
1374 if (prefix != null) {
1375 synchronized (prefix) {
1376 WorkQueue[] ws = workQueues; int n;
1377 int s = indexSeed += SEED_INCREMENT;
1378 idbits |= (s & ~(SMASK | FIFO | DORMANT));
1379 if (ws != null && (n = ws.length) > 1) {
1380 int m = n - 1;
1381 tid = m & ((s << 1) | 1); // odd-numbered indices
1382 for (int probes = n >>> 1;;) { // find empty slot
1383 WorkQueue q;
1384 if ((q = ws[tid]) == null || q.phase == QUIET)
1385 break;
1386 else if (--probes == 0) {
1387 tid = n | 1; // resize below
1388 break;
1389 }
1390 else
1391 tid = (tid + 2) & m;
1392 }
1393 w.phase = w.id = tid | idbits; // now publishable
1394
1395 if (tid < n)
1396 ws[tid] = w;
1397 else { // expand array
1398 int an = n << 1;
1399 WorkQueue[] as = new WorkQueue[an];
1400 as[tid] = w;
1401 int am = an - 1;
1402 for (int j = 0; j < n; ++j) {
1403 WorkQueue v; // copy external queue
1404 if ((v = ws[j]) != null) // position may change
1405 as[v.id & am & SQMASK] = v;
1406 if (++j >= n)
1407 break;
1408 as[j] = ws[j]; // copy worker
1409 }
1410 workQueues = as;
1411 }
1412 }
1413 }
1414 wt.setName(prefix.concat(Integer.toString(tid)));
1415 }
1416 return w;
1417 }
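`registerWorker` derives an odd starting index from the seed (`m & ((s << 1) | 1)`) and probes in steps of two, so workers only occupy odd slots and even slots stay available for submission queues. After `n >>> 1` failed probes it falls back to index `n | 1` in a doubled array. The hypothetical `OddSlotDemo` below sketches only the probe sequence and treats a slot as free when it is `null`, ignoring the `QUIET` reuse case.

```java
final class OddSlotDemo {
    // Returns the first free odd slot reached from the seed, or n | 1 if none.
    static int findOddSlot(Object[] ws, int seed) {
        int n = ws.length, m = n - 1;            // n is a power of two, > 1
        int tid = m & ((seed << 1) | 1);         // always odd
        for (int probes = n >>> 1;;) {
            if (ws[tid] == null)
                return tid;
            else if (--probes == 0)
                return n | 1;                    // caller would double the array
            else
                tid = (tid + 2) & m;
        }
    }
}
```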
1418
1419 /**
1420 * Final callback from terminating worker, as well as upon failure
1421 * to construct or start a worker. Removes record of worker from
1422 * array, and adjusts counts. If pool is shutting down, tries to
1423 * complete termination.
1424 *
1425 * @param wt the worker thread, or null if construction failed
1426 * @param ex the exception causing failure, or null if none
1427 */
1428 final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
1429 WorkQueue w = null;
1430 int phase = 0;
1431 if (wt != null && (w = wt.workQueue) != null) {
1432 Object lock = workerNamePrefix;
1433 int wid = w.id;
1434 long ns = (long)w.nsteals & 0xffffffffL;
1435 if (lock != null) {
1436 synchronized (lock) {
1437 WorkQueue[] ws; int n, i; // remove index from array
1438 if ((ws = workQueues) != null && (n = ws.length) > 0 &&
1439 ws[i = wid & (n - 1)] == w)
1440 ws[i] = null;
1441 stealCount += ns;
1442 }
1443 }
1444 phase = w.phase;
1445 }
1446 if (phase != QUIET) { // else pre-adjusted
1447 long c; // decrement counts
1448 do {} while (!CTL.weakCompareAndSet
1449 (this, c = ctl, ((RC_MASK & (c - RC_UNIT)) |
1450 (TC_MASK & (c - TC_UNIT)) |
1451 (SP_MASK & c))));
1452 }
1453 if (w != null)
1454 w.cancelAll(); // cancel remaining tasks
1455
1456 if (!tryTerminate(false, false) && // possibly replace worker
1457 w != null && w.array != null) // avoid repeated failures
1458 signalWork();
1459
1460 if (ex == null) // help clean on way out
1472 if ((c = ctl) >= 0L) // enough workers
1473 break;
1474 else if ((sp = (int)c) == 0) { // no idle workers
1475 if ((c & ADD_WORKER) != 0L) // too few workers
1476 tryAddWorker(c);
1477 break;
1478 }
1479 else if ((ws = workQueues) == null)
1480 break; // unstarted/terminated
1481 else if (ws.length <= (i = sp & SMASK))
1482 break; // terminated
1483 else if ((v = ws[i]) == null)
1484 break; // terminating
1485 else {
1486 int np = sp & ~UNSIGNALLED;
1487 int vp = v.phase;
1488 long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + RC_UNIT));
1489 Thread vt = v.owner;
1490 if (sp == vp && CTL.compareAndSet(this, c, nc)) {
1491 v.phase = np;
1492 if (vt != null && v.source < 0)
1493 LockSupport.unpark(vt);
1494 break;
1495 }
1496 }
1497 }
1498 }
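The `ctl` arithmetic in `deregisterWorker`, `signalWork`, and `tryCompensate` adjusts one 16-bit count while leaving the other fields intact. The sketch below assumes the `ctl` layout defined elsewhere in this file but not shown in this excerpt: release count RC in bits 48-63, total count TC in bits 32-47, and the idle-stack top SP in the low 32 bits, with RC and TC stored as signed offsets from the target parallelism. The class is hypothetical and reproduces only the decrement used by `deregisterWorker`.

```java
final class CtlDemo {
    // Assumed layout, mirroring constants defined elsewhere in ForkJoinPool.
    static final long SP_MASK = 0xffffffffL;
    static final int  RC_SHIFT = 48;
    static final long RC_UNIT = 1L << RC_SHIFT;
    static final long RC_MASK = 0xffffL << RC_SHIFT;
    static final int  TC_SHIFT = 32;
    static final long TC_UNIT = 1L << TC_SHIFT;
    static final long TC_MASK = 0xffffL << TC_SHIFT;

    static long pack(int rc, int tc, int sp) {
        return (((long) rc << RC_SHIFT) & RC_MASK) |
               (((long) tc << TC_SHIFT) & TC_MASK) |
               ((long) sp & SP_MASK);
    }

    static int rc(long c) { return (int) (c >> RC_SHIFT); }    // sign-extended
    static int tc(long c) { return (short) (c >>> TC_SHIFT); }
    static int sp(long c) { return (int) c; }

    // Decrements RC and TC; masking keeps any borrow out of neighboring fields.
    static long decrementCounts(long c) {
        return (RC_MASK & (c - RC_UNIT)) |
               (TC_MASK & (c - TC_UNIT)) |
               (SP_MASK & c);
    }
}
```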
1499
1500 /**
1501 * Tries to decrement counts (sometimes implicitly) and possibly
1502 * arrange for a compensating worker in preparation for blocking.
1503 * If not all core workers yet exist, creates one; else if any are
1504 * unreleased (possibly including the caller), releases one; else if
1505 * fewer than the minimum allowed number of workers are running,
1506 * checks that they are all active and, if so, creates an
1507 * extra worker unless over the maximum limit and the policy is to
1508 * saturate. Most of these steps can fail due to interference, in
1509 * which case 0 is returned so the caller will retry. A negative
1510 * return value indicates that the caller doesn't need to
1511 * re-adjust counts when later unblocked.
1512 *
1513 * @return 1: block then adjust, -1: block without adjust, 0: retry
1514 */
1515 private int tryCompensate(WorkQueue w) {
1516 int t, n, sp;
1517 long c = ctl;
1518 WorkQueue[] ws = workQueues;
1519 if ((t = (short)(c >>> TC_SHIFT)) >= 0) {
1520 if (ws == null || (n = ws.length) <= 0 || w == null)
1521 return 0; // disabled
1522 else if ((sp = (int)c) != 0) { // replace or release
1523 WorkQueue v = ws[sp & (n - 1)];
1524 int wp = w.phase;
1525 long uc = UC_MASK & ((wp < 0) ? c + RC_UNIT : c);
1526 int np = sp & ~UNSIGNALLED;
1527 if (v != null) {
1528 int vp = v.phase;
1529 Thread vt = v.owner;
1530 long nc = ((long)v.stackPred & SP_MASK) | uc;
1531 if (vp == sp && CTL.compareAndSet(this, c, nc)) {
1532 v.phase = np;
1533 if (vt != null && v.source < 0)
1534 LockSupport.unpark(vt);
1535 return (wp < 0) ? -1 : 1;
1536 }
1537 }
1538 return 0;
1539 }
1540 else if ((int)(c >> RC_SHIFT) - // reduce parallelism
1541 (short)(bounds & SMASK) > 0) {
1542 long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c));
1543 return CTL.compareAndSet(this, c, nc) ? 1 : 0;
1544 }
1545 else { // validate
1546 int md = mode, pc = md & SMASK, tc = pc + t, bc = 0;
1547 boolean unstable = false;
1548 for (int i = 1; i < n; i += 2) {
1549 WorkQueue q; Thread wt; Thread.State ts;
1550 if ((q = ws[i]) != null) {
1551 if (q.source == 0) {
1552 unstable = true;
1553 break;
1570 else if (bc < pc) { // lagging
1571 Thread.yield(); // for retry spins
1572 return 0;
1573 }
1574 else
1575 throw new RejectedExecutionException(
1576 "Thread limit exceeded replacing blocked worker");
1577 }
1578 }
1579 }
1580
1581 long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); // expand pool
1582 return CTL.compareAndSet(this, c, nc) && createWorker() ? 1 : 0;
1583 }
1584
1585 /**
1586 * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1587 * See above for explanation.
1588 */
1589 final void runWorker(WorkQueue w) {
1590 int r = (w.id ^ ThreadLocalRandom.nextSecondarySeed()) | FIFO; // rng
1591 w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY]; // initialize
1592 for (;;) {
1593 int phase;
1594 if (scan(w, r)) { // scan until apparently empty
1595 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // move (xorshift)
1596 }
1597 else if ((phase = w.phase) >= 0) { // enqueue, then rescan
1598 long np = (w.phase = (phase + SS_SEQ) | UNSIGNALLED) & SP_MASK;
1599 long c, nc;
1600 do {
1601 w.stackPred = (int)(c = ctl);
1602 nc = ((c - RC_UNIT) & UC_MASK) | np;
1603 } while (!CTL.weakCompareAndSet(this, c, nc));
1604 }
1605 else { // already queued
1606 int pred = w.stackPred;
1607 Thread.interrupted(); // clear before park
1608 w.source = DORMANT; // enable signal
1609 long c = ctl;
1610 int md = mode, rc = (md & SMASK) + (int)(c >> RC_SHIFT);
1611 if (md < 0) // terminating
1612 break;
1613 else if (rc <= 0 && (md & SHUTDOWN) != 0 &&
1614 tryTerminate(false, false))
1615 break; // quiescent shutdown
1616 else if (rc <= 0 && pred != 0 && phase == (int)c) {
1617 long nc = (UC_MASK & (c - TC_UNIT)) | (SP_MASK & pred);
1618 long d = keepAlive + System.currentTimeMillis();
1619 LockSupport.parkUntil(this, d);
1620 if (ctl == c && // drop on timeout if all idle
1621 d - System.currentTimeMillis() <= TIMEOUT_SLOP &&
1622 CTL.compareAndSet(this, c, nc)) {
1623 w.phase = QUIET;
1624 break;
1625 }
1626 }
1627 else if (w.phase < 0)
1628 LockSupport.park(this); // OK if spuriously woken
1629 w.source = 0; // disable signal
1630 }
1631 }
1632 }
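`runWorker` advances its scan origin with Marsaglia's 32-bit xorshift (shifts 13, 17, 5). Each step is an invertible linear map on 32-bit words, so a nonzero state never becomes zero; since `r` is seeded with the `FIFO` bit set, the sequence cannot get stuck at 0. A standalone version, in a hypothetical `XorShiftDemo` class:

```java
final class XorShiftDemo {
    // One step of the 32-bit xorshift used by runWorker.
    static int next(int r) {
        r ^= r << 13;
        r ^= r >>> 17;
        r ^= r << 5;
        return r;
    }
}
```

Starting from 1, the first step yields 270369.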
1633
1634 /**
1635 * Scans for and, if found, executes one or more top-level tasks from a queue.
1636 *
1637 * @return true if found an apparently non-empty queue, and
1638 * possibly ran task(s).
1639 */
1640 private boolean scan(WorkQueue w, int r) {
1641 WorkQueue[] ws; int n;
1642 if ((ws = workQueues) != null && (n = ws.length) > 0 && w != null) {
1643 for (int m = n - 1, j = r & m;;) {
1644 WorkQueue q; int b;
1645 if ((q = ws[j]) != null && q.top != (b = q.base)) {
1646 int qid = q.id;
1647 ForkJoinTask<?>[] a; int cap, k; ForkJoinTask<?> t;
1648 if ((a = q.array) != null && (cap = a.length) > 0) {
1649 t = (ForkJoinTask<?>)QA.getAcquire(a, k = (cap - 1) & b);
1650 if (q.base == b++ && t != null &&
1651 QA.compareAndSet(a, k, t, null)) {
1652 q.base = b;
1653 w.source = qid;
1654 if (q.top - b > 0)
1655 signalWork();
1656 w.topLevelExec(t, q, // random fairness bound
1657 r & ((n << TOP_BOUND_SHIFT) - 1));
1658 }
1659 }
1660 return true;
1661 }
1662 else if (--n > 0)
1663 j = (j + 1) & m;
1664 else
1665 break;
1666 }
1667 }
1668 return false;
1669 }
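`scan` starts at `r & m` and advances one slot at a time with wraparound, counting `n` down so each slot is examined at most once per call. The hypothetical `ScanOrderDemo` below reproduces the visit order of a full, unsuccessful scan; it is a sketch of the loop structure only.

```java
import java.util.ArrayList;
import java.util.List;

final class ScanOrderDemo {
    // Slots examined by a full, unsuccessful scan over a power-of-two table.
    static List<Integer> visitOrder(int n, int r) {
        List<Integer> order = new ArrayList<>();
        for (int m = n - 1, j = r & m;;) {
            order.add(j);
            if (--n > 0)
                j = (j + 1) & m;
            else
                break;
        }
        return order;
    }
}
```

For a table of size 8 and `r = 13`, the order is 5, 6, 7, 0, 1, 2, 3, 4.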
1670
1671 /**
1672 * Helps and/or blocks until the given task is done or times out.
1673 * First tries locally helping, then scans other queues for a task
1674 * produced by one of w's stealers; compensating and blocking if
1675 * none are found (rescanning if tryCompensate fails).
1676 *
1677 * @param w caller
1678 * @param task the task
1679 * @param deadline for timed waits, if nonzero
1680 * @return task status on exit
1681 */
1682 final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
1683 int s = 0;
1684 int seed = ThreadLocalRandom.nextSecondarySeed();
1685 if (w != null && task != null &&
1686 (!(task instanceof CountedCompleter) ||
1687 (s = w.helpCC((CountedCompleter<?>)task, 0, false)) >= 0)) {
1688 w.tryRemoveAndExec(task);
1689 int src = w.source, id = w.id;
1690 int r = (seed >>> 16) | 1, step = (seed & ~1) | 2;
1691 s = task.status;
1692 while (s >= 0) {
1693 WorkQueue[] ws;
1694 int n = (ws = workQueues) == null ? 0 : ws.length, m = n - 1;
1695 while (n > 0) {
1696 WorkQueue q; int b;
1697 if ((q = ws[r & m]) != null && q.source == id &&
1698 q.top != (b = q.base)) {
1699 ForkJoinTask<?>[] a; int cap, k;
1700 int qid = q.id;
1701 if ((a = q.array) != null && (cap = a.length) > 0) {
1702 ForkJoinTask<?> t = (ForkJoinTask<?>)
1703 QA.getAcquire(a, k = (cap - 1) & b);
1704 if (q.source == id && q.base == b++ &&
1705 t != null && QA.compareAndSet(a, k, t, null)) {
1706 q.base = b;
1707 w.source = qid;
1708 t.doExec();
1709 w.source = src;
1710 }
1711 }
1712 break;
1713 }
1714 else {
1715 r += step;
1716 --n;
1717 }
1718 }
1719 if ((s = task.status) < 0)
1720 break;
1721 else if (n == 0) { // empty scan
1722 long ms, ns; int block;
1723 if (deadline == 0L)
1724 ms = 0L; // untimed
1725 else if ((ns = deadline - System.nanoTime()) <= 0L)
1726 break; // timeout
1727 else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
1728 ms = 1L; // avoid 0 for timed wait
1729 if ((block = tryCompensate(w)) != 0) {
1730 task.internalWait(ms);
1731 CTL.getAndAdd(this, (block > 0) ? RC_UNIT : 0L);
1732 }
1733 s = task.status;
1734 }
1735 }
1736 }
1737 return s;
1738 }
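For timed joins, `awaitJoin` converts the remaining nanoseconds to milliseconds for `internalWait` and rounds a positive remainder below one millisecond up to 1, because 0 there means an untimed wait. The hypothetical `WaitMillisDemo` below sketches that conversion; it takes the current time as a parameter for testability and returns -1 where the original code breaks out of its loop on timeout.

```java
import java.util.concurrent.TimeUnit;

final class WaitMillisDemo {
    // Returns the wait time in ms: 0 = untimed, -1 = deadline already passed.
    static long waitMillis(long deadline, long nowNanos) {
        if (deadline == 0L)
            return 0L;                                   // untimed
        long ns = deadline - nowNanos;
        if (ns <= 0L)
            return -1L;                                  // timed out
        long ms = TimeUnit.NANOSECONDS.toMillis(ns);
        return (ms <= 0L) ? 1L : ms;                     // never 0 for a timed wait
    }
}
```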
1739
1740 /**
1741 * Runs tasks until {@code isQuiescent()}. Rather than blocking
1742 * when tasks cannot be found, rescans until all others cannot
1743 * find tasks either.
1744 */
1745 final void helpQuiescePool(WorkQueue w) {
1746 int prevSrc = w.source;
1747 int seed = ThreadLocalRandom.nextSecondarySeed();
1748 int r = seed >>> 16, step = r | 1;
1749 for (int source = prevSrc, released = -1;;) { // -1 until known
1750 ForkJoinTask<?> localTask; WorkQueue[] ws;
1751 while ((localTask = w.nextLocalTask()) != null)
1752 localTask.doExec();
1753 if (w.phase >= 0 && released == -1)
1754 released = 1;
1755 boolean quiet = true, empty = true;
1756 int n = (ws = workQueues) == null ? 0 : ws.length;
1757 for (int m = n - 1; n > 0; r += step, --n) {
1758 WorkQueue q; int b;
1759 if ((q = ws[r & m]) != null) {
1760 int qs = q.source;
1761 if (q.top != (b = q.base)) {
1762 quiet = empty = false;
1763 ForkJoinTask<?>[] a; int cap, k;
1764 int qid = q.id;
1765 if ((a = q.array) != null && (cap = a.length) > 0) {
1766 if (released == 0) { // increment
1767 released = 1;
1768 CTL.getAndAdd(this, RC_UNIT);
1769 }
1770 ForkJoinTask<?> t = (ForkJoinTask<?>)
1771 QA.getAcquire(a, k = (cap - 1) & b);
1772 if (q.base == b++ && t != null &&
1773 QA.compareAndSet(a, k, t, null)) {
1774 q.base = b;
1775 w.source = qid;
1776 t.doExec();
1777 w.source = source = prevSrc;
1778 }
1779 }
1780 break;
1781 }
1782 else if ((qs & QUIET) == 0)
1783 quiet = false;
1784 }
1785 }
1786 if (quiet) {
1787 if (released == 0)
1788 CTL.getAndAdd(this, RC_UNIT);
1789 w.source = prevSrc;
1790 break;
1791 }
1792 else if (empty) {
1793 if (source != QUIET)
1794 w.source = source = QUIET;
1795 if (released == 1) { // decrement
1796 released = 0;
1797 CTL.getAndAdd(this, RC_MASK & -RC_UNIT);
1798 }
1799 }
1800 }
1801 }
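`helpQuiescePool` steps through the queue table with an odd stride (`r | 1`). An odd number is coprime to the power-of-two table length `n`, so `n` such steps visit every slot exactly once. `awaitJoin`, and `pollScan` in submissions-only mode, instead use a stride of the form `(x & ~1) | 2`, which is twice an odd number; from a start of fixed parity it visits every slot of that parity in `n / 2` steps. The hypothetical `StrideDemo` below checks both properties, and shows that a stride divisible by 4 would miss slots.

```java
import java.util.BitSet;

final class StrideDemo {
    // Counts distinct slots hit by `steps` strides over a power-of-two table.
    static int distinctSlots(int n, int origin, int step, int steps) {
        BitSet seen = new BitSet(n);
        int m = n - 1;
        for (int i = 0, r = origin; i < steps; ++i, r += step)
            seen.set(r & m);
        return seen.cardinality();
    }

    // True if every slot reached has the same parity as origin.
    static boolean sameParity(int n, int origin, int step, int steps) {
        int m = n - 1;
        for (int i = 0, r = origin; i < steps; ++i, r += step) {
            if (((r & m) & 1) != (origin & 1))
                return false;
        }
        return true;
    }
}
```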
1802
1803 /**
1804 * Scans for and returns a polled task, if available.
1805 * Used only for untracked polls.
1806 *
1807 * @param submissionsOnly if true, only scan submission queues
1808 */
1809 private ForkJoinTask<?> pollScan(boolean submissionsOnly) {
1810 WorkQueue[] ws; int n;
1811 rescan: while ((mode & STOP) == 0 && (ws = workQueues) != null &&
1812 (n = ws.length) > 0) {
1813 int m = n - 1;
1814 int r = ThreadLocalRandom.nextSecondarySeed();
1815 int h = r >>> 16;
1816 int origin, step;
1817 if (submissionsOnly) {
1818 origin = (r & ~1) & m; // even indices and steps
1819 step = (h & ~1) | 2;
1820 }
1821 else {
1822 origin = r & m;
1823 step = h | 1;
1824 }
1825 boolean nonempty = false;
1826 for (int i = origin, oldSum = 0, checkSum = 0;;) {
1827 WorkQueue q;
1828 if ((q = ws[i]) != null) {
1829 int b; ForkJoinTask<?> t;
1830 if (q.top - (b = q.base) > 0) {
1831 nonempty = true;
1832 if ((t = q.poll()) != null)
1833 return t;
1834 }
1835 else
1836 checkSum += b + q.id;
1837 }
1838 if ((i = (i + step) & m) == origin) {
1839 if (!nonempty && oldSum == (oldSum = checkSum))
1840 break rescan;
1841 checkSum = 0;
1842 nonempty = false;
1843 }
1844 }
1845 }
1846 return null;
1847 }
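`pollScan` and `tryTerminate` detect a stable sweep with `oldSum == (oldSum = checkSum)`. Java evaluates the left operand of `==` first, so this single expression compares the previous checksum with the new one and then stores the new one. The hypothetical `StableSumDemo` below isolates that idiom; it ignores the emptiness checks that accompany it in `pollScan`.

```java
final class StableSumDemo {
    // Returns how many sweeps it takes until two consecutive sums are equal,
    // or -1 if the sequence never stabilizes. oldSum starts at 0, as in pollScan.
    static int sweepsUntilStable(int[] sums) {
        int oldSum = 0;
        for (int i = 0; i < sums.length; ++i) {
            int checkSum = sums[i];
            if (oldSum == (oldSum = checkSum))   // compare old value, then update
                return i + 1;
        }
        return -1;
    }
}
```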
1848
1849 /**
1850 * Gets and removes a local or stolen task for the given worker.
1851 *
1852 * @return a task, if available
1853 */
1854 final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
1855 ForkJoinTask<?> t;
1856 if (w == null || (t = w.nextLocalTask()) == null)
1857 t = pollScan(false);
1858 return t;
1859 }
1860
1861 // External operations
1862
1863 /**
1864 * Adds the given task to the submitter's current submission queue,
1865 * creating the queue if absent, or rehashing to another if contended.
1866 *
1867 * @param task the task. Caller must ensure non-null.
1868 */
1869 final void externalPush(ForkJoinTask<?> task) {
1870 int r; // initialize caller's probe
1871 if ((r = ThreadLocalRandom.getProbe()) == 0) {
1872 ThreadLocalRandom.localInit();
1873 r = ThreadLocalRandom.getProbe();
1874 }
1875 for (;;) {
1876 WorkQueue q;
1877 int md = mode, n;
1878 WorkQueue[] ws = workQueues;
1879 if ((md & SHUTDOWN) != 0 || ws == null || (n = ws.length) <= 0)
1880 throw new RejectedExecutionException();
1881 else if ((q = ws[(n - 1) & r & SQMASK]) == null) { // add queue
1882 int qid = (r | QUIET) & ~(FIFO | OWNED);
1883 Object lock = workerNamePrefix;
1884 ForkJoinTask<?>[] qa =
1885 new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
1886 q = new WorkQueue(this, null);
1887 q.array = qa;
1888 q.id = qid;
1889 q.source = QUIET;
1890 if (lock != null) { // unless disabled, lock pool to install
1891 synchronized (lock) {
1892 WorkQueue[] vs; int i, vn;
1893 if ((vs = workQueues) != null && (vn = vs.length) > 0 &&
1894 vs[i = qid & (vn - 1) & SQMASK] == null)
1895 vs[i] = q; // else another thread already installed
1896 }
1897 }
1898 }
1899 else if (!q.tryLockPhase()) // move if busy
1900 r = ThreadLocalRandom.advanceProbe(r);
1901 else {
1902 if (q.lockedPush(task))
1903 signalWork();
1904 return;
1905 }
1906 }
1907 }
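An external submitter picks slot `(n - 1) & r & SQMASK`. Because `SQMASK` (0x7e) has its low bit clear, the result is always even and at most 126. Submission queues therefore use only even slots (at most 64 of them) and never collide with the odd slots that `registerWorker` assigns to workers. A sketch of the selection, in a hypothetical `SubmitSlotDemo` class:

```java
final class SubmitSlotDemo {
    static final int SQMASK = 0x007e;            // value copied from above

    // Slot chosen for an external submitter with probe r in a table of size n.
    static int submissionSlot(int n, int r) {
        return (n - 1) & r & SQMASK;
    }
}
```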
1908
1909 /**
1910 * Pushes a possibly-external submission.
1911 */
1912 private <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) {
1913 Thread t; ForkJoinWorkerThread w; WorkQueue q;
1914 if (task == null)
1915 throw new NullPointerException();
1916 if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
1917 (w = (ForkJoinWorkerThread)t).pool == this &&
1918 (q = w.workQueue) != null)
1919 q.push(task);
1920 else
1921 externalPush(task);
1922 return task;
1923 }
1924
1925 /**
1926 * Returns common pool queue for an external thread.
1927 */
1928 static WorkQueue commonSubmitterQueue() {
1929 ForkJoinPool p = common;
1930 int r = ThreadLocalRandom.getProbe();
1931 WorkQueue[] ws; int n;
1932 return (p != null && (ws = p.workQueues) != null &&
1933 (n = ws.length) > 0) ?
1934 ws[(n - 1) & r & SQMASK] : null;
1935 }
1936
1937 /**
1938 * Performs tryUnpush for an external submitter.
1939 */
1940 final boolean tryExternalUnpush(ForkJoinTask<?> task) {
1941 int r = ThreadLocalRandom.getProbe();
1942 WorkQueue[] ws; WorkQueue w; int n;
1943 return ((ws = workQueues) != null &&
1944 (n = ws.length) > 0 &&
1945 (w = ws[(n - 1) & r & SQMASK]) != null &&
1946 w.tryLockedUnpush(task));
1947 }
1948
1949 /**
1950 * Performs helpComplete for an external submitter.
1951 */
1952 final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) {
1953 int r = ThreadLocalRandom.getProbe();
1954 WorkQueue[] ws; WorkQueue w; int n;
1955 return ((ws = workQueues) != null && (n = ws.length) > 0 &&
1956 (w = ws[(n - 1) & r & SQMASK]) != null) ?
1957 w.helpCC(task, maxTasks, true) : 0;
1958 }
1959
1960 /**
1961 * Tries to steal and run tasks within the target's computation.
1962 * The maxTasks argument supports external usages; internal calls
1963 * use zero, allowing unbounded steps (external calls trap
1964 * non-positive values).
1965 *
1966 * @param w caller
1967 * @param maxTasks if non-zero, the maximum number of other tasks to run
1968 * @return task status on exit
1969 */
1970 final int helpComplete(WorkQueue w, CountedCompleter<?> task,
1971 int maxTasks) {
1972 return (w == null) ? 0 : w.helpCC(task, maxTasks, false);
1973 }
1974
1975 /**
1976 * Returns a cheap heuristic guide for task partitioning when
1977 * programmers, frameworks, tools, or languages have little or no
1978 * idea about task granularity. In essence, by offering this
1979 * method, we ask users only about tradeoffs in overhead vs
1980 * expected throughput and its variance, rather than how finely to
1981 * partition tasks.
1982 *
1983 * In a steady-state strict (tree-structured) computation, each
1984 * thread makes available for stealing enough tasks for other
1985 * threads to remain active. Inductively, if all threads play by
1986 * the same rules, each thread should make available only a
1987 * constant number of tasks.
1988 *
1989 * The minimum useful constant is just 1. But using a value of 1
1990 * would require immediate replenishment upon each steal to
1991 * maintain enough tasks, which is infeasible. Further,
1992 * partitionings/granularities of offered tasks should minimize
2043 */
2044 private boolean tryTerminate(boolean now, boolean enable) {
2045 int md; // 3 phases: try to set SHUTDOWN, then STOP, then TERMINATED
2046
2047 while (((md = mode) & SHUTDOWN) == 0) {
2048 if (!enable || this == common) // cannot shutdown
2049 return false;
2050 else
2051 MODE.compareAndSet(this, md, md | SHUTDOWN);
2052 }
2053
2054 while (((md = mode) & STOP) == 0) { // try to initiate termination
2055 if (!now) { // check if quiescent & empty
2056 for (long oldSum = 0L;;) { // repeat until stable
2057 boolean running = false;
2058 long checkSum = ctl;
2059 WorkQueue[] ws = workQueues;
2060 if ((md & SMASK) + (int)(checkSum >> RC_SHIFT) > 0)
2061 running = true;
2062 else if (ws != null) {
2063 WorkQueue w;
2064 for (int i = 0; i < ws.length; ++i) {
2065 if ((w = ws[i]) != null) {
2066 int s = w.source, p = w.phase;
2067 int d = w.id, b = w.base;
2068 if (b != w.top ||
2069 ((d & 1) == 1 && (s >= 0 || p >= 0))) {
2070 running = true;
2071 break; // working, scanning, or have work
2072 }
2073 checkSum += (((long)s << 48) + ((long)p << 32) +
2074 ((long)b << 16) + (long)d);
2075 }
2076 }
2077 }
2078 if (((md = mode) & STOP) != 0)
2079 break; // already triggered
2080 else if (running)
2081 return false;
2082 else if (workQueues == ws && oldSum == (oldSum = checkSum))
2083 break;
2084 }
2085 }
2086 if ((md & STOP) == 0)
2087 MODE.compareAndSet(this, md, md | STOP);
2088 }
2089
2090 while (((md = mode) & TERMINATED) == 0) { // help terminate others
2091 for (long oldSum = 0L;;) { // repeat until stable
2092 WorkQueue[] ws; WorkQueue w;
2093 long checkSum = ctl;
2094 if ((ws = workQueues) != null) {
2095 for (int i = 0; i < ws.length; ++i) {
2096 if ((w = ws[i]) != null) {
2097 ForkJoinWorkerThread wt = w.owner;
2098 w.cancelAll(); // clear queues
2099 if (wt != null) {
2100 try { // unblock join or park
2101 wt.interrupt();
2102 } catch (Throwable ignore) {
2103 }
2104 }
2105 checkSum += ((long)w.phase << 32) + w.base;
2106 }
2107 }
2108 }
2109 if (((md = mode) & TERMINATED) != 0 ||
2110 (workQueues == ws && oldSum == (oldSum = checkSum)))
2111 break;
2112 }
2113 if ((md & TERMINATED) != 0)
2114 break;
2115 else if ((md & SMASK) + (short)(ctl >>> TC_SHIFT) > 0)
2116 break;
2117 else if (MODE.compareAndSet(this, md, md | TERMINATED)) {
2118 synchronized (this) {
2119 notifyAll(); // for awaitTermination
2120 }
2121 break;
2122 }
2123 }
2124 return true;
2125 }
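/*
 * Illustrative sketch, not part of ForkJoinPool: two patterns tryTerminate
 * relies on. Phase bits (SHUTDOWN, then STOP, then TERMINATED) are only
 * ever set, each by a CAS retry loop that preserves the other bits, and
 * scans are repeated until two consecutive passes yield the same checksum.
 * PhaseBits and its bit values are hypothetical, an AtomicInteger stands
 * in for the MODE VarHandle, and the check that workQueues is unchanged
 * between passes is omitted.
 *
```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

// Hypothetical model; bit values are illustrative only.
class PhaseBits {
    static final int SHUTDOWN = 1, STOP = 2, TERMINATED = 4;
    final AtomicInteger mode = new AtomicInteger();

    // Sets 'bit' unless already set; true only for the caller whose CAS set it.
    boolean advance(int bit) {
        for (int md;;) {
            if (((md = mode.get()) & bit) != 0)
                return false;                 // another thread got there first
            if (mode.compareAndSet(md, md | bit))
                return true;                  // other bits are preserved
        }
    }

    // Re-runs 'scan' until two consecutive passes agree; like the original
    // loops, oldSum starts at 0, so a first pass returning 0 ends at once.
    static long stableSum(LongSupplier scan) {
        for (long oldSum = 0L;;) {
            long checkSum = scan.getAsLong();
            if (oldSum == checkSum)
                return checkSum;
            oldSum = checkSum;
        }
    }
}
```
 */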
2578
2579 /**
2580 * Returns {@code true} if this pool uses local first-in-first-out
2581 * scheduling mode for forked tasks that are never joined.
2582 *
2583 * @return {@code true} if this pool uses async mode
2584 */
2585 public boolean getAsyncMode() {
2586 return (mode & FIFO) != 0;
2587 }
2588
2589 /**
2590 * Returns an estimate of the number of worker threads that are
2591 * not blocked waiting to join tasks or for other managed
2592 * synchronization. This method may overestimate the
2593 * number of running threads.
2594 *
2595 * @return the number of worker threads
2596 */
2597 public int getRunningThreadCount() {
2598 WorkQueue[] ws; WorkQueue w;
2599 VarHandle.acquireFence();
2600 int rc = 0;
2601 if ((ws = workQueues) != null) {
2602 for (int i = 1; i < ws.length; i += 2) {
2603 if ((w = ws[i]) != null && w.isApparentlyUnblocked())
2604 ++rc;
2605 }
2606 }
2607 return rc;
2608 }
2609
2610 /**
2611 * Returns an estimate of the number of threads that are currently
2612 * stealing or executing tasks. This method may overestimate the
2613 * number of active threads.
2614 *
2615 * @return the number of active threads
2616 */
2617 public int getActiveThreadCount() {
2618 int r = (mode & SMASK) + (int)(ctl >> RC_SHIFT);
2619 return (r <= 0) ? 0 : r; // suppress momentarily negative values
2620 }
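/*
 * Illustrative sketch, not part of ForkJoinPool: getActiveThreadCount,
 * isQuiescent and toString recover counts by adding the parallelism level
 * (mode & SMASK) to signed 16-bit fields of ctl. The top field is read
 * with an arithmetic shift (>>), which sign-extends; the next field is
 * read with >>> and a (short) cast, which also sign-extends. The model
 * assumes RC_SHIFT = 48 and TC_SHIFT = 32 and leaves the low 32 bits zero;
 * CtlDemo is a hypothetical name.
 *
```java
// Hypothetical model of the ctl count fields; shift values are assumptions.
class CtlDemo {
    static final int RC_SHIFT = 48, TC_SHIFT = 32;
    static final long RC_MASK = 0xffffL << RC_SHIFT;
    static final long TC_MASK = 0xffffL << TC_SHIFT;

    // Stores (active - parallelism) and (total - parallelism) as 16-bit fields.
    static long pack(int parallelism, int active, int total) {
        long rc = ((long) (active - parallelism) << RC_SHIFT) & RC_MASK;
        long tc = ((long) (total - parallelism) << TC_SHIFT) & TC_MASK;
        return rc | tc;
    }

    // >> keeps the sign of the top 16 bits, so negative offsets survive.
    static int active(long ctl, int parallelism) {
        return parallelism + (int) (ctl >> RC_SHIFT);
    }

    // >>> then (short) keeps only bits 32..47, sign-extended.
    static int total(long ctl, int parallelism) {
        return parallelism + (short) (ctl >>> TC_SHIFT);
    }
}
```
 *
 * Storing offsets rather than raw counts makes "fewer workers than
 * parallelism" a simple sign test on each field.
 */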
2628 * idleness of all threads, but will eventually become true if
2629 * threads remain inactive.
2630 *
2631 * @return {@code true} if all threads are currently idle
2632 */
2633 public boolean isQuiescent() {
2634 for (;;) {
2635 long c = ctl;
2636 int md = mode, pc = md & SMASK;
2637 int tc = pc + (short)(c >>> TC_SHIFT);
2638 int rc = pc + (int)(c >> RC_SHIFT);
2639 if ((md & (STOP | TERMINATED)) != 0)
2640 return true;
2641 else if (rc > 0)
2642 return false;
2643 else {
2644 WorkQueue[] ws; WorkQueue v;
2645 if ((ws = workQueues) != null) {
2646 for (int i = 1; i < ws.length; i += 2) {
2647 if ((v = ws[i]) != null) {
2648 if (v.source > 0)
2649 return false;
2650 --tc;
2651 }
2652 }
2653 }
2654 if (tc == 0 && ctl == c)
2655 return true;
2656 }
2657 }
2658 }
2659
2660 /**
2661 * Returns an estimate of the total number of tasks stolen from
2662 * one thread's work queue by another. The reported value
2663 * underestimates the actual total number of steals when the pool
2664 * is not quiescent. This value may be useful for monitoring and
2665 * tuning fork/join programs: in general, steal counts should be
2666 * high enough to keep threads busy, but low enough to avoid
2667 * overhead and contention across threads.
2668 *
2674 if ((ws = workQueues) != null) {
2675 for (int i = 1; i < ws.length; i += 2) {
2676 if ((w = ws[i]) != null)
2677 count += (long)w.nsteals & 0xffffffffL;
2678 }
2679 }
2680 return count;
2681 }
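/*
 * Illustrative sketch, not part of ForkJoinPool: getStealCount (and
 * toString) widen each queue's int nsteals with & 0xffffffffL, so a
 * per-queue counter that has wrapped past Integer.MAX_VALUE is still
 * added as an unsigned 32-bit value. UnsignedCount is hypothetical; the
 * mask is equivalent to Integer.toUnsignedLong.
 *
```java
// Hypothetical helper mirroring the unsigned widening in getStealCount.
class UnsignedCount {
    static long asUnsigned(int counter) {
        return (long) counter & 0xffffffffL; // keep low 32 bits, drop sign extension
    }

    // Sums per-queue counters into a long, as getStealCount does.
    static long total(int[] perQueue) {
        long count = 0L;
        for (int c : perQueue)
            count += asUnsigned(c);
        return count;
    }
}
```
 */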
2682
2683 /**
2684 * Returns an estimate of the total number of tasks currently held
2685 * in queues by worker threads (but not including tasks submitted
2686 * to the pool that have not begun executing). This value is only
2687 * an approximation, obtained by iterating across all threads in
2688 * the pool. This method may be useful for tuning task
2689 * granularities.
2690 *
2691 * @return the number of queued tasks
2692 */
2693 public long getQueuedTaskCount() {
2694 WorkQueue[] ws; WorkQueue w;
2695 VarHandle.acquireFence();
2696 int count = 0;
2697 if ((ws = workQueues) != null) {
2698 for (int i = 1; i < ws.length; i += 2) {
2699 if ((w = ws[i]) != null)
2700 count += w.queueSize();
2701 }
2702 }
2703 return count;
2704 }
2705
2706 /**
2707 * Returns an estimate of the number of tasks submitted to this
2708 * pool that have not yet begun executing. This method may take
2709 * time proportional to the number of submissions.
2710 *
2711 * @return the number of queued submissions
2712 */
2713 public int getQueuedSubmissionCount() {
2714 WorkQueue[] ws; WorkQueue w;
2715 VarHandle.acquireFence();
2716 int count = 0;
2717 if ((ws = workQueues) != null) {
2718 for (int i = 0; i < ws.length; i += 2) {
2719 if ((w = ws[i]) != null)
2720 count += w.queueSize();
2721 }
2722 }
2723 return count;
2724 }
2725
2726 /**
2727 * Returns {@code true} if there are any tasks submitted to this
2728 * pool that have not yet begun executing.
2729 *
2730 * @return {@code true} if there are any queued submissions
2731 */
2732 public boolean hasQueuedSubmissions() {
2733 WorkQueue[] ws; WorkQueue w;
2734 VarHandle.acquireFence();
2735 if ((ws = workQueues) != null) {
2736 for (int i = 0; i < ws.length; i += 2) {
2737 if ((w = ws[i]) != null && !w.isEmpty())
2738 return true;
2739 }
2740 }
2741 return false;
2742 }
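/*
 * Illustrative usage sketch, not part of ForkJoinPool: reading the public
 * monitoring estimates above from client code. PoolMonitorDemo is
 * hypothetical, and the pool size, task count and timeout are arbitrary.
 * It waits with awaitQuiescence, which lets the external caller help run
 * queued tasks, and then checks that no submissions remain queued.
 *
```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo of the monitoring methods.
class PoolMonitorDemo {
    // Values are only estimates while the pool is busy.
    static String snapshot(ForkJoinPool pool) {
        return "submissions=" + pool.getQueuedSubmissionCount()
            + ", tasks=" + pool.getQueuedTaskCount()
            + ", active=" + pool.getActiveThreadCount()
            + ", steals=" + pool.getStealCount();
    }

    // Runs 'count' trivial tasks; returns how many ran, or -1 if the pool
    // did not become quiescent or still reports queued submissions.
    static int runAll(int count) {
        ForkJoinPool pool = new ForkJoinPool(2);
        AtomicInteger ran = new AtomicInteger();
        try {
            for (int i = 0; i < count; i++)
                pool.execute(() -> { ran.incrementAndGet(); });
            boolean quiet = pool.awaitQuiescence(10, TimeUnit.SECONDS);
            if (!quiet || pool.hasQueuedSubmissions())
                return -1;
            return ran.get();
        } finally {
            pool.shutdown();
        }
    }
}
```
 */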
2743
2744 /**
2745 * Removes and returns the next unexecuted submission if one is
2746 * available. This method may be useful in extensions to this
2747 * class that re-assign work in systems with multiple pools.
2748 *
2749 * @return the next submission, or {@code null} if none
2750 */
2751 protected ForkJoinTask<?> pollSubmission() {
2752 return pollScan(true);
2753 }
2754
2755 /**
2756 * Removes all available unexecuted submitted and forked tasks
2757 * from scheduling queues and adds them to the given collection,
2758 * without altering their execution status. These may include
2759 * artificially generated or wrapped tasks. This method is
2760 * designed to be invoked only when the pool is known to be
2761 * quiescent. Invocations at other times may not remove all
2762 * tasks. A failure encountered while attempting to add elements
2763 * to collection {@code c} may result in elements being in
2764 * neither, either or both collections when the associated
2765 * exception is thrown. The behavior of this operation is
2766 * undefined if the specified collection is modified while the
2767 * operation is in progress.
2768 *
2769 * @param c the collection to transfer elements into
2770 * @return the number of elements transferred
2771 */
2772 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
2773 WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t;
2774 VarHandle.acquireFence();
2775 int count = 0;
2776 if ((ws = workQueues) != null) {
2777 for (int i = 0; i < ws.length; ++i) {
2778 if ((w = ws[i]) != null) {
2779 while ((t = w.poll()) != null) {
2780 c.add(t);
2781 ++count;
2782 }
2783 }
2784 }
2785 }
2786 return count;
2787 }
2788
2789 /**
2790 * Returns a string identifying this pool, as well as its state,
2791 * including indications of run state, parallelism level, and
2792 * worker and task counts.
2793 *
2794 * @return a string identifying this pool, as well as its state
2795 */
2796 public String toString() {
2797 // Use a single pass through workQueues to collect counts
2798 int md = mode; // read volatile fields first
2799 long c = ctl;
2800 long st = stealCount;
2801 long qt = 0L, qs = 0L; int rc = 0;
2802 WorkQueue[] ws; WorkQueue w;
2803 if ((ws = workQueues) != null) {
2804 for (int i = 0; i < ws.length; ++i) {
2805 if ((w = ws[i]) != null) {
2806 int size = w.queueSize();
2807 if ((i & 1) == 0)
2808 qs += size;
2809 else {
2810 qt += size;
2811 st += (long)w.nsteals & 0xffffffffL;
2812 if (w.isApparentlyUnblocked())
2813 ++rc;
2814 }
2815 }
2816 }
2817 }
2818
2819 int pc = (md & SMASK);
2820 int tc = pc + (short)(c >>> TC_SHIFT);
2821 int ac = pc + (int)(c >> RC_SHIFT);
2822 if (ac < 0) // ignore transient negative
2823 ac = 0;
2824 String level = ((md & TERMINATED) != 0 ? "Terminated" :
2825 (md & STOP) != 0 ? "Terminating" :
2826 (md & SHUTDOWN) != 0 ? "Shutting down" :
2827 "Running");
2828 return super.toString() +
2829 "[" + level +
2830 ", parallelism = " + pc +
2831 ", size = " + tc +
2832 ", active = " + ac +
2833 ", running = " + rc +
2834 ", steals = " + st +
2835 ", tasks = " + qt +
2836 ", submissions = " + qs +
2837 "]";
2838 }
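/*
 * Illustrative sketch, not part of ForkJoinPool: toString's single pass
 * classifies each slot by index parity, counting even slots as external
 * submission queues and odd slots as worker-owned queues. QueueTally is
 * hypothetical, and its int[] of queue sizes (0 standing in for an empty
 * or null slot) replaces the real workQueues array.
 *
```java
// Hypothetical tally over queue sizes indexed like workQueues.
class QueueTally {
    // Returns {queuedTasks, queuedSubmissions}.
    static long[] tally(int[] queueSizes) {
        long qt = 0L, qs = 0L;
        for (int i = 0; i < queueSizes.length; ++i) {
            if ((i & 1) == 0)
                qs += queueSizes[i];   // even index: submission queue
            else
                qt += queueSizes[i];   // odd index: worker queue
        }
        return new long[] { qt, qs };
    }
}
```
 */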
2839
3085 * {@code blocker.block()} until either method returns {@code true}.
3086 * Every call to {@code blocker.block()} is preceded by a call to
3087 * {@code blocker.isReleasable()} that returned {@code false}.
3088 *
3089 * <p>If not running in a ForkJoinPool, this method is
3090 * behaviorally equivalent to
3091 * <pre> {@code
3092 * while (!blocker.isReleasable())
3093 * if (blocker.block())
3094 * break;}</pre>
3095 *
3096 * If running in a ForkJoinPool, the pool may first be expanded to
3097 * ensure that sufficient parallelism is available during the call to
3098 * {@code blocker.block()}.
3099 *
3100 * @param blocker the blocker task
3101 * @throws InterruptedException if {@code blocker.block()} did so
3102 */
3103 public static void managedBlock(ManagedBlocker blocker)
3104 throws InterruptedException {
3105 if (blocker == null) throw new NullPointerException();
3106 ForkJoinPool p;
3107 ForkJoinWorkerThread wt;
3108 WorkQueue w;
3109 Thread t = Thread.currentThread();
3110 if ((t instanceof ForkJoinWorkerThread) &&
3111 (p = (wt = (ForkJoinWorkerThread)t).pool) != null &&
3112 (w = wt.workQueue) != null) {
3113 int block;
3114 while (!blocker.isReleasable()) {
3115 if ((block = p.tryCompensate(w)) != 0) {
3116 try {
3117 do {} while (!blocker.isReleasable() &&
3118 !blocker.block());
3119 } finally {
3120 CTL.getAndAdd(p, (block > 0) ? RC_UNIT : 0L);
3121 }
3122 break;
3123 }
3124 }
3125 }
3126 else {
3127 do {} while (!blocker.isReleasable() &&
3128 !blocker.block());
3129 }
3130 }
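/*
 * Illustrative sketch, not part of ForkJoinPool: a blocker that takes one
 * element from a BlockingQueue, adapted from the QueueTaker example in the
 * ForkJoinPool.ManagedBlocker documentation. isReleasable() is the
 * non-blocking fast path; block() is called only after it returns false.
 * The take wrapper, and its choice to return null on interruption, are
 * hypothetical additions.
 *
```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ForkJoinPool;

class QueueTaker<E> implements ForkJoinPool.ManagedBlocker {
    final BlockingQueue<E> queue;
    volatile E item = null;

    QueueTaker(BlockingQueue<E> q) { this.queue = q; }

    // May block; returning true means no further blocking is needed.
    public boolean block() throws InterruptedException {
        if (item == null)
            item = queue.take();
        return true;
    }

    // Succeeds without blocking if an element is already available.
    public boolean isReleasable() {
        return item != null || (item = queue.poll()) != null;
    }

    E getItem() { return item; } // valid after managedBlock returns

    // Hypothetical wrapper: restores the interrupt status and returns null
    // if interrupted while waiting.
    static <E> E take(BlockingQueue<E> q) {
        QueueTaker<E> taker = new QueueTaker<>(q);
        try {
            ForkJoinPool.managedBlock(taker);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
        return taker.getItem();
    }
}
```
 *
 * Called from a pool worker, managedBlock may add a compensating thread
 * while block() waits; from any other thread it reduces to the plain loop
 * shown in the method documentation.
 */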
3131
3132 /**
3133 * If the given executor is a ForkJoinPool, polls and executes
3134 * AsynchronousCompletionTasks from the caller's queue (its worker queue, or a submission queue for an external thread)
3135 * until none are available or the blocker is released.
3136 */
3137 static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) {
3138 if (e instanceof ForkJoinPool) {
3139 WorkQueue w; ForkJoinWorkerThread wt; WorkQueue[] ws; int r, n;
3140 ForkJoinPool p = (ForkJoinPool)e;
3141 Thread thread = Thread.currentThread();
3142 if (thread instanceof ForkJoinWorkerThread &&
3143 (wt = (ForkJoinWorkerThread)thread).pool == p)
3144 w = wt.workQueue;
3145 else if ((r = ThreadLocalRandom.getProbe()) != 0 &&
3146 (ws = p.workQueues) != null && (n = ws.length) > 0)
3147 w = ws[(n - 1) & r & SQMASK];
3148 else
3149 w = null;
3150 if (w != null)
3151 w.helpAsyncBlocker(blocker);
3152 }
3153 }
3154
3155 // AbstractExecutorService overrides. These rely on the undocumented
3156 // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
3157 // implement RunnableFuture.
3158
3159 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
3160 return new ForkJoinTask.AdaptedRunnable<T>(runnable, value);
3161 }
3162
3163 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
3164 return new ForkJoinTask.AdaptedCallable<T>(callable);
3165 }
3166
3167 // VarHandle mechanics
3168 private static final VarHandle CTL;
3169 private static final VarHandle MODE;
3170 static final VarHandle QA;
3171
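/*
 * Illustrative sketch, not part of ForkJoinPool: the VarHandle idiom used
 * for CTL, MODE and QA. A handle for a volatile field (or for array
 * elements) is looked up once in a static initializer and then used for
 * atomic compareAndSet and getAndAdd. CtlHolder, its field and its methods
 * are hypothetical.
 *
```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

class CtlHolder {
    volatile long ctl;
    private static final VarHandle CTL;
    private static final VarHandle AA =
        MethodHandles.arrayElementVarHandle(String[].class);

    static {
        try {
            CTL = MethodHandles.lookup()
                .findVarHandle(CtlHolder.class, "ctl", long.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    boolean casCtl(long expected, long update) {
        return CTL.compareAndSet(this, expected, update);
    }

    // getAndAdd is signature-polymorphic; the cast fixes its return type.
    long getAndAddCtl(long delta) {
        return (long) CTL.getAndAdd(this, delta);
    }

    // Element-wise CAS on an array, as QA is used on task arrays.
    static boolean casSlot(String[] a, int i, String expect, String update) {
        return AA.compareAndSet(a, i, expect, update);
    }
}
```
 */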
3172 static {
3173 try {
3174 MethodHandles.Lookup l = MethodHandles.lookup();
3175 CTL = l.findVarHandle(ForkJoinPool.class, "ctl", long.class);
3176 MODE = l.findVarHandle(ForkJoinPool.class, "mode", int.class);
3177 QA = MethodHandles.arrayElementVarHandle(ForkJoinTask[].class);
3178 } catch (ReflectiveOperationException e) {
3179 throw new ExceptionInInitializerError(e);
3180 }
3181
3182 // Reduce the risk of rare disastrous classloading in the first call to
3183 // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
3184 Class<?> ensureLoaded = LockSupport.class;
3185
3186 int commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
3187 try {
3188 String p = System.getProperty
3189 ("java.util.concurrent.ForkJoinPool.common.maximumSpares");
3190 if (p != null)
3191 commonMaxSpares = Integer.parseInt(p);
3192 } catch (Exception ignore) {}
3193 COMMON_MAX_SPARES = commonMaxSpares;
3194
3195 defaultForkJoinWorkerThreadFactory =
3196 new DefaultForkJoinWorkerThreadFactory();
3197 modifyThreadPermission = new RuntimePermission("modifyThread");
3198
3199 common = AccessController.doPrivileged(new PrivilegedAction<>() {