1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 
28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent; 37 38 import java.lang.Thread.UncaughtExceptionHandler; 39 import java.lang.invoke.MethodHandles; 40 import java.lang.invoke.VarHandle; 41 import java.security.AccessController; 42 import java.security.AccessControlContext; 43 import java.security.Permission; 44 import java.security.Permissions; 45 import java.security.PrivilegedAction; 46 import java.security.ProtectionDomain; 47 import java.util.ArrayList; 48 import java.util.Collection; 49 import java.util.Collections; 50 import java.util.List; 51 import java.util.function.Predicate; 52 import java.util.concurrent.locks.LockSupport; 53 54 /** 55 * An {@link ExecutorService} for running {@link ForkJoinTask}s. 56 * A {@code ForkJoinPool} provides the entry point for submissions 57 * from non-{@code ForkJoinTask} clients, as well as management and 58 * monitoring operations. 59 * 60 * <p>A {@code ForkJoinPool} differs from other kinds of {@link 61 * ExecutorService} mainly by virtue of employing 62 * <em>work-stealing</em>: all threads in the pool attempt to find and 63 * execute tasks submitted to the pool and/or created by other active 64 * tasks (eventually blocking waiting for work if none exist). This 65 * enables efficient processing when most tasks spawn other subtasks 66 * (as do most {@code ForkJoinTask}s), as well as when many small 67 * tasks are submitted to the pool from external clients. Especially 68 * when setting <em>asyncMode</em> to true in constructors, {@code 69 * ForkJoinPool}s may also be appropriate for use with event-style 70 * tasks that are never joined. All worker threads are initialized 71 * with {@link Thread#isDaemon} set {@code true}. 
72 * 73 * <p>A static {@link #commonPool()} is available and appropriate for 74 * most applications. The common pool is used by any ForkJoinTask that 75 * is not explicitly submitted to a specified pool. Using the common 76 * pool normally reduces resource usage (its threads are slowly 77 * reclaimed during periods of non-use, and reinstated upon subsequent 78 * use). 79 * 80 * <p>For applications that require separate or custom pools, a {@code 81 * ForkJoinPool} may be constructed with a given target parallelism 82 * level; by default, equal to the number of available processors. 83 * The pool attempts to maintain enough active (or available) threads 84 * by dynamically adding, suspending, or resuming internal worker 85 * threads, even if some tasks are stalled waiting to join others. 86 * However, no such adjustments are guaranteed in the face of blocked 87 * I/O or other unmanaged synchronization. The nested {@link 88 * ManagedBlocker} interface enables extension of the kinds of 89 * synchronization accommodated. The default policies may be 90 * overridden using a constructor with parameters corresponding to 91 * those documented in class {@link ThreadPoolExecutor}. 92 * 93 * <p>In addition to execution and lifecycle control methods, this 94 * class provides status check methods (for example 95 * {@link #getStealCount}) that are intended to aid in developing, 96 * tuning, and monitoring fork/join applications. Also, method 97 * {@link #toString} returns indications of pool state in a 98 * convenient form for informal monitoring. 99 * 100 * <p>As is the case with other ExecutorServices, there are three 101 * main task execution methods summarized in the following table. 102 * These are designed to be used primarily by clients not already 103 * engaged in fork/join computations in the current pool. 
The main 104 * forms of these methods accept instances of {@code ForkJoinTask}, 105 * but overloaded forms also allow mixed execution of plain {@code 106 * Runnable}- or {@code Callable}- based activities as well. However, 107 * tasks that are already executing in a pool should normally instead 108 * use the within-computation forms listed in the table unless using 109 * async event-style tasks that are not usually joined, in which case 110 * there is little difference among choice of methods. 111 * 112 * <table class="plain"> 113 * <caption>Summary of task execution methods</caption> 114 * <tr> 115 * <td></td> 116 * <td style="text-align:center"> <b>Call from non-fork/join clients</b></td> 117 * <td style="text-align:center"> <b>Call from within fork/join computations</b></td> 118 * </tr> 119 * <tr> 120 * <td> <b>Arrange async execution</b></td> 121 * <td> {@link #execute(ForkJoinTask)}</td> 122 * <td> {@link ForkJoinTask#fork}</td> 123 * </tr> 124 * <tr> 125 * <td> <b>Await and obtain result</b></td> 126 * <td> {@link #invoke(ForkJoinTask)}</td> 127 * <td> {@link ForkJoinTask#invoke}</td> 128 * </tr> 129 * <tr> 130 * <td> <b>Arrange exec and obtain Future</b></td> 131 * <td> {@link #submit(ForkJoinTask)}</td> 132 * <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td> 133 * </tr> 134 * </table> 135 * 136 * <p>The parameters used to construct the common pool may be controlled by 137 * setting the following {@linkplain System#getProperty system properties}: 138 * <ul> 139 * <li>{@code java.util.concurrent.ForkJoinPool.common.parallelism} 140 * - the parallelism level, a non-negative integer 141 * <li>{@code java.util.concurrent.ForkJoinPool.common.threadFactory} 142 * - the class name of a {@link ForkJoinWorkerThreadFactory}. 143 * The {@linkplain ClassLoader#getSystemClassLoader() system class loader} 144 * is used to load this class. 
145 * <li>{@code java.util.concurrent.ForkJoinPool.common.exceptionHandler} 146 * - the class name of a {@link UncaughtExceptionHandler}. 147 * The {@linkplain ClassLoader#getSystemClassLoader() system class loader} 148 * is used to load this class. 149 * <li>{@code java.util.concurrent.ForkJoinPool.common.maximumSpares} 150 * - the maximum number of allowed extra threads to maintain target 151 * parallelism (default 256). 152 * </ul> 153 * If no thread factory is supplied via a system property, then the 154 * common pool uses a factory that uses the system class loader as the 155 * {@linkplain Thread#getContextClassLoader() thread context class loader}. 156 * In addition, if a {@link SecurityManager} is present, then 157 * the common pool uses a factory supplying threads that have no 158 * {@link Permissions} enabled. 159 * 160 * Upon any error in establishing these settings, default parameters 161 * are used. It is possible to disable or limit the use of threads in 162 * the common pool by setting the parallelism property to zero, and/or 163 * using a factory that may return {@code null}. However doing so may 164 * cause unjoined tasks to never be executed. 165 * 166 * <p><b>Implementation notes</b>: This implementation restricts the 167 * maximum number of running threads to 32767. Attempts to create 168 * pools with greater than the maximum number result in 169 * {@code IllegalArgumentException}. 170 * 171 * <p>This implementation rejects submitted tasks (that is, by throwing 172 * {@link RejectedExecutionException}) only when the pool is shut down 173 * or internal resources have been exhausted. 174 * 175 * @since 1.7 176 * @author Doug Lea 177 */ 178 public class ForkJoinPool extends AbstractExecutorService { 179 180 /* 181 * Implementation Overview 182 * 183 * This class and its nested classes provide the main 184 * functionality and control for a set of worker threads: 185 * Submissions from non-FJ threads enter into submission queues. 
186 * Workers take these tasks and typically split them into subtasks 187 * that may be stolen by other workers. Preference rules give 188 * first priority to processing tasks from their own queues (LIFO 189 * or FIFO, depending on mode), then to randomized FIFO steals of 190 * tasks in other queues. This framework began as a vehicle for 191 * supporting tree-structured parallelism using work-stealing. 192 * Over time, its scalability advantages led to extensions and 193 * changes to better support more diverse usage contexts. Because 194 * most internal methods and nested classes are interrelated, 195 * their main rationale and descriptions are presented here; 196 * individual methods and nested classes contain only brief 197 * comments about details. 198 * 199 * WorkQueues 200 * ========== 201 * 202 * Most operations occur within work-stealing queues (in nested 203 * class WorkQueue). These are special forms of Deques that 204 * support only three of the four possible end-operations -- push, 205 * pop, and poll (aka steal), under the further constraints that 206 * push and pop are called only from the owning thread (or, as 207 * extended here, under a lock), while poll may be called from 208 * other threads. (If you are unfamiliar with them, you probably 209 * want to read Herlihy and Shavit's book "The Art of 210 * Multiprocessor Programming", chapter 16 describing these in 211 * more detail before proceeding.) The main work-stealing queue 212 * design is roughly similar to those in the papers "Dynamic 213 * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005 214 * (http://research.sun.com/scalable/pubs/index.html) and 215 * "Idempotent work stealing" by Michael, Saraswat, and Vechev, 216 * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186). 
217 * The main differences ultimately stem from GC requirements that 218 * we null out taken slots as soon as we can, to maintain as small 219 * a footprint as possible even in programs generating huge 220 * numbers of tasks. To accomplish this, we shift the CAS 221 * arbitrating pop vs poll (steal) from being on the indices 222 * ("base" and "top") to the slots themselves. 223 * 224 * Adding tasks then takes the form of a classic array push(task) 225 * in a circular buffer: 226 * q.array[q.top++ % length] = task; 227 * 228 * (The actual code needs to null-check and size-check the array, 229 * uses masking, not mod, for indexing a power-of-two-sized array, 230 * properly fences accesses, and possibly signals waiting workers 231 * to start scanning -- see below.) Both a successful pop and 232 * poll mainly entail a CAS of a slot from non-null to null. 233 * 234 * The pop operation (always performed by owner) is: 235 * if ((the task at top slot is not null) and 236 * (CAS slot to null)) 237 * decrement top and return task; 238 * 239 * And the poll operation (usually by a stealer) is 240 * if ((the task at base slot is not null) and 241 * (CAS slot to null)) 242 * increment base and return task; 243 * 244 * There are several variants of each of these. In particular, 245 * almost all uses of poll occur within scan operations that also 246 * interleave contention tracking (with associated code sprawl.) 247 * 248 * Memory ordering. See "Correct and Efficient Work-Stealing for 249 * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013 250 * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an 251 * analysis of memory ordering requirements in work-stealing 252 * algorithms similar to (but different than) the one used here. 253 * Extracting tasks in array slots via (fully fenced) CAS provides 254 * primary synchronization. The base and top indices imprecisely 255 * guide where to extract from. 
We do not always require strict 256 * orderings of array and index updates, so sometimes let them be 257 * subject to compiler and processor reorderings. However, the 258 * volatile "base" index also serves as a basis for memory 259 * ordering: Slot accesses are preceded by a read of base, 260 * ensuring happens-before ordering with respect to stealers (so 261 * the slots themselves can be read via plain array reads.) The 262 * only other memory orderings relied on are maintained in the 263 * course of signalling and activation (see below). A check that 264 * base == top indicates (momentary) emptiness, but otherwise may 265 * err on the side of possibly making the queue appear nonempty 266 * when a push, pop, or poll have not fully committed, or making 267 * it appear empty when an update of top has not yet been visibly 268 * written. (Method isEmpty() checks the case of a partially 269 * completed removal of the last element.) Because of this, the 270 * poll operation, considered individually, is not wait-free. One 271 * thief cannot successfully continue until another in-progress 272 * one (or, if previously empty, a push) visibly completes. 273 * However, in the aggregate, we ensure at least probabilistic 274 * non-blockingness. If an attempted steal fails, a scanning 275 * thief chooses a different random victim target to try next. So, 276 * in order for one thief to progress, it suffices for any 277 * in-progress poll or new push on any empty queue to 278 * complete. 279 * 280 * This approach also enables support of a user mode in which 281 * local task processing is in FIFO, not LIFO order, simply by 282 * using poll rather than pop. This can be useful in 283 * message-passing frameworks in which tasks are never joined. 284 * 285 * WorkQueues are also used in a similar way for tasks submitted 286 * to the pool. We cannot mix these tasks in the same queues used 287 * by workers. 
Instead, we randomly associate submission queues 288 * with submitting threads, using a form of hashing. The 289 * ThreadLocalRandom probe value serves as a hash code for 290 * choosing existing queues, and may be randomly repositioned upon 291 * contention with other submitters. In essence, submitters act 292 * like workers except that they are restricted to executing local 293 * tasks that they submitted. Insertion of tasks in shared mode 294 * requires a lock but we use only a simple spinlock (using field 295 * phase), because submitters encountering a busy queue move to a 296 * different position to use or create other queues -- they block 297 * only when creating and registering new queues. Because it is 298 * used only as a spinlock, unlocking requires only a "releasing" 299 * store (using setRelease). 300 * 301 * Management 302 * ========== 303 * 304 * The main throughput advantages of work-stealing stem from 305 * decentralized control -- workers mostly take tasks from 306 * themselves or each other, at rates that can exceed a billion 307 * per second. The pool itself creates, activates (enables 308 * scanning for and running tasks), deactivates, blocks, and 309 * terminates threads, all with minimal central information. 310 * There are only a few properties that we can globally track or 311 * maintain, so we pack them into a small number of variables, 312 * often maintaining atomicity without blocking or locking. 313 * Nearly all essentially atomic control state is held in a few 314 * volatile variables that are by far most often read (not 315 * written) as status and consistency checks. We pack as much 316 * information into them as we can. 317 * 318 * Field "ctl" contains 64 bits holding information needed to 319 * atomically decide to add, enqueue (on an event queue), and 320 * dequeue (and release)-activate workers. 
To enable this 321 * packing, we restrict maximum parallelism to (1<<15)-1 (which is 322 * far in excess of normal operating range) to allow ids, counts, 323 * and their negations (used for thresholding) to fit into 16bit 324 * subfields. 325 * 326 * Field "mode" holds configuration parameters as well as lifetime 327 * status, atomically and monotonically setting SHUTDOWN, STOP, 328 * and finally TERMINATED bits. 329 * 330 * Field "workQueues" holds references to WorkQueues. It is 331 * updated (only during worker creation and termination) under 332 * lock (using field workerNamePrefix as lock), but is otherwise 333 * concurrently readable, and accessed directly. We also ensure 334 * that uses of the array reference itself never become too stale 335 * in case of resizing. To simplify index-based operations, the 336 * array size is always a power of two, and all readers must 337 * tolerate null slots. Worker queues are at odd indices. Shared 338 * (submission) queues are at even indices, up to a maximum of 64 339 * slots, to limit growth even if array needs to expand to add 340 * more workers. Grouping them together in this way simplifies and 341 * speeds up task scanning. 342 * 343 * All worker thread creation is on-demand, triggered by task 344 * submissions, replacement of terminated workers, and/or 345 * compensation for blocked workers. However, all other support 346 * code is set up to work with other policies. To ensure that we 347 * do not hold on to worker references that would prevent GC, all 348 * accesses to workQueues are via indices into the workQueues 349 * array (which is one source of some of the messy code 350 * constructions here). In essence, the workQueues array serves as 351 * a weak reference mechanism. Thus for example the stack top 352 * subfield of ctl stores indices, not references. 353 * 354 * Queuing Idle Workers. 
Unlike HPC work-stealing frameworks, we 355 * cannot let workers spin indefinitely scanning for tasks when 356 * none can be found immediately, and we cannot start/resume 357 * workers unless there appear to be tasks available. On the 358 * other hand, we must quickly prod them into action when new 359 * tasks are submitted or generated. In many usages, ramp-up time 360 * is the main limiting factor in overall performance, which is 361 * compounded at program start-up by JIT compilation and 362 * allocation. So we streamline this as much as possible. 363 * 364 * The "ctl" field atomically maintains total worker and 365 * "released" worker counts, plus the head of the available worker 366 * queue (actually stack, represented by the lower 32bit subfield 367 * of ctl). Released workers are those known to be scanning for 368 * and/or running tasks. Unreleased ("available") workers are 369 * recorded in the ctl stack. These workers are made available for 370 * signalling by enqueuing in ctl (see method runWorker). The 371 * "queue" is a form of Treiber stack. This is ideal for 372 * activating threads in most-recently used order, and improves 373 * performance and locality, outweighing the disadvantages of 374 * being prone to contention and inability to release a worker 375 * unless it is topmost on stack. To avoid missed signal problems 376 * inherent in any wait/signal design, available workers rescan 377 * for (and if found run) tasks after enqueuing. Normally their 378 * release status will be updated while doing so, but the released 379 * worker ctl count may underestimate the number of active 380 * threads. (However, it is still possible to determine quiescence 381 * via a validation traversal -- see isQuiescent). After an 382 * unsuccessful rescan, available workers are blocked until 383 * signalled (see signalWork). 
The top stack state holds the 384 * value of the "phase" field of the worker: its index and status, 385 * plus a version counter that, in addition to the count subfields 386 * (also serving as version stamps) provides protection against 387 * Treiber stack ABA effects. 388 * 389 * Creating workers. To create a worker, we pre-increment counts 390 * (serving as a reservation), and attempt to construct a 391 * ForkJoinWorkerThread via its factory. Upon construction, the 392 * new thread invokes registerWorker, where it constructs a 393 * WorkQueue and is assigned an index in the workQueues array 394 * (expanding the array if necessary). The thread is then started. 395 * Upon any exception across these steps, or null return from 396 * factory, deregisterWorker adjusts counts and records 397 * accordingly. If a null return, the pool continues running with 398 * fewer than the target number of workers. If exceptional, the 399 * exception is propagated, generally to some external caller. 400 * Worker index assignment avoids the bias in scanning that would 401 * occur if entries were sequentially packed starting at the front 402 * of the workQueues array. We treat the array as a simple 403 * power-of-two hash table, expanding as needed. The seedIndex 404 * increment ensures no collisions until a resize is needed or a 405 * worker is deregistered and replaced, and thereafter keeps 406 * probability of collision low. We cannot use 407 * ThreadLocalRandom.getProbe() for similar purposes here because 408 * the thread has not started yet, but do so for creating 409 * submission queues for existing external threads (see 410 * externalPush). 411 * 412 * WorkQueue field "phase" is used by both workers and the pool to 413 * manage and track whether a worker is UNSIGNALLED (possibly 414 * blocked waiting for a signal). When a worker is enqueued its 415 * phase field is set. 
Note that phase field updates lag queue CAS 416 * releases so usage requires care -- seeing a negative phase does 417 * not guarantee that the worker is available. When queued, the 418 * lower 16 bits of scanState must hold its pool index. So we 419 * place the index there upon initialization (see registerWorker) 420 * and otherwise keep it there or restore it when necessary. 421 * 422 * The ctl field also serves as the basis for memory 423 * synchronization surrounding activation. This uses a more 424 * efficient version of a Dekker-like rule that task producers and 425 * consumers sync with each other by both writing/CASing ctl (even 426 * if to its current value). This would be extremely costly. So 427 * we relax it in several ways: (1) Producers only signal when 428 * their queue is empty. Other workers propagate this signal (in 429 * method scan) when they find tasks; to further reduce flailing, 430 * each worker signals only one other per activation. (2) Workers 431 * only enqueue after scanning (see below) and not finding any 432 * tasks. (3) Rather than CASing ctl to its current value in the 433 * common case where no action is required, we reduce write 434 * contention by equivalently prefacing signalWork when called by 435 * an external task producer using a memory access with 436 * full-volatile semantics or a "fullFence". 437 * 438 * Almost always, too many signals are issued. A task producer 439 * cannot in general tell if some existing worker is in the midst 440 * of finishing one task (or already scanning) and ready to take 441 * another without being signalled. So the producer might instead 442 * activate a different worker that does not find any work, and 443 * then inactivates. This scarcely matters in steady-state 444 * computations involving all workers, but can create contention 445 * and bookkeeping bottlenecks during ramp-up, ramp-down, and small 446 * computations involving only a few workers. 447 * 448 * Scanning. 
Method runWorker performs top-level scanning for 449 * tasks. Each scan traverses and tries to poll from each queue 450 * starting at a random index and circularly stepping. Scans are 451 * not performed in ideal random permutation order, to reduce 452 * cacheline contention. The pseudorandom generator need not have 453 * high-quality statistical properties in the long term, but just 454 * within computations. We use Marsaglia XorShifts (often via 455 * ThreadLocalRandom.nextSecondarySeed), which are cheap and 456 * suffice. Scanning also employs contention reduction: When 457 * scanning workers fail to extract an apparently existing task, 458 * they soon restart at a different pseudorandom index. This 459 * improves throughput when many threads are trying to take tasks 460 * from few queues, which can be common in some usages. Scans do 461 * not otherwise explicitly take into account core affinities, 462 * loads, cache localities, etc. However, they do exploit temporal 463 * locality (which usually approximates these) by preferring to 464 * re-poll (at most #workers times) from the same queue after a 465 * successful poll before trying others. 466 * 467 * Trimming workers. To release resources after periods of lack of 468 * use, a worker starting to wait when the pool is quiescent will 469 * time out and terminate (see method scan) if the pool has 470 * remained quiescent for the period given by field keepAlive. 471 * 472 * Shutdown and Termination. A call to shutdownNow invokes 473 * tryTerminate to atomically set a runState bit. The calling 474 * thread, as well as every other worker thereafter terminating, 475 * helps terminate others by cancelling their unprocessed tasks, 476 * and waking them up, doing so repeatedly until stable. 
Calls to 477 * non-abrupt shutdown() preface this by checking whether 478 * termination should commence by sweeping through queues (until 479 * stable) to ensure lack of in-flight submissions and workers 480 * about to process them before triggering the "STOP" phase of 481 * termination. 482 * 483 * Joining Tasks 484 * ============= 485 * 486 * Any of several actions may be taken when one worker is waiting 487 * to join a task stolen (or always held) by another. Because we 488 * are multiplexing many tasks on to a pool of workers, we can't 489 * always just let them block (as in Thread.join). We also cannot 490 * just reassign the joiner's run-time stack with another and 491 * replace it later, which would be a form of "continuation", that 492 * even if possible is not necessarily a good idea since we may 493 * need both an unblocked task and its continuation to progress. 494 * Instead we combine two tactics: 495 * 496 * Helping: Arranging for the joiner to execute some task that it 497 * would be running if the steal had not occurred. 498 * 499 * Compensating: Unless there are already enough live threads, 500 * method tryCompensate() may create or re-activate a spare 501 * thread to compensate for blocked joiners until they unblock. 502 * 503 * A third form (implemented in tryRemoveAndExec) amounts to 504 * helping a hypothetical compensator: If we can readily tell that 505 * a possible action of a compensator is to steal and execute the 506 * task being joined, the joining thread can do so directly, 507 * without the need for a compensation thread. 508 * 509 * The ManagedBlocker extension API can't use helping so relies 510 * only on compensation in method awaitBlocker. 511 * 512 * The algorithm in awaitJoin entails a form of "linear helping". 513 * Each worker records (in field source) the id of the queue from 514 * which it last stole a task. 
The scan in method awaitJoin uses 515 * these markers to try to find a worker to help (i.e., steal back 516 * a task from and execute it) that could hasten completion of the 517 * actively joined task. Thus, the joiner executes a task that 518 * would be on its own local deque if the to-be-joined task had 519 * not been stolen. This is a conservative variant of the approach 520 * described in Wagner & Calder "Leapfrogging: a portable 521 * technique for implementing efficient futures" SIGPLAN Notices, 522 * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs 523 * mainly in that we only record queue ids, not full dependency 524 * links. This requires a linear scan of the workQueues array to 525 * locate stealers, but isolates cost to when it is needed, rather 526 * than adding to per-task overhead. Searches can fail to locate 527 * stealers when GC stalls and the like delay recording sources. 528 * Further, even when accurately identified, stealers might not 529 * ever produce a task that the joiner can in turn help with. So, 530 * compensation is tried upon failure to find tasks to run. 531 * 532 * Compensation does not by default aim to keep exactly the target 533 * parallelism number of unblocked threads running at any given 534 * time. Some previous versions of this class employed immediate 535 * compensations for any blocked join. However, in practice, the 536 * vast majority of blockages are transient byproducts of GC and 537 * other JVM or OS activities that are made worse by replacement. 538 * Rather than impose arbitrary policies, we allow users to 539 * override the default of only adding threads upon apparent 540 * starvation. The compensation mechanism may also be bounded. 541 * Bounds for the commonPool (see COMMON_MAX_SPARES) better enable 542 * JVMs to cope with programming errors and abuse before running 543 * out of resources to do so. 
544 * 545 * Common Pool 546 * =========== 547 * 548 * The static common pool always exists after static 549 * initialization. Since it (or any other created pool) need 550 * never be used, we minimize initial construction overhead and 551 * footprint to the setup of about a dozen fields. 552 * 553 * When external threads submit to the common pool, they can 554 * perform subtask processing (see externalHelpComplete and 555 * related methods) upon joins. This caller-helps policy makes it 556 * sensible to set common pool parallelism level to one (or more) 557 * less than the total number of available cores, or even zero for 558 * pure caller-runs. We do not need to record whether external 559 * submissions are to the common pool -- if not, external help 560 * methods return quickly. These submitters would otherwise be 561 * blocked waiting for completion, so the extra effort (with 562 * liberally sprinkled task status checks) in inapplicable cases 563 * amounts to an odd form of limited spin-wait before blocking in 564 * ForkJoinTask.join. 565 * 566 * As a more appropriate default in managed environments, unless 567 * overridden by system properties, we use workers of subclass 568 * InnocuousForkJoinWorkerThread when there is a SecurityManager 569 * present. These workers have no permissions set, do not belong 570 * to any user-defined ThreadGroup, and erase all ThreadLocals 571 * after executing any top-level task (see 572 * WorkQueue.afterTopLevelExec). The associated mechanics (mainly 573 * in ForkJoinWorkerThread) may be JVM-dependent and must access 574 * particular Thread class fields to achieve this effect. 575 * 576 * Style notes 577 * =========== 578 * 579 * Memory ordering relies mainly on VarHandles. This can be 580 * awkward and ugly, but also reflects the need to control 581 * outcomes across the unusual cases that arise in very racy code 582 * with very few invariants. 
All fields are read into locals 583 * before use, and null-checked if they are references. This is 584 * usually done in a "C"-like style of listing declarations at the 585 * heads of methods or blocks, and using inline assignments on 586 * first encounter. Nearly all explicit checks lead to 587 * bypass/return, not exception throws, because they may 588 * legitimately arise due to cancellation/revocation during 589 * shutdown. 590 * 591 * There is a lot of representation-level coupling among classes 592 * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The 593 * fields of WorkQueue maintain data structures managed by 594 * ForkJoinPool, so are directly accessed. There is little point 595 * trying to reduce this, since any associated future changes in 596 * representations will need to be accompanied by algorithmic 597 * changes anyway. Several methods intrinsically sprawl because 598 * they must accumulate sets of consistent reads of fields held in 599 * local variables. There are also other coding oddities 600 * (including several unnecessary-looking hoisted null checks) 601 * that help some methods perform reasonably even when interpreted 602 * (not compiled). 603 * 604 * The order of declarations in this file is (with a few exceptions): 605 * (1) Static utility functions 606 * (2) Nested (static) classes 607 * (3) Static fields 608 * (4) Fields, along with constants used when unpacking some of them 609 * (5) Internal control methods 610 * (6) Callbacks and other support for ForkJoinTask methods 611 * (7) Exported methods 612 * (8) Static block initializing statics in minimally dependent order 613 */ 614 615 // Static utilities 616 617 /** 618 * If there is a security manager, makes sure caller has 619 * permission to modify threads. 
620 */ 621 private static void checkPermission() { 622 SecurityManager security = System.getSecurityManager(); 623 if (security != null) 624 security.checkPermission(modifyThreadPermission); 625 } 626 627 // Nested classes 628 629 /** 630 * Factory for creating new {@link ForkJoinWorkerThread}s. 631 * A {@code ForkJoinWorkerThreadFactory} must be defined and used 632 * for {@code ForkJoinWorkerThread} subclasses that extend base 633 * functionality or initialize threads with different contexts. 634 */ 635 public static interface ForkJoinWorkerThreadFactory { 636 /** 637 * Returns a new worker thread operating in the given pool. 638 * Returning null or throwing an exception may result in tasks 639 * never being executed. If this method throws an exception, 640 * it is relayed to the caller of the method (for example 641 * {@code execute}) causing attempted thread creation. If this 642 * method returns null or throws an exception, it is not 643 * retried until the next attempted creation (for example 644 * another call to {@code execute}). 645 * 646 * @param pool the pool this thread works in 647 * @return the new worker thread, or {@code null} if the request 648 * to create a thread is rejected 649 * @throws NullPointerException if the pool is null 650 */ 651 public ForkJoinWorkerThread newThread(ForkJoinPool pool); 652 } 653 654 static AccessControlContext contextWithPermissions(Permission ... perms) { 655 Permissions permissions = new Permissions(); 656 for (Permission perm : perms) 657 permissions.add(perm); 658 return new AccessControlContext( 659 new ProtectionDomain[] { new ProtectionDomain(null, permissions) }); 660 } 661 662 /** 663 * Default ForkJoinWorkerThreadFactory implementation; creates a 664 * new ForkJoinWorkerThread using the system class loader as the 665 * thread context class loader. 
666 */ 667 private static final class DefaultForkJoinWorkerThreadFactory 668 implements ForkJoinWorkerThreadFactory { 669 private static final AccessControlContext ACC = contextWithPermissions( 670 new RuntimePermission("getClassLoader"), 671 new RuntimePermission("setContextClassLoader")); 672 673 public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { 674 return AccessController.doPrivileged( 675 new PrivilegedAction<>() { 676 public ForkJoinWorkerThread run() { 677 return new ForkJoinWorkerThread( 678 pool, ClassLoader.getSystemClassLoader()); }}, 679 ACC); 680 } 681 } 682 683 // Constants shared across ForkJoinPool and WorkQueue 684 685 // Bounds 686 static final int SWIDTH = 16; // width of short 687 static final int SMASK = 0xffff; // short bits == max index 688 static final int MAX_CAP = 0x7fff; // max #workers - 1 689 static final int SQMASK = 0x007e; // max 64 (even) slots 690 691 // Masks and units for WorkQueue.phase and ctl sp subfield 692 static final int UNSIGNALLED = 1 << 31; // must be negative 693 static final int SS_SEQ = 1 << 16; // version count 694 static final int QLOCK = 1; // must be 1 695 696 // Mode bits and sentinels, some also used in WorkQueue id and.source fields 697 static final int OWNED = 1; // queue has owner thread 698 static final int FIFO = 1 << 16; // fifo queue or access mode 699 static final int SHUTDOWN = 1 << 18; 700 static final int TERMINATED = 1 << 19; 701 static final int STOP = 1 << 31; // must be negative 702 static final int QUIET = 1 << 30; // not scanning or working 703 static final int DORMANT = QUIET | UNSIGNALLED; 704 705 /** 706 * The maximum number of local polls from the same queue before 707 * checking others. This is a safeguard against infinitely unfair 708 * looping under unbounded user task recursion, and must be larger 709 * than plausible cases of intentional bounded task recursion. 
*/
    static final int POLL_LIMIT = 1 << 10;

    /**
     * Queues supporting work-stealing as well as external task
     * submission. See above for descriptions and algorithms.
     * Performance on most platforms is very sensitive to placement of
     * instances of both WorkQueues and their arrays -- we absolutely
     * do not want multiple WorkQueue instances or multiple queue
     * arrays sharing cache lines. The @Contended annotation alerts
     * JVMs to try to keep instances apart.
     *
     * <p>Reading guide: slots are addressed as {@code index & (length - 1)},
     * so {@code base} and {@code top} are free-running counters rather
     * than array offsets. {@code top - base} is the (approximate) number
     * of queued tasks; {@code base} advances on poll/steal and
     * {@code top} advances on push and retreats on pop.
     */
    @jdk.internal.vm.annotation.Contended
    static final class WorkQueue {

        /**
         * Capacity of work-stealing queue array upon initialization.
         * Must be a power of two; at least 4, but should be larger to
         * reduce or eliminate cacheline sharing among queues.
         * Currently, it is much larger, as a partial workaround for
         * the fact that JVMs often place arrays in locations that
         * share GC bookkeeping (especially cardmarks) such that
         * per-write accesses encounter serious memory contention.
         */
        static final int INITIAL_QUEUE_CAPACITY = 1 << 13;

        /**
         * Maximum size for queue arrays. Must be a power of two less
         * than or equal to 1 << (31 - width of array entry) to ensure
         * lack of wraparound of index calculations, but defined to a
         * value a bit less than this to help users trap runaway
         * programs before saturating systems.
         */
        static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M

        // Instance fields
        volatile int phase;        // versioned, negative: queued, 1: locked
        int stackPred;             // pool stack (ctl) predecessor link
        int nsteals;               // number of steals
        int id;                    // index, mode, tag
        volatile int source;       // source queue id, or sentinel
        volatile int base;         // index of next slot for poll
        int top;                   // index of next slot for push
        ForkJoinTask<?>[] array;   // the elements (initially unallocated)
        final ForkJoinPool pool;   // the containing pool (may be null)
        final ForkJoinWorkerThread owner; // owning thread or null if shared

        /**
         * Creates a queue with no backing array; the array is allocated
         * later by {@link #growArray}.
         *
         * @param pool the containing pool (may be null)
         * @param owner the owning thread, or null if shared
         */
        WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
            this.pool = pool;
            this.owner = owner;
            // Place indices in the center of array (that is not yet allocated)
            base = top = INITIAL_QUEUE_CAPACITY >>> 1;
        }

        /**
         * Returns an exportable index (used by ForkJoinWorkerThread).
         * Computed from the low 16 bits of {@code id}, shifted right
         * by one.
         */
        final int getPoolIndex() {
            return (id & 0xffff) >>> 1; // ignore odd/even tag bit
        }

        /**
         * Returns the approximate number of tasks in the queue,
         * i.e. {@code top - base} clamped below at zero.
         */
        final int queueSize() {
            int n = base - top;       // read base first
            return (n >= 0) ? 0 : -n; // ignore transient negative
        }

        /**
         * Provides a more accurate estimate of whether this queue has
         * any tasks than does queueSize, by checking whether a
         * near-empty queue has at least one unclaimed task.
         *
         * @return true if {@code base >= top}, or if exactly one slot
         * is outstanding and that slot (or the array) is absent/null
         */
        final boolean isEmpty() {
            ForkJoinTask<?>[] a; int n, al, b;
            return ((n = (b = base) - top) >= 0 || // possibly one task
                    (n == -1 && ((a = array) == null ||
                                 (al = a.length) == 0 ||
                                 a[(al - 1) & b] == null)));
        }


        /**
         * Pushes a task. Call only by owner in unshared queues.
         * Does nothing if the array is null or zero-length.
         *
         * @param task the task. Caller must ensure non-null.
         * @throws RejectedExecutionException if array cannot be resized
         */
        final void push(ForkJoinTask<?> task) {
            int s = top; ForkJoinTask<?>[] a; int al, d;
            if ((a = array) != null && (al = a.length) > 0) {
                int index = (al - 1) & s;
                ForkJoinPool p = pool;
                top = s + 1;                  // advance top before publishing slot
                QA.setRelease(a, index, task);
                if ((d = base - s) == 0 && p != null) {
                    // queue was empty (base == old top): wake or add a worker
                    VarHandle.fullFence();
                    p.signalWork();
                }
                else if (d + al == 1)         // old top - base == al - 1: now full
                    growArray();
            }
        }

        /**
         * Initializes or doubles the capacity of array. Call either
         * by owner or with lock held -- it is OK for base, but not
         * top, to move while resizings are in progress.
         *
         * @return the newly allocated array (also stored in {@code array})
         * @throws RejectedExecutionException if the new size would be
         * below INITIAL_QUEUE_CAPACITY or above MAXIMUM_QUEUE_CAPACITY
         */
        final ForkJoinTask<?>[] growArray() {
            ForkJoinTask<?>[] oldA = array;
            int oldSize = oldA != null ? oldA.length : 0;
            int size = oldSize > 0 ? oldSize << 1 : INITIAL_QUEUE_CAPACITY;
            if (size < INITIAL_QUEUE_CAPACITY || size > MAXIMUM_QUEUE_CAPACITY)
                throw new RejectedExecutionException("Queue capacity exceeded");
            int oldMask, t, b;
            ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
            if (oldA != null && (oldMask = oldSize - 1) > 0 &&
                (t = top) - (b = base) > 0) {
                int mask = size - 1;
                do { // emulate poll from old array, push to new array
                    int index = b & oldMask;
                    ForkJoinTask<?> x = (ForkJoinTask<?>)
                        QA.getAcquire(oldA, index);
                    // CAS-claim the old slot so a task is moved only if
                    // it has not meanwhile been taken from the old array
                    if (x != null &&
                        QA.compareAndSet(oldA, index, x, null))
                        a[b & mask] = x;
                } while (++b != t);
                VarHandle.releaseFence();
            }
            return a;
        }

        /**
         * Takes next task, if one exists, in LIFO order.  Call only
         * by owner in unshared queues.
         *
         * @return the task at {@code top - 1}, or null if the queue
         * appears empty or the slot could not be claimed
         */
        final ForkJoinTask<?> pop() {
            // NOTE(review): local `i` is declared but never used here.
            int b = base, s = top, al, i; ForkJoinTask<?>[] a;
            if ((a = array) != null && b != s && (al = a.length) > 0) {
                int index = (al - 1) & --s;
                ForkJoinTask<?> t = (ForkJoinTask<?>)
                    QA.get(a, index);
                if (t != null &&
                    QA.compareAndSet(a, index, t, null)) {
                    top = s;                  // retreat top only after claiming slot
                    VarHandle.releaseFence();
                    return t;
                }
            }
            return null;
        }

        /**
         * Takes next task, if one exists, in FIFO order.
         *
         * @return the task at {@code base}, or null if none is found
         */
        final ForkJoinTask<?> poll() {
            for (;;) {
                int b = base, s = top, d, al; ForkJoinTask<?>[] a;
                if ((a = array) != null && (d = b - s) < 0 &&
                    (al = a.length) > 0) {
                    int index = (al - 1) & b;
                    ForkJoinTask<?> t = (ForkJoinTask<?>)
                        QA.getAcquire(a, index);
                    if (b++ == base) {        // base unchanged since first read
                        if (t != null) {
                            if (QA.compareAndSet(a, index, t, null)) {
                                base = b;
                                return t;
                            }
                        }
                        else if (d == -1)
                            break; // now empty
                    }
                    // otherwise base moved or CAS lost: re-read and retry
                }
                else
                    break;
            }
            return null;
        }

        /**
         * Takes next task, if one exists, in order specified by mode:
         * {@link #poll} when the FIFO bit is set in {@code id},
         * otherwise {@link #pop}.
         */
        final ForkJoinTask<?> nextLocalTask() {
            return ((id & FIFO) != 0) ? poll() : pop();
        }

        /**
         * Returns next task, if one exists, in order specified by mode.
         * Reads the slot at {@code base} (FIFO) or {@code top - 1}
         * (otherwise) without removing it.
         */
        final ForkJoinTask<?> peek() {
            int al; ForkJoinTask<?>[] a;
            return ((a = array) != null && (al = a.length) > 0) ?
                a[(al - 1) &
                  ((id & FIFO) != 0 ? base : top - 1)] : null;
        }

        /**
         * Pops the given task only if it is at the current top.
         *
         * @param task the task expected at {@code top - 1}
         * @return true if the task was removed
         */
        final boolean tryUnpush(ForkJoinTask<?> task) {
            int b = base, s = top, al; ForkJoinTask<?>[] a;
            if ((a = array) != null && b != s && (al = a.length) > 0) {
                int index = (al - 1) & --s;
                if (QA.compareAndSet(a, index, task, null)) {
                    top = s;
                    VarHandle.releaseFence();
                    return true;
                }
            }
            return false;
        }

        /**
         * Removes and cancels all known tasks, ignoring any exceptions.
         * Drains via {@link #poll} until it returns null.
         */
        final void cancelAll() {
            for (ForkJoinTask<?> t; (t = poll()) != null; )
                ForkJoinTask.cancelIgnoringExceptions(t);
        }

        // Specialized execution methods

        /**
         * Pops and executes up to limit consecutive tasks or until empty.
         *
         * @param limit max runs, or zero for no limit
         */
        final void localPopAndExec(int limit) {
            for (;;) {
                int b = base, s = top, al; ForkJoinTask<?>[] a;
                if ((a = array) != null && b != s && (al = a.length) > 0) {
                    int index = (al - 1) & --s;
                    // unconditional swap-to-null claims whatever is in the slot
                    ForkJoinTask<?> t = (ForkJoinTask<?>)
                        QA.getAndSet(a, index, null);
                    if (t != null) {
                        top = s;
                        VarHandle.releaseFence();
                        t.doExec();
                        if (limit != 0 && --limit == 0)
                            break;
                    }
                    else                      // slot already emptied: stop
                        break;
                }
                else
                    break;
            }
        }

        /**
         * Polls and executes up to limit consecutive tasks or until empty.
         *
         * @param limit max runs, or zero for no limit
         */
        final void localPollAndExec(int limit) {
            for (int polls = 0;;) {
                int b = base, s = top, d, al; ForkJoinTask<?>[] a;
                if ((a = array) != null && (d = b - s) < 0 &&
                    (al = a.length) > 0) {
                    int index = (al - 1) & b++;
                    ForkJoinTask<?> t = (ForkJoinTask<?>)
                        QA.getAndSet(a, index, null);
                    if (t != null) {
                        base = b;
                        t.doExec();
                        if (limit != 0 && ++polls == limit)
                            break;
                    }
                    else if (d == -1)
                        break;     // now empty
                    else
                        polls = 0; // stolen; reset
                }
                else
                    break;
            }
        }

        /**
         * If present, removes task from queue and executes it.
         * Scans downward from {@code top - 1} until the task or a null
         * slot is found; on success the slots above it are shifted down
         * by one so the queue stays contiguous.
         *
         * @param task the task to look for
         */
        final void tryRemoveAndExec(ForkJoinTask<?> task) {
            ForkJoinTask<?>[] wa; int s, wal;
            if (base - (s = top) < 0 && // traverse from top
                (wa = array) != null && (wal = wa.length) > 0) {
                for (int m = wal - 1, ns = s - 1, i = ns; ; --i) {
                    int index = i & m;
                    ForkJoinTask<?> t = (ForkJoinTask<?>)
                        QA.get(wa, index);
                    if (t == null)
                        break;
                    else if (t == task) {
                        if (QA.compareAndSet(wa, index, t, null)) {
                            top = ns;   // safely shift down
                            for (int j = i; j != ns; ++j) {
                                ForkJoinTask<?> f;
                                int pindex = (j + 1) & m;
                                f = (ForkJoinTask<?>)QA.get(wa, pindex);
                                QA.setVolatile(wa, pindex, null);
                                int jindex = j & m;
                                QA.setRelease(wa, jindex, f);
                            }
                            VarHandle.releaseFence();
                            t.doExec();
                        }
                        break;
                    }
                }
            }
        }

        /**
         * Tries to steal and run tasks within the target's
         * computation until done, not found, or limit exceeded.
         * Only the task at {@code top - 1} is examined on each pass;
         * it is run if {@code task} is reachable through its
         * {@code completer} chain.
         *
         * @param task root of CountedCompleter computation
         * @param limit max runs, or zero for no limit
         * @return task status on exit
         */
        final int localHelpCC(CountedCompleter<?> task, int limit) {
            int status = 0;
            if (task != null && (status = task.status) >= 0) {
                for (;;) {
                    boolean help = false;
                    int b = base, s = top, al; ForkJoinTask<?>[] a;
                    if ((a = array) != null && b != s && (al = a.length) > 0) {
                        int index = (al - 1) & (s - 1);
                        ForkJoinTask<?> o = (ForkJoinTask<?>)
                            QA.get(a, index);
                        if (o instanceof CountedCompleter) {
                            CountedCompleter<?> t = (CountedCompleter<?>)o;
                            for (CountedCompleter<?> f = t;;) {
                                if (f != task) {
                                    if ((f = f.completer) == null) // try parent
                                        break;
                                }
                                else {
                                    if (QA.compareAndSet(a, index, t, null)) {
                                        top = s - 1;
                                        VarHandle.releaseFence();
                                        t.doExec();
                                        help = true;
                                    }
                                    break;
                                }
                            }
                        }
                    }
                    // stop when done, when nothing was run, or at limit
                    if ((status = task.status) < 0 || !help ||
                        (limit != 0 && --limit == 0))
                        break;
                }
            }
            return status;
        }

        // Operations on shared queues

        /**
         * Tries to lock shared queue by CASing phase field
         * from 0 to QLOCK.
         *
         * @return true if the lock was acquired
         */
        final boolean tryLockSharedQueue() {
            return PHASE.compareAndSet(this, 0, QLOCK);
        }

        /**
         * Shared version of tryUnpush.  Takes the phase lock before
         * removing, and re-validates {@code top} and {@code array}
         * while holding it.
         *
         * @param task the task expected at {@code top - 1}
         * @return true if the task was removed
         */
        final boolean trySharedUnpush(ForkJoinTask<?> task) {
            boolean popped = false;
            int s = top - 1, al; ForkJoinTask<?>[] a;
            if ((a = array) != null && (al = a.length) > 0) {
                int index = (al - 1) & s;
                ForkJoinTask<?> t = (ForkJoinTask<?>) QA.get(a, index);
                if (t == task &&
                    PHASE.compareAndSet(this, 0, QLOCK)) {
                    if (top == s + 1 && array == a &&
                        QA.compareAndSet(a, index, task, null)) {
                        popped = true;
                        top = s;
                    }
                    PHASE.setRelease(this, 0);   // unlock
                }
            }
            return popped;
        }

        /**
         * Shared version of localHelpCC.  Same traversal, but the
         * removal is performed under the phase lock and the task is
         * executed only after the lock has been released.
         *
         * @param task root of CountedCompleter computation
         * @param limit max runs, or zero for no limit
         * @return task status on exit
         */
        final int sharedHelpCC(CountedCompleter<?> task, int limit) {
            int status = 0;
            if (task != null && (status = task.status) >= 0) {
                for (;;) {
                    boolean help = false;
                    int b = base, s = top, al; ForkJoinTask<?>[] a;
                    if ((a = array) != null && b != s && (al = a.length) > 0) {
                        int index = (al - 1) & (s - 1);
                        ForkJoinTask<?> o = (ForkJoinTask<?>)
                            QA.get(a, index);
                        if (o instanceof CountedCompleter) {
                            CountedCompleter<?> t = (CountedCompleter<?>)o;
                            for (CountedCompleter<?> f = t;;) {
                                if (f != task) {
                                    if ((f = f.completer) == null)
                                        break;
                                }
                                else {
                                    if (PHASE.compareAndSet(this, 0, QLOCK)) {
                                        if (top == s && array == a &&
                                            QA.compareAndSet(a, index, t, null)) {
                                            help = true;
                                            top = s - 1;
                                        }
                                        PHASE.setRelease(this, 0);
                                        if (help)
                                            t.doExec();
                                    }
                                    break;
                                }
                            }
                        }
                    }
                    if ((status = task.status) < 0 || !help ||
                        (limit != 0 && --limit == 0))
                        break;
                }
            }
            return status;
        }

        /**
         * Returns true if owned and not known to be blocked, i.e. the
         * owner thread's state is none of BLOCKED, WAITING or
         * TIMED_WAITING.
         */
        final boolean isApparentlyUnblocked() {
            Thread wt; Thread.State s;
            return ((wt = owner) != null &&
                    (s = wt.getState()) != Thread.State.BLOCKED &&
                    s != Thread.State.WAITING &&
                    s != Thread.State.TIMED_WAITING);
        }

        // VarHandle mechanics.
        // PHASE gives CAS / release access to the phase field.
        private static final VarHandle PHASE;
        static {
            try {
                MethodHandles.Lookup l = MethodHandles.lookup();
                PHASE = l.findVarHandle(WorkQueue.class, "phase", int.class);
            } catch (ReflectiveOperationException e) {
                throw new Error(e);
            }
        }
    }

    // static fields (initialized in static initializer below)

    /**
     * Creates a new ForkJoinWorkerThread. This factory is used unless
     * overridden in ForkJoinPool constructors.
     */
    public static final ForkJoinWorkerThreadFactory
        defaultForkJoinWorkerThreadFactory;

    /**
     * Permission required for callers of methods that may start or
     * kill threads.
     */
    static final RuntimePermission modifyThreadPermission;

    /**
     * Common (static) pool. Non-null for public use unless a static
     * construction exception, but internal usages null-check on use
     * to paranoically avoid potential initialization circularities
     * as well as to simplify generated code.
     */
    static final ForkJoinPool common;

    /**
     * Common pool parallelism. To allow simpler use and management
     * when common pool threads are disabled, we allow the underlying
     * common.parallelism field to be zero, but in that case still report
     * parallelism as 1 to reflect resulting caller-runs mechanics.
     */
    static final int COMMON_PARALLELISM;

    /**
     * Limit on spare thread construction in tryCompensate.
     */
    private static final int COMMON_MAX_SPARES;

    /**
     * Sequence number for creating workerNamePrefix.
*/
    private static int poolNumberSequence;

    /**
     * Returns the next sequence number. We don't expect this to
     * ever contend, so use simple builtin sync.
     */
    private static final synchronized int nextPoolId() {
        return ++poolNumberSequence;
    }

    // static configuration constants

    /**
     * Default idle timeout value (in milliseconds) for the thread
     * triggering quiescence to park waiting for new work
     */
    private static final long DEFAULT_KEEPALIVE = 60_000L;

    /**
     * Undershoot tolerance for idle timeouts
     */
    private static final long TIMEOUT_SLOP = 20L;

    /**
     * The default value for COMMON_MAX_SPARES.  Overridable using the
     * "java.util.concurrent.ForkJoinPool.common.maximumSpares" system
     * property.  The default value is far in excess of normal
     * requirements, but also far short of MAX_CAP and typical OS
     * thread limits, so allows JVMs to catch misuse/abuse before
     * running out of resources needed to do so.
     */
    private static final int DEFAULT_COMMON_MAX_SPARES = 256;

    /**
     * Increment for seed generators. See class ThreadLocal for
     * explanation.
     */
    private static final int SEED_INCREMENT = 0x9e3779b9;

    /*
     * Bits and masks for field ctl, packed with 4 16 bit subfields:
     * RC: Number of released (unqueued) workers minus target parallelism
     * TC: Number of total workers minus target parallelism
     * SS: version count and status of top waiting thread
     * ID: poolIndex of top of Treiber stack of waiters
     *
     * When convenient, we can extract the lower 32 stack top bits
     * (including version bits) as sp=(int)ctl.  The offsets of counts
     * by the target parallelism and the positionings of fields makes
     * it possible to perform the most common checks via sign tests of
     * fields: When rc is negative, there are not enough unqueued
     * workers, when tc is negative, there are not enough total
     * workers.  When sp is non-zero, there are waiting workers.  To
     * deal with possibly negative fields, we use casts in and out of
     * "short" and/or signed shifts to maintain signedness.
     *
     * Because it occupies uppermost bits, we can add one release count
     * using getAndAdd of RC_UNIT, rather than CAS, when returning
     * from a blocked join.  Other updates entail multiple subfields
     * and masking, requiring CAS.
     *
     * The limits packed in field "bounds" are also offset by the
     * parallelism level to make them comparable to the ctl rc and tc
     * fields.
     */

    // Lower and upper word masks
    private static final long SP_MASK    = 0xffffffffL;
    private static final long UC_MASK    = ~SP_MASK;

    // Release counts
    private static final int  RC_SHIFT   = 48;
    private static final long RC_UNIT    = 0x0001L << RC_SHIFT;
    private static final long RC_MASK    = 0xffffL << RC_SHIFT;

    // Total counts
    private static final int  TC_SHIFT   = 32;
    private static final long TC_UNIT    = 0x0001L << TC_SHIFT;
    private static final long TC_MASK    = 0xffffL << TC_SHIFT;
    private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign

    // Instance fields

    volatile long stealCount;            // collects worker nsteals
    final long keepAlive;                // milliseconds before dropping if idle
    int indexSeed;                       // next worker index
    final int bounds;                    // min, max threads packed as shorts
    volatile int mode;                   // parallelism, runstate, queue mode
    WorkQueue[] workQueues;              // main registry
    final String workerNamePrefix;       // for worker thread string; sync lock
    final ForkJoinWorkerThreadFactory factory;
    final UncaughtExceptionHandler ueh;  // per-worker UEH
    final Predicate<? super ForkJoinPool> saturate;

    @jdk.internal.vm.annotation.Contended("fjpctl") // segregate
    volatile long ctl;                   // main pool control

    // Creating, registering and deregistering workers

    /**
     * Tries to construct and start one worker. Assumes that total
     * count has already been incremented as a reservation.  Invokes
     * deregisterWorker on any failure.
     *
     * <p>Failure covers a null factory, a null thread returned by the
     * factory, and any Throwable thrown while creating or starting
     * the thread; the Throwable (if any) is handed to deregisterWorker.
     *
     * @return true if successful
     */
    private boolean createWorker() {
        ForkJoinWorkerThreadFactory fac = factory;
        Throwable ex = null;
        ForkJoinWorkerThread wt = null;
        try {
            if (fac != null && (wt = fac.newThread(this)) != null) {
                wt.start();
                return true;
            }
        } catch (Throwable rex) {
            ex = rex;                    // remember for deregisterWorker
        }
        // wt may be null here (construction failed) or an unstarted thread
        deregisterWorker(wt, ex);
        return false;
    }

    /**
     * Tries to add one worker, incrementing ctl counts before doing
     * so, relying on createWorker to back out on failure.
     *
     * @param c incoming ctl value, with total count negative and no
     * idle workers.  On CAS failure, c is refreshed and retried if
     * this holds (otherwise, a new worker is not needed).
     */
    private void tryAddWorker(long c) {
        do {
            // bump both RC and TC by one; low 32 bits (sp) become zero
            long nc = ((RC_MASK & (c + RC_UNIT)) |
                       (TC_MASK & (c + TC_UNIT)));
            if (ctl == c && CTL.compareAndSet(this, c, nc)) {
                createWorker();
                break;
            }
            // retry only while TC sign bit is set and no waiters are stacked
        } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
    }

    /**
     * Callback from ForkJoinWorkerThread constructor to establish and
     * record its WorkQueue.
*
     * <p>Marks the thread as daemon, installs the pool's
     * UncaughtExceptionHandler if one is set, and, when
     * workerNamePrefix is non-null, claims an odd-numbered slot in
     * workQueues while synchronized on that prefix, then names the
     * thread.
     *
     * @param wt the worker thread
     * @return the worker's queue
     */
    final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
        UncaughtExceptionHandler handler;
        wt.setDaemon(true);                             // configure thread
        if ((handler = ueh) != null)
            wt.setUncaughtExceptionHandler(handler);
        WorkQueue w = new WorkQueue(this, wt);
        int tid = 0;                                    // for thread name
        int fifo = mode & FIFO;
        String prefix = workerNamePrefix;
        if (prefix != null) {
            synchronized (prefix) {                     // prefix doubles as lock
                WorkQueue[] ws = workQueues; int n;
                int s = indexSeed += SEED_INCREMENT;
                if (ws != null && (n = ws.length) > 1) {
                    int m = n - 1;
                    tid = s & m;
                    int i = m & ((s << 1) | 1);         // odd-numbered indices
                    for (int probes = n >>> 1;;) {      // find empty slot
                        WorkQueue q;
                        // a slot is reusable if empty or its queue is QUIET
                        if ((q = ws[i]) == null || q.phase == QUIET)
                            break;
                        else if (--probes == 0) {
                            i = n | 1;                  // resize below
                            break;
                        }
                        else
                            i = (i + 2) & m;
                    }

                    // id: slot index, fifo mode bit, and seed bits outside
                    // the SMASK | FIFO | DORMANT positions
                    int id = i | fifo | (s & ~(SMASK | FIFO | DORMANT));
                    w.phase = w.id = id;                // now publishable

                    if (i < n)
                        ws[i] = w;
                    else {                              // expand array
                        int an = n << 1;
                        WorkQueue[] as = new WorkQueue[an];
                        as[i] = w;
                        int am = an - 1;
                        // j is advanced twice per pass: even slot, then odd
                        for (int j = 0; j < n; ++j) {
                            WorkQueue v;                // copy external queue
                            if ((v = ws[j]) != null)    // position may change
                                as[v.id & am & SQMASK] = v;
                            if (++j >= n)
                                break;
                            as[j] = ws[j];              // copy worker
                        }
                        workQueues = as;
                    }
                }
            }
            wt.setName(prefix.concat(Integer.toString(tid)));
        }
        return w;
    }

    /**
     * Final callback from terminating worker, as well as upon failure
     * to construct or start a worker.  Removes record of worker from
     * array, and adjusts counts. If pool is shutting down, tries to
     * complete termination.
*
     * <p>A non-null {@code ex} is passed to ForkJoinTask.rethrow as
     * the last step, after all cleanup has been performed.
     *
     * @param wt the worker thread, or null if construction failed
     * @param ex the exception causing failure, or null if none
     */
    final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
        WorkQueue w = null;
        int phase = 0;
        if (wt != null && (w = wt.workQueue) != null) {
            Object lock = workerNamePrefix;
            long ns = (long)w.nsteals & 0xffffffffL;  // treat nsteals as unsigned
            int idx = w.id & SMASK;
            if (lock != null) {
                WorkQueue[] ws;                       // remove index from array
                synchronized (lock) {
                    // clear the slot only if it still refers to this queue
                    if ((ws = workQueues) != null && ws.length > idx &&
                        ws[idx] == w)
                        ws[idx] = null;
                    stealCount += ns;
                }
            }
            phase = w.phase;
        }
        if (phase != QUIET) {                         // else pre-adjusted
            long c;                                   // decrement counts
            do {} while (!CTL.weakCompareAndSet
                         (this, c = ctl, ((RC_MASK & (c - RC_UNIT)) |
                                          (TC_MASK & (c - TC_UNIT)) |
                                          (SP_MASK & c))));
        }
        if (w != null)
            w.cancelAll();                            // cancel remaining tasks

        if (!tryTerminate(false, false) &&            // possibly replace worker
            w != null && w.array != null)             // avoid repeated failures
            signalWork();

        if (ex == null)                               // help clean on way out
            ForkJoinTask.helpExpungeStaleExceptions();
        else                                          // rethrow
            ForkJoinTask.rethrow(ex);
    }

    /**
     * Tries to create or release a worker if too few are running.
*/
    final void signalWork() {
        for (;;) {
            long c; int sp; WorkQueue[] ws; int i; WorkQueue v;
            if ((c = ctl) >= 0L)                      // enough workers
                break;
            else if ((sp = (int)c) == 0) {            // no idle workers
                if ((c & ADD_WORKER) != 0L)           // too few workers
                    tryAddWorker(c);
                break;
            }
            else if ((ws = workQueues) == null)
                break;                                // unstarted/terminated
            else if (ws.length <= (i = sp & SMASK))
                break;                                // terminated
            else if ((v = ws[i]) == null)
                break;                                // terminating
            else {
                // v is the queue at the top of the waiter stack; try to
                // pop it: restore its predecessor as sp and add one RC
                int np = sp & ~UNSIGNALLED;
                int vp = v.phase;
                long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + RC_UNIT));
                Thread vt = v.owner;
                if (sp == vp && CTL.compareAndSet(this, c, nc)) {
                    v.phase = np;                     // clear UNSIGNALLED bit
                    if (v.source < 0)                 // negative source: unpark owner
                        LockSupport.unpark(vt);
                    break;
                }
                // phase mismatch or lost CAS: loop and re-read ctl
            }
        }
    }

    /**
     * Tries to decrement counts (sometimes implicitly) and possibly
     * arrange for a compensating worker in preparation for blocking:
     * If not all core workers yet exist, creates one, else if any are
     * unreleased (possibly including caller) releases one, else if
     * fewer than the minimum allowed number of workers running,
     * checks to see that they are all active, and if so creates an
     * extra worker unless over maximum limit and policy is to
     * saturate.  Most of these steps can fail due to interference, in
     * which case 0 is returned so caller will retry. A negative
     * return value indicates that the caller doesn't need to
     * re-adjust counts when later unblocked.
*
     * @param w the caller's queue; a null value makes this return 0
     * once the total count is non-negative
     * @return 1: block then adjust, -1: block without adjust, 0 : retry
     * @throws RejectedExecutionException if the thread limit is
     * reached, the saturate predicate does not accept, and at least
     * as many workers as the parallelism level are seen blocking
     */
    private int tryCompensate(WorkQueue w) {
        int t, n, sp;
        long c = ctl;
        WorkQueue[] ws = workQueues;
        // when t < 0 the whole branch is skipped and a worker is added below
        if ((t = (short)(c >>> TC_SHIFT)) >= 0) {
            if (ws == null || (n = ws.length) <= 0 || w == null)
                return 0;                        // disabled
            else if ((sp = (int)c) != 0) {       // replace or release
                WorkQueue v = ws[sp & (n - 1)];
                int wp = w.phase;
                long uc = UC_MASK & ((wp < 0) ? c + RC_UNIT : c);
                int np = sp & ~UNSIGNALLED;
                if (v != null) {
                    int vp = v.phase;
                    Thread vt = v.owner;
                    long nc = ((long)v.stackPred & SP_MASK) | uc;
                    if (vp == sp && CTL.compareAndSet(this, c, nc)) {
                        v.phase = np;
                        if (v.source < 0)
                            LockSupport.unpark(vt);
                        return (wp < 0) ? -1 : 1;
                    }
                }
                return 0;
            }
            else if ((int)(c >> RC_SHIFT) -      // reduce parallelism
                     (short)(bounds & SMASK) > 0) {
                long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c));
                return CTL.compareAndSet(this, c, nc) ? 1 : 0;
            }
            else {                               // validate
                int md = mode, pc = md & SMASK, tc = pc + t, bc = 0;
                boolean unstable = false;
                for (int i = 1; i < n; i += 2) { // odd slots hold workers
                    WorkQueue q; Thread wt; Thread.State ts;
                    if ((q = ws[i]) != null) {
                        if (q.source == 0) {
                            unstable = true;
                            break;
                        }
                        else {
                            --tc;
                            if ((wt = q.owner) != null &&
                                ((ts = wt.getState()) == Thread.State.BLOCKED ||
                                 ts == Thread.State.WAITING))
                                ++bc;            // worker is blocking
                        }
                    }
                }
                if (unstable || tc != 0 || ctl != c)
                    return 0;                    // inconsistent
                else if (t + pc >= MAX_CAP || t >= (bounds >>> SWIDTH)) {
                    Predicate<? super ForkJoinPool> sat;
                    if ((sat = saturate) != null && sat.test(this))
                        return -1;
                    else if (bc < pc) {          // lagging
                        Thread.yield();          // for retry spins
                        return 0;
                    }
                    else
                        throw new RejectedExecutionException(
                            "Thread limit exceeded replacing blocked worker");
                }
            }
        }

        long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); // expand pool
        return CTL.compareAndSet(this, c, nc) && createWorker() ? 1 : 0;
    }

    /**
     * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
     * See above for explanation.
     *
     * <p>Each pass scans workQueues from a pseudo-random start for a
     * non-empty queue and runs a task taken from its base.  After a
     * fruitless scan the worker first pushes itself onto the ctl
     * waiter stack, and on a later fruitless scan parks until
     * signalled, timed out, or the pool is shutting down.
     *
     * @param w the calling worker's queue
     */
    final void runWorker(WorkQueue w) {
        WorkQueue[] ws;
        w.growArray();                                  // allocate queue
        int r = w.id ^ ThreadLocalRandom.nextSecondarySeed();
        if (r == 0)                                     // initial nonzero seed
            r = 1;
        int lastSignalId = 0;                           // avoid unneeded signals
        while ((ws = workQueues) != null) {
            boolean nonempty = false;                   // scan
            for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
                WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
                if ((i = r & m) >= 0 && i < n &&        // always true
                    (q = ws[i]) != null && (b = q.base) - q.top < 0 &&
                    (a = q.array) != null && (al = a.length) > 0) {
                    int qid = q.id;                     // (never zero)
                    int index = (al - 1) & b;
                    ForkJoinTask<?> t = (ForkJoinTask<?>)
                        QA.getAcquire(a, index);
                    if (t != null && b++ == q.base &&
                        QA.compareAndSet(a, index, t, null)) {
                        if ((q.base = b) - q.top < 0 && qid != lastSignalId)
                            signalWork();               // propagate signal
                        w.source = lastSignalId = qid;
                        t.doExec();
                        if ((w.id & FIFO) != 0)         // run remaining locals
                            w.localPollAndExec(POLL_LIMIT);
                        else
                            w.localPopAndExec(POLL_LIMIT);
                        ForkJoinWorkerThread thread = w.owner;
                        ++w.nsteals;
                        w.source = 0;                   // now idle
                        if (thread != null)
                            thread.afterTopLevelExec();
                    }
                    nonempty = true;
                }
                else if (nonempty)
                    break;
                else
                    ++r;
            }

            if (nonempty) {                             // move (xorshift)
                r ^= r << 13; r ^= r >>> 17; r ^= r << 5;
            }
            else {
                int phase;
                lastSignalId = 0;                       // clear for next scan
                if ((phase = w.phase) >= 0) {           // enqueue
                    int np = w.phase = (phase + SS_SEQ) | UNSIGNALLED;
                    long c, nc;
                    do {
                        // link to current stack top, then install self
                        // as the new top while giving back one RC
                        w.stackPred = (int)(c = ctl);
                        nc = ((c - RC_UNIT) & UC_MASK) | (SP_MASK & np);
                    } while (!CTL.weakCompareAndSet(this, c, nc));
                }
                else {                                  // already queued
                    int pred = w.stackPred;
                    w.source = DORMANT;                 // enable signal
                    for (int steps = 0;;) {
                        int md, rc; long c;
                        if (w.phase >= 0) {             // was signalled
                            w.source = 0;
                            break;
                        }
                        else if ((md = mode) < 0)       // shutting down
                            return;
                        else if ((rc = ((md & SMASK) +  // possibly quiescent
                                        (int)((c = ctl) >> RC_SHIFT))) <= 0 &&
                                 (md & SHUTDOWN) != 0 &&
                                 tryTerminate(false, false))
                            return;                     // help terminate
                        else if ((++steps & 1) == 0)
                            Thread.interrupted();       // clear between parks
                        else if (rc <= 0 && pred != 0 && phase == (int)c) {
                            // this worker is the stack top (phase == sp):
                            // park with a deadline so it can be dropped
                            long d = keepAlive + System.currentTimeMillis();
                            LockSupport.parkUntil(this, d);
                            if (ctl == c &&
                                d - System.currentTimeMillis() <= TIMEOUT_SLOP) {
                                long nc = ((UC_MASK & (c - TC_UNIT)) |
                                           (SP_MASK & pred));
                                if (CTL.compareAndSet(this, c, nc)) {
                                    w.phase = QUIET;
                                    return;             // drop on timeout
                                }
                            }
                        }
                        else
                            LockSupport.park(this);
                    }
                }
            }
        }
    }

    /**
     * Helps and/or blocks until the given task is done or timeout.
     * First tries locally helping, then scans other queues for a task
     * produced by one of w's stealers; compensating and blocking if
     * none are found (rescanning if tryCompensate fails).
/**
 * Helps and/or blocks until the given task is done or timeout.
 * First tries locally helping, then scans other queues for a task
 * produced by one of w's stealers; compensating and blocking if
 * none are found (rescanning if tryCompensate fails).
 *
 * @param w caller
 * @param task the task
 * @param deadline for timed waits, if nonzero (compared against
 *        {@code System.nanoTime()})
 * @return task status on exit
 */
final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
    int s = 0;
    // For a CountedCompleter, first help locally; skip everything else
    // if that already yields a negative status.
    if (w != null && task != null &&
        (!(task instanceof CountedCompleter) ||
         (s = w.localHelpCC((CountedCompleter<?>)task, 0)) >= 0)) {
        w.tryRemoveAndExec(task);
        int src = w.source, id = w.id;             // src is restored after each helped task
        s = task.status;
        while (s >= 0) {
            WorkQueue[] ws;
            boolean nonempty = false;
            int r = ThreadLocalRandom.nextSecondarySeed() | 1; // odd indices
            if ((ws = workQueues) != null) {       // scan for matching id
                for (int n = ws.length, m = n - 1, j = -n; j < n; j += 2) {
                    WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
                    // Only queues whose source equals this worker's id qualify.
                    if ((i = (r + j) & m) >= 0 && i < n &&
                        (q = ws[i]) != null && q.source == id &&
                        (b = q.base) - q.top < 0 &&
                        (a = q.array) != null && (al = a.length) > 0) {
                        int qid = q.id;
                        int index = (al - 1) & b;
                        ForkJoinTask<?> t = (ForkJoinTask<?>)
                            QA.getAcquire(a, index);
                        // Re-check base and source before claiming the slot.
                        if (t != null && b++ == q.base && id == q.source &&
                            QA.compareAndSet(a, index, t, null)) {
                            q.base = b;
                            w.source = qid;
                            t.doExec();
                            w.source = src;
                        }
                        nonempty = true;
                        break;
                    }
                }
            }
            if ((s = task.status) < 0)
                break;
            else if (!nonempty) {
                long ms, ns; int block;
                if (deadline == 0L)
                    ms = 0L;                       // untimed
                else if ((ns = deadline - System.nanoTime()) <= 0L)
                    break;                         // timeout
                else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
                    ms = 1L;                       // avoid 0 for timed wait
                // A zero result from tryCompensate means retry the scan.
                if ((block = tryCompensate(w)) != 0) {
                    task.internalWait(ms);
                    // Add RC_UNIT back only when tryCompensate returned a positive value.
                    CTL.getAndAdd(this, (block > 0) ? RC_UNIT : 0L);
                }
                s = task.status;
            }
        }
    }
    return s;
}
Rather than blocking 1744 * when tasks cannot be found, rescans until all others cannot 1745 * find tasks either. 1746 */ 1747 final void helpQuiescePool(WorkQueue w) { 1748 int prevSrc = w.source, fifo = w.id & FIFO; 1749 for (int source = prevSrc, released = -1;;) { // -1 until known 1750 WorkQueue[] ws; 1751 if (fifo != 0) 1752 w.localPollAndExec(0); 1753 else 1754 w.localPopAndExec(0); 1755 if (released == -1 && w.phase >= 0) 1756 released = 1; 1757 boolean quiet = true, empty = true; 1758 int r = ThreadLocalRandom.nextSecondarySeed(); 1759 if ((ws = workQueues) != null) { 1760 for (int n = ws.length, j = n, m = n - 1; j > 0; --j) { 1761 WorkQueue q; int i, b, al; ForkJoinTask<?>[] a; 1762 if ((i = (r - j) & m) >= 0 && i < n && (q = ws[i]) != null) { 1763 if ((b = q.base) - q.top < 0 && 1764 (a = q.array) != null && (al = a.length) > 0) { 1765 int qid = q.id; 1766 if (released == 0) { // increment 1767 released = 1; 1768 CTL.getAndAdd(this, RC_UNIT); 1769 } 1770 int index = (al - 1) & b; 1771 ForkJoinTask<?> t = (ForkJoinTask<?>) 1772 QA.getAcquire(a, index); 1773 if (t != null && b++ == q.base && 1774 QA.compareAndSet(a, index, t, null)) { 1775 q.base = b; 1776 w.source = source = q.id; 1777 t.doExec(); 1778 w.source = source = prevSrc; 1779 } 1780 quiet = empty = false; 1781 break; 1782 } 1783 else if ((q.source & QUIET) == 0) 1784 quiet = false; 1785 } 1786 } 1787 } 1788 if (quiet) { 1789 if (released == 0) 1790 CTL.getAndAdd(this, RC_UNIT); 1791 w.source = prevSrc; 1792 break; 1793 } 1794 else if (empty) { 1795 if (source != QUIET) 1796 w.source = source = QUIET; 1797 if (released == 1) { // decrement 1798 released = 0; 1799 CTL.getAndAdd(this, RC_MASK & -RC_UNIT); 1800 } 1801 } 1802 } 1803 } 1804 1805 /** 1806 * Scans for and returns a polled task, if available. 1807 * Used only for untracked polls. 
/**
 * Scans for and returns a polled task, if available.
 * Used only for untracked polls.
 *
 * <p>Traverses {@code workQueues} from a random origin with a random
 * stride. A traversal that sees the same sum of queue bases twice in
 * a row without finding a task ends the scan.
 *
 * @param submissionsOnly if true, only scan submission queues
 * @return a task, or {@code null} if none was found or the pool's
 *         {@code STOP} bit is set
 */
private ForkJoinTask<?> pollScan(boolean submissionsOnly) {
    WorkQueue[] ws; int n;
    rescan: while ((mode & STOP) == 0 && (ws = workQueues) != null &&
                  (n = ws.length) > 0) {
        int m = n - 1;
        int r = ThreadLocalRandom.nextSecondarySeed();
        int h = r >>> 16;
        int origin, step;
        if (submissionsOnly) {
            origin = (r & ~1) & m;         // even indices and steps
            step = (h & ~1) | 2;
        }
        else {
            origin = r & m;
            step = h | 1;
        }
        for (int k = origin, oldSum = 0, checkSum = 0;;) {
            WorkQueue q; int b, al; ForkJoinTask<?>[] a;
            if ((q = ws[k]) != null) {
                checkSum += b = q.base;
                if (b - q.top < 0 &&
                    (a = q.array) != null && (al = a.length) > 0) {
                    int index = (al - 1) & b;
                    ForkJoinTask<?> t = (ForkJoinTask<?>)
                        QA.getAcquire(a, index);
                    if (t != null && b++ == q.base &&
                        QA.compareAndSet(a, index, t, null)) {
                        q.base = b;
                        return t;
                    }
                    else
                        break; // restart
                }
            }
            // Back at the origin: one full traversal completed.
            if ((k = (k + step) & m) == origin) {
                // Compares the previous traversal's sum with this one
                // while storing the new sum in oldSum.
                if (oldSum == (oldSum = checkSum))
                    break rescan;
                checkSum = 0;
            }
        }
    }
    return null;
}

/**
 * Gets and removes a local or stolen task for the given worker.
 * The worker's own queue is tried first ({@code poll} when its id
 * has the {@code FIFO} bit, otherwise {@code pop}); if that yields
 * nothing, or {@code w} is null, falls back to {@code pollScan(false)}.
 *
 * @param w the worker's queue, possibly null
 * @return a task, if available
 */
final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
    ForkJoinTask<?> t;
    if (w != null &&
        (t = (w.id & FIFO) != 0 ? w.poll() : w.pop()) != null)
        return t;
    else
        return pollScan(false);
}

// External operations
/**
 * Adds the given task to a submission queue at submitter's
 * current queue, creating one if null or contended.
 *
 * <p>The target slot is chosen from the caller's
 * {@code ThreadLocalRandom} probe. If the slot is empty a new queue
 * is created and installed while synchronized on
 * {@code workerNamePrefix}; if the slot's queue cannot be locked the
 * probe is advanced and the attempt is repeated.
 *
 * @param task the task. Caller must ensure non-null.
 * @throws RejectedExecutionException if the {@code SHUTDOWN} bit is
 *         set in {@code mode} or there is no usable queue array
 */
final void externalPush(ForkJoinTask<?> task) {
    int r;                                // initialize caller's probe
    if ((r = ThreadLocalRandom.getProbe()) == 0) {
        ThreadLocalRandom.localInit();
        r = ThreadLocalRandom.getProbe();
    }
    for (;;) {
        int md = mode, n;
        WorkQueue[] ws = workQueues;
        if ((md & SHUTDOWN) != 0 || ws == null || (n = ws.length) <= 0)
            throw new RejectedExecutionException();
        else {
            WorkQueue q;
            boolean push = false, grow = false;
            if ((q = ws[(n - 1) & r & SQMASK]) == null) {
                Object lock = workerNamePrefix;
                int qid = (r | QUIET) & ~(FIFO | OWNED);
                q = new WorkQueue(this, null);
                q.id = qid;
                q.source = QUIET;
                q.phase = QLOCK;          // lock queue
                if (lock != null) {
                    synchronized (lock) { // lock pool to install
                        int i;
                        // Re-read the array under the lock; install only
                        // if the slot is still empty.
                        if ((ws = workQueues) != null &&
                            (n = ws.length) > 0 &&
                            ws[i = qid & (n - 1) & SQMASK] == null) {
                            ws[i] = q;
                            push = grow = true;
                        }
                    }
                }
            }
            else if (q.tryLockSharedQueue()) {
                int b = q.base, s = q.top, al, d; ForkJoinTask<?>[] a;
                // Fast path: an array exists and has room for one more.
                if ((a = q.array) != null && (al = a.length) > 0 &&
                    al - 1 + (d = b - s) > 0) {
                    a[(al - 1) & s] = task;
                    q.top = s + 1;        // relaxed writes OK here
                    q.phase = 0;          // unlock
                    if (d < 0 && q.base - s < -1)
                        break;            // no signal needed
                }
                else
                    grow = true;
                push = true;
            }
            if (push) {
                if (grow) {
                    try {
                        q.growArray();
                        int s = q.top, al; ForkJoinTask<?>[] a;
                        if ((a = q.array) != null && (al = a.length) > 0) {
                            a[(al - 1) & s] = task;
                            q.top = s + 1;
                        }
                    } finally {
                        q.phase = 0;      // always unlock, even if growArray throws
                    }
                }
                signalWork();
                break;
            }
            else                          // move if busy
                r = ThreadLocalRandom.advanceProbe(r);
        }
    }
}
1948 */ 1949 private <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) { 1950 Thread t; ForkJoinWorkerThread w; WorkQueue q; 1951 if (task == null) 1952 throw new NullPointerException(); 1953 if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) && 1954 (w = (ForkJoinWorkerThread)t).pool == this && 1955 (q = w.workQueue) != null) 1956 q.push(task); 1957 else 1958 externalPush(task); 1959 return task; 1960 } 1961 1962 /** 1963 * Returns common pool queue for an external thread. 1964 */ 1965 static WorkQueue commonSubmitterQueue() { 1966 ForkJoinPool p = common; 1967 int r = ThreadLocalRandom.getProbe(); 1968 WorkQueue[] ws; int n; 1969 return (p != null && (ws = p.workQueues) != null && 1970 (n = ws.length) > 0) ? 1971 ws[(n - 1) & r & SQMASK] : null; 1972 } 1973 1974 /** 1975 * Performs tryUnpush for an external submitter. 1976 */ 1977 final boolean tryExternalUnpush(ForkJoinTask<?> task) { 1978 int r = ThreadLocalRandom.getProbe(); 1979 WorkQueue[] ws; WorkQueue w; int n; 1980 return ((ws = workQueues) != null && 1981 (n = ws.length) > 0 && 1982 (w = ws[(n - 1) & r & SQMASK]) != null && 1983 w.trySharedUnpush(task)); 1984 } 1985 1986 /** 1987 * Performs helpComplete for an external submitter. 1988 */ 1989 final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) { 1990 int r = ThreadLocalRandom.getProbe(); 1991 WorkQueue[] ws; WorkQueue w; int n; 1992 return ((ws = workQueues) != null && (n = ws.length) > 0 && 1993 (w = ws[(n - 1) & r & SQMASK]) != null) ? 1994 w.sharedHelpCC(task, maxTasks) : 0; 1995 } 1996 1997 /** 1998 * Tries to steal and run tasks within the target's computation. 1999 * The maxTasks argument supports external usages; internal calls 2000 * use zero, allowing unbounded steps (external calls trap 2001 * non-positive values). 
2002 * 2003 * @param w caller 2004 * @param maxTasks if non-zero, the maximum number of other tasks to run 2005 * @return task status on exit 2006 */ 2007 final int helpComplete(WorkQueue w, CountedCompleter<?> task, 2008 int maxTasks) { 2009 return (w == null) ? 0 : w.localHelpCC(task, maxTasks); 2010 } 2011 2012 /** 2013 * Returns a cheap heuristic guide for task partitioning when 2014 * programmers, frameworks, tools, or languages have little or no 2015 * idea about task granularity. In essence, by offering this 2016 * method, we ask users only about tradeoffs in overhead vs 2017 * expected throughput and its variance, rather than how finely to 2018 * partition tasks. 2019 * 2020 * In a steady state strict (tree-structured) computation, each 2021 * thread makes available for stealing enough tasks for other 2022 * threads to remain active. Inductively, if all threads play by 2023 * the same rules, each thread should make available only a 2024 * constant number of tasks. 2025 * 2026 * The minimum useful constant is just 1. But using a value of 1 2027 * would require immediate replenishment upon each steal to 2028 * maintain enough tasks, which is infeasible. Further, 2029 * partitionings/granularities of offered tasks should minimize 2030 * steal rates, which in general means that threads nearer the top 2031 * of computation tree should generate more than those nearer the 2032 * bottom. In perfect steady state, each thread is at 2033 * approximately the same level of computation tree. However, 2034 * producing extra tasks amortizes the uncertainty of progress and 2035 * diffusion assumptions. 2036 * 2037 * So, users will want to use values larger (but not much larger) 2038 * than 1 to both smooth over transient shortages and hedge 2039 * against uneven progress; as traded off against the cost of 2040 * extra task overhead. 
We leave the user to pick a threshold 2041 * value to compare with the results of this call to guide 2042 * decisions, but recommend values such as 3. 2043 * 2044 * When all threads are active, it is on average OK to estimate 2045 * surplus strictly locally. In steady-state, if one thread is 2046 * maintaining say 2 surplus tasks, then so are others. So we can 2047 * just use estimated queue length. However, this strategy alone 2048 * leads to serious mis-estimates in some non-steady-state 2049 * conditions (ramp-up, ramp-down, other stalls). We can detect 2050 * many of these by further considering the number of "idle" 2051 * threads, that are known to have zero queued tasks, so 2052 * compensate by a factor of (#idle/#active) threads. 2053 */ 2054 static int getSurplusQueuedTaskCount() { 2055 Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q; 2056 if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) && 2057 (pool = (wt = (ForkJoinWorkerThread)t).pool) != null && 2058 (q = wt.workQueue) != null) { 2059 int p = pool.mode & SMASK; 2060 int a = p + (int)(pool.ctl >> RC_SHIFT); 2061 int n = q.top - q.base; 2062 return n - (a > (p >>>= 1) ? 0 : 2063 a > (p >>>= 1) ? 1 : 2064 a > (p >>>= 1) ? 2 : 2065 a > (p >>>= 1) ? 4 : 2066 8); 2067 } 2068 return 0; 2069 } 2070 2071 // Termination 2072 2073 /** 2074 * Possibly initiates and/or completes termination. 
/**
 * Possibly initiates and/or completes termination.
 *
 * <p>Proceeds through three loops, each advancing {@code mode} by
 * one bit: {@code SHUTDOWN}, then {@code STOP}, then
 * {@code TERMINATED}.
 *
 * @param now if true, unconditionally terminate, else only
 * if no work and no active workers
 * @param enable if true, terminate when next possible
 * @return true if terminating or terminated
 */
private boolean tryTerminate(boolean now, boolean enable) {
    int md; // 3 phases: try to set SHUTDOWN, then STOP, then TERMINATED

    while (((md = mode) & SHUTDOWN) == 0) {
        if (!enable || this == common)        // cannot shutdown
            return false;
        else
            MODE.compareAndSet(this, md, md | SHUTDOWN);
    }

    while (((md = mode) & STOP) == 0) {       // try to initiate termination
        if (!now) {                           // check if quiescent & empty
            for (long oldSum = 0L;;) {        // repeat until stable
                boolean running = false;
                long checkSum = ctl;
                WorkQueue[] ws = workQueues;
                if ((md & SMASK) + (int)(checkSum >> RC_SHIFT) > 0)
                    running = true;
                else if (ws != null) {
                    WorkQueue w; int b;
                    for (int i = 0; i < ws.length; ++i) {
                        if ((w = ws[i]) != null) {
                            checkSum += (b = w.base) + w.id;
                            // Non-empty queue, or an odd-index queue
                            // whose source is non-negative.
                            if (b != w.top ||
                                ((i & 1) == 1 && w.source >= 0)) {
                                running = true;
                                break;
                            }
                        }
                    }
                }
                if (((md = mode) & STOP) != 0)
                    break;                    // already triggered
                else if (running)
                    return false;
                // Stable: same array and same checksum as the previous pass.
                else if (workQueues == ws && oldSum == (oldSum = checkSum))
                    break;
            }
        }
        if ((md & STOP) == 0)
            MODE.compareAndSet(this, md, md | STOP);
    }

    while (((md = mode) & TERMINATED) == 0) { // help terminate others
        for (long oldSum = 0L;;) {            // repeat until stable
            WorkQueue[] ws; WorkQueue w;
            long checkSum = ctl;
            if ((ws = workQueues) != null) {
                for (int i = 0; i < ws.length; ++i) {
                    if ((w = ws[i]) != null) {
                        ForkJoinWorkerThread wt = w.owner;
                        w.cancelAll();        // clear queues
                        if (wt != null) {
                            try {             // unblock join or park
                                wt.interrupt();
                            } catch (Throwable ignore) {
                                // deliberately ignored: interruption is best-effort
                            }
                        }
                        checkSum += w.base + w.id;
                    }
                }
            }
            if (((md = mode) & TERMINATED) != 0 ||
                (workQueues == ws && oldSum == (oldSum = checkSum)))
                break;
        }
        if ((md & TERMINATED) != 0)
            break;
        // Positive total count: do not mark TERMINATED yet.
        else if ((md & SMASK) + (short)(ctl >>> TC_SHIFT) > 0)
            break;
        else if (MODE.compareAndSet(this, md, md | TERMINATED)) {
            synchronized (this) {
                notifyAll();                  // for awaitTermination
            }
            break;
        }
    }
    return true;
}

// Exported methods

// Constructors

/**
 * Creates a {@code ForkJoinPool} with parallelism equal to {@link
 * java.lang.Runtime#availableProcessors}, using defaults for all
 * other parameters (see {@link #ForkJoinPool(int,
 * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean,
 * int, int, int, Predicate, long, TimeUnit)}).
 *
 * @throws SecurityException if a security manager exists and
 *         the caller is not permitted to modify threads
 *         because it does not hold {@link
 *         java.lang.RuntimePermission}{@code ("modifyThread")}
 */
public ForkJoinPool() {
    // Parallelism is capped at MAX_CAP.
    this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
         defaultForkJoinWorkerThreadFactory, null, false,
         0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
}
/**
 * Creates a {@code ForkJoinPool} with the indicated parallelism
 * level, using defaults for all other parameters (see {@link
 * #ForkJoinPool(int, ForkJoinWorkerThreadFactory,
 * UncaughtExceptionHandler, boolean, int, int, int, Predicate,
 * long, TimeUnit)}).
 *
 * @param parallelism the parallelism level
 * @throws IllegalArgumentException if parallelism less than or
 *         equal to zero, or greater than implementation limit
 * @throws SecurityException if a security manager exists and
 *         the caller is not permitted to modify threads
 *         because it does not hold {@link
 *         java.lang.RuntimePermission}{@code ("modifyThread")}
 */
public ForkJoinPool(int parallelism) {
    // Default factory, no handler, non-async mode; the remaining
    // arguments are the same defaults the other convenience
    // constructors pass.
    this(parallelism, defaultForkJoinWorkerThreadFactory, null, false,
         0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
}
/**
 * Creates a {@code ForkJoinPool} with the given parameters (using
 * defaults for others -- see {@link #ForkJoinPool(int,
 * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean,
 * int, int, int, Predicate, long, TimeUnit)}).
 *
 * @param parallelism the parallelism level. For default value,
 * use {@link java.lang.Runtime#availableProcessors}.
 * @param factory the factory for creating new threads. For default value,
 * use {@link #defaultForkJoinWorkerThreadFactory}.
 * @param handler the handler for internal worker threads that
 * terminate due to unrecoverable errors encountered while executing
 * tasks. For default value, use {@code null}.
 * @param asyncMode if true,
 * establishes local first-in-first-out scheduling mode for forked
 * tasks that are never joined. This mode may be more appropriate
 * than default locally stack-based mode in applications in which
 * worker threads only process event-style asynchronous tasks.
 * For default value, use {@code false}.
 * @throws IllegalArgumentException if parallelism less than or
 *         equal to zero, or greater than implementation limit
 * @throws NullPointerException if the factory is null
 * @throws SecurityException if a security manager exists and
 *         the caller is not permitted to modify threads
 *         because it does not hold {@link
 *         java.lang.RuntimePermission}{@code ("modifyThread")}
 */
public ForkJoinPool(int parallelism,
                    ForkJoinWorkerThreadFactory factory,
                    UncaughtExceptionHandler handler,
                    boolean asyncMode) {
    // Trailing arguments: corePoolSize 0, maximumPoolSize MAX_CAP,
    // minimumRunnable 1, no saturate predicate, default keep-alive.
    this(parallelism, factory, handler, asyncMode,
         0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
}
Using a smaller value (for example 2263 * {@code 0}) has the same effect as the default. 2264 * 2265 * @param maximumPoolSize the maximum number of threads allowed. 2266 * When the maximum is reached, attempts to replace blocked 2267 * threads fail. (However, because creation and termination of 2268 * different threads may overlap, and may be managed by the given 2269 * thread factory, this value may be transiently exceeded.) To 2270 * arrange the same value as is used by default for the common 2271 * pool, use {@code 256} plus the {@code parallelism} level. (By 2272 * default, the common pool allows a maximum of 256 spare 2273 * threads.) Using a value (for example {@code 2274 * Integer.MAX_VALUE}) larger than the implementation's total 2275 * thread limit has the same effect as using this limit (which is 2276 * the default). 2277 * 2278 * @param minimumRunnable the minimum allowed number of core 2279 * threads not blocked by a join or {@link ManagedBlocker}. To 2280 * ensure progress, when too few unblocked threads exist and 2281 * unexecuted tasks may exist, new threads are constructed, up to 2282 * the given maximumPoolSize. For the default value, use {@code 2283 * 1}, that ensures liveness. A larger value might improve 2284 * throughput in the presence of blocked activities, but might 2285 * not, due to increased overhead. A value of zero may be 2286 * acceptable when submitted tasks cannot have dependencies 2287 * requiring additional threads. 2288 * 2289 * @param saturate if non-null, a predicate invoked upon attempts 2290 * to create more than the maximum total allowed threads. By 2291 * default, when a thread is about to block on a join or {@link 2292 * ManagedBlocker}, but cannot be replaced because the 2293 * maximumPoolSize would be exceeded, a {@link 2294 * RejectedExecutionException} is thrown. 
But if this predicate 2295 * returns {@code true}, then no exception is thrown, so the pool 2296 * continues to operate with fewer than the target number of 2297 * runnable threads, which might not ensure progress. 2298 * 2299 * @param keepAliveTime the elapsed time since last use before 2300 * a thread is terminated (and then later replaced if needed). 2301 * For the default value, use {@code 60, TimeUnit.SECONDS}. 2302 * 2303 * @param unit the time unit for the {@code keepAliveTime} argument 2304 * 2305 * @throws IllegalArgumentException if parallelism is less than or 2306 * equal to zero, or is greater than implementation limit, 2307 * or if maximumPoolSize is less than parallelism, 2308 * of if the keepAliveTime is less than or equal to zero. 2309 * @throws NullPointerException if the factory is null 2310 * @throws SecurityException if a security manager exists and 2311 * the caller is not permitted to modify threads 2312 * because it does not hold {@link 2313 * java.lang.RuntimePermission}{@code ("modifyThread")} 2314 * @since 9 2315 */ 2316 public ForkJoinPool(int parallelism, 2317 ForkJoinWorkerThreadFactory factory, 2318 UncaughtExceptionHandler handler, 2319 boolean asyncMode, 2320 int corePoolSize, 2321 int maximumPoolSize, 2322 int minimumRunnable, 2323 Predicate<? super ForkJoinPool> saturate, 2324 long keepAliveTime, 2325 TimeUnit unit) { 2326 // check, encode, pack parameters 2327 if (parallelism <= 0 || parallelism > MAX_CAP || 2328 maximumPoolSize < parallelism || keepAliveTime <= 0L) 2329 throw new IllegalArgumentException(); 2330 if (factory == null) 2331 throw new NullPointerException(); 2332 long ms = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP); 2333 2334 int corep = Math.min(Math.max(corePoolSize, parallelism), MAX_CAP); 2335 long c = ((((long)(-corep) << TC_SHIFT) & TC_MASK) | 2336 (((long)(-parallelism) << RC_SHIFT) & RC_MASK)); 2337 int m = parallelism | (asyncMode ? 
FIFO : 0); 2338 int maxSpares = Math.min(maximumPoolSize, MAX_CAP) - parallelism; 2339 int minAvail = Math.min(Math.max(minimumRunnable, 0), MAX_CAP); 2340 int b = ((minAvail - parallelism) & SMASK) | (maxSpares << SWIDTH); 2341 int n = (parallelism > 1) ? parallelism - 1 : 1; // at least 2 slots 2342 n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; 2343 n = (n + 1) << 1; // power of two, including space for submission queues 2344 2345 this.workerNamePrefix = "ForkJoinPool-" + nextPoolId() + "-worker-"; 2346 this.workQueues = new WorkQueue[n]; 2347 this.factory = factory; 2348 this.ueh = handler; 2349 this.saturate = saturate; 2350 this.keepAlive = ms; 2351 this.bounds = b; 2352 this.mode = m; 2353 this.ctl = c; 2354 checkPermission(); 2355 } 2356 2357 private static Object newInstanceFromSystemProperty(String property) 2358 throws ReflectiveOperationException { 2359 String className = System.getProperty(property); 2360 return (className == null) 2361 ? null 2362 : ClassLoader.getSystemClassLoader().loadClass(className) 2363 .getConstructor().newInstance(); 2364 } 2365 2366 /** 2367 * Constructor for common pool using parameters possibly 2368 * overridden by system properties 2369 */ 2370 private ForkJoinPool(byte forCommonPoolOnly) { 2371 int parallelism = -1; 2372 ForkJoinWorkerThreadFactory fac = null; 2373 UncaughtExceptionHandler handler = null; 2374 try { // ignore exceptions in accessing/parsing properties 2375 String pp = System.getProperty 2376 ("java.util.concurrent.ForkJoinPool.common.parallelism"); 2377 if (pp != null) 2378 parallelism = Integer.parseInt(pp); 2379 fac = (ForkJoinWorkerThreadFactory) newInstanceFromSystemProperty( 2380 "java.util.concurrent.ForkJoinPool.common.threadFactory"); 2381 handler = (UncaughtExceptionHandler) newInstanceFromSystemProperty( 2382 "java.util.concurrent.ForkJoinPool.common.exceptionHandler"); 2383 } catch (Exception ignore) { 2384 } 2385 2386 if (fac == null) { 2387 if 
(System.getSecurityManager() == null) 2388 fac = defaultForkJoinWorkerThreadFactory; 2389 else // use security-managed default 2390 fac = new InnocuousForkJoinWorkerThreadFactory(); 2391 } 2392 if (parallelism < 0 && // default 1 less than #cores 2393 (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0) 2394 parallelism = 1; 2395 if (parallelism > MAX_CAP) 2396 parallelism = MAX_CAP; 2397 2398 long c = ((((long)(-parallelism) << TC_SHIFT) & TC_MASK) | 2399 (((long)(-parallelism) << RC_SHIFT) & RC_MASK)); 2400 int b = ((1 - parallelism) & SMASK) | (COMMON_MAX_SPARES << SWIDTH); 2401 int n = (parallelism > 1) ? parallelism - 1 : 1; 2402 n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; 2403 n = (n + 1) << 1; 2404 2405 this.workerNamePrefix = "ForkJoinPool.commonPool-worker-"; 2406 this.workQueues = new WorkQueue[n]; 2407 this.factory = fac; 2408 this.ueh = handler; 2409 this.saturate = null; 2410 this.keepAlive = DEFAULT_KEEPALIVE; 2411 this.bounds = b; 2412 this.mode = parallelism; 2413 this.ctl = c; 2414 } 2415 2416 /** 2417 * Returns the common pool instance. This pool is statically 2418 * constructed; its run state is unaffected by attempts to {@link 2419 * #shutdown} or {@link #shutdownNow}. However this pool and any 2420 * ongoing processing are automatically terminated upon program 2421 * {@link System#exit}. Any program that relies on asynchronous 2422 * task processing to complete before program termination should 2423 * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence}, 2424 * before exit. 2425 * 2426 * @return the common pool instance 2427 * @since 1.8 2428 */ 2429 public static ForkJoinPool commonPool() { 2430 // assert common != null : "static init error"; 2431 return common; 2432 } 2433 2434 // Execution methods 2435 2436 /** 2437 * Performs the given task, returning its result upon completion. 
2438 * If the computation encounters an unchecked Exception or Error, 2439 * it is rethrown as the outcome of this invocation. Rethrown 2440 * exceptions behave in the same way as regular exceptions, but, 2441 * when possible, contain stack traces (as displayed for example 2442 * using {@code ex.printStackTrace()}) of both the current thread 2443 * as well as the thread actually encountering the exception; 2444 * minimally only the latter. 2445 * 2446 * @param task the task 2447 * @param <T> the type of the task's result 2448 * @return the task's result 2449 * @throws NullPointerException if the task is null 2450 * @throws RejectedExecutionException if the task cannot be 2451 * scheduled for execution 2452 */ 2453 public <T> T invoke(ForkJoinTask<T> task) { 2454 if (task == null) 2455 throw new NullPointerException(); 2456 externalSubmit(task); 2457 return task.join(); 2458 } 2459 2460 /** 2461 * Arranges for (asynchronous) execution of the given task. 2462 * 2463 * @param task the task 2464 * @throws NullPointerException if the task is null 2465 * @throws RejectedExecutionException if the task cannot be 2466 * scheduled for execution 2467 */ 2468 public void execute(ForkJoinTask<?> task) { 2469 externalSubmit(task); 2470 } 2471 2472 // AbstractExecutorService methods 2473 2474 /** 2475 * @throws NullPointerException if the task is null 2476 * @throws RejectedExecutionException if the task cannot be 2477 * scheduled for execution 2478 */ 2479 public void execute(Runnable task) { 2480 if (task == null) 2481 throw new NullPointerException(); 2482 ForkJoinTask<?> job; 2483 if (task instanceof ForkJoinTask<?>) // avoid re-wrap 2484 job = (ForkJoinTask<?>) task; 2485 else 2486 job = new ForkJoinTask.RunnableExecuteAction(task); 2487 externalSubmit(job); 2488 } 2489 2490 /** 2491 * Submits a ForkJoinTask for execution. 
2492 * 2493 * @param task the task to submit 2494 * @param <T> the type of the task's result 2495 * @return the task 2496 * @throws NullPointerException if the task is null 2497 * @throws RejectedExecutionException if the task cannot be 2498 * scheduled for execution 2499 */ 2500 public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) { 2501 return externalSubmit(task); 2502 } 2503 2504 /** 2505 * @throws NullPointerException if the task is null 2506 * @throws RejectedExecutionException if the task cannot be 2507 * scheduled for execution 2508 */ 2509 public <T> ForkJoinTask<T> submit(Callable<T> task) { 2510 return externalSubmit(new ForkJoinTask.AdaptedCallable<T>(task)); 2511 } 2512 2513 /** 2514 * @throws NullPointerException if the task is null 2515 * @throws RejectedExecutionException if the task cannot be 2516 * scheduled for execution 2517 */ 2518 public <T> ForkJoinTask<T> submit(Runnable task, T result) { 2519 return externalSubmit(new ForkJoinTask.AdaptedRunnable<T>(task, result)); 2520 } 2521 2522 /** 2523 * @throws NullPointerException if the task is null 2524 * @throws RejectedExecutionException if the task cannot be 2525 * scheduled for execution 2526 */ 2527 @SuppressWarnings("unchecked") 2528 public ForkJoinTask<?> submit(Runnable task) { 2529 if (task == null) 2530 throw new NullPointerException(); 2531 return externalSubmit((task instanceof ForkJoinTask<?>) 2532 ? (ForkJoinTask<Void>) task // avoid re-wrap 2533 : new ForkJoinTask.AdaptedRunnableAction(task)); 2534 } 2535 2536 /** 2537 * @throws NullPointerException {@inheritDoc} 2538 * @throws RejectedExecutionException {@inheritDoc} 2539 */ 2540 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) { 2541 // In previous versions of this class, this method constructed 2542 // a task to run ForkJoinTask.invokeAll, but now external 2543 // invocation of multiple tasks is at least as efficient. 
2544 ArrayList<Future<T>> futures = new ArrayList<>(tasks.size()); 2545 2546 try { 2547 for (Callable<T> t : tasks) { 2548 ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t); 2549 futures.add(f); 2550 externalSubmit(f); 2551 } 2552 for (int i = 0, size = futures.size(); i < size; i++) 2553 ((ForkJoinTask<?>)futures.get(i)).quietlyJoin(); 2554 return futures; 2555 } catch (Throwable t) { 2556 for (int i = 0, size = futures.size(); i < size; i++) 2557 futures.get(i).cancel(false); 2558 throw t; 2559 } 2560 } 2561 2562 /** 2563 * Returns the factory used for constructing new workers. 2564 * 2565 * @return the factory used for constructing new workers 2566 */ 2567 public ForkJoinWorkerThreadFactory getFactory() { 2568 return factory; 2569 } 2570 2571 /** 2572 * Returns the handler for internal worker threads that terminate 2573 * due to unrecoverable errors encountered while executing tasks. 2574 * 2575 * @return the handler, or {@code null} if none 2576 */ 2577 public UncaughtExceptionHandler getUncaughtExceptionHandler() { 2578 return ueh; 2579 } 2580 2581 /** 2582 * Returns the targeted parallelism level of this pool. 2583 * 2584 * @return the targeted parallelism level of this pool 2585 */ 2586 public int getParallelism() { 2587 int par = mode & SMASK; 2588 return (par > 0) ? par : 1; 2589 } 2590 2591 /** 2592 * Returns the targeted parallelism level of the common pool. 2593 * 2594 * @return the targeted parallelism level of the common pool 2595 * @since 1.8 2596 */ 2597 public static int getCommonPoolParallelism() { 2598 return COMMON_PARALLELISM; 2599 } 2600 2601 /** 2602 * Returns the number of worker threads that have started but not 2603 * yet terminated. The result returned by this method may differ 2604 * from {@link #getParallelism} when threads are created to 2605 * maintain parallelism when others are cooperatively blocked. 
2606 * 2607 * @return the number of worker threads 2608 */ 2609 public int getPoolSize() { 2610 return ((mode & SMASK) + (short)(ctl >>> TC_SHIFT)); 2611 } 2612 2613 /** 2614 * Returns {@code true} if this pool uses local first-in-first-out 2615 * scheduling mode for forked tasks that are never joined. 2616 * 2617 * @return {@code true} if this pool uses async mode 2618 */ 2619 public boolean getAsyncMode() { 2620 return (mode & FIFO) != 0; 2621 } 2622 2623 /** 2624 * Returns an estimate of the number of worker threads that are 2625 * not blocked waiting to join tasks or for other managed 2626 * synchronization. This method may overestimate the 2627 * number of running threads. 2628 * 2629 * @return the number of worker threads 2630 */ 2631 public int getRunningThreadCount() { 2632 int rc = 0; 2633 WorkQueue[] ws; WorkQueue w; 2634 if ((ws = workQueues) != null) { 2635 for (int i = 1; i < ws.length; i += 2) { 2636 if ((w = ws[i]) != null && w.isApparentlyUnblocked()) 2637 ++rc; 2638 } 2639 } 2640 return rc; 2641 } 2642 2643 /** 2644 * Returns an estimate of the number of threads that are currently 2645 * stealing or executing tasks. This method may overestimate the 2646 * number of active threads. 2647 * 2648 * @return the number of active threads 2649 */ 2650 public int getActiveThreadCount() { 2651 int r = (mode & SMASK) + (int)(ctl >> RC_SHIFT); 2652 return (r <= 0) ? 0 : r; // suppress momentarily negative values 2653 } 2654 2655 /** 2656 * Returns {@code true} if all worker threads are currently idle. 2657 * An idle worker is one that cannot obtain a task to execute 2658 * because none are available to steal from other threads, and 2659 * there are no pending submissions to the pool. This method is 2660 * conservative; it might not return {@code true} immediately upon 2661 * idleness of all threads, but will eventually become true if 2662 * threads remain inactive. 
2663 * 2664 * @return {@code true} if all threads are currently idle 2665 */ 2666 public boolean isQuiescent() { 2667 for (;;) { 2668 long c = ctl; 2669 int md = mode, pc = md & SMASK; 2670 int tc = pc + (short)(c >>> TC_SHIFT); 2671 int rc = pc + (int)(c >> RC_SHIFT); 2672 if ((md & (STOP | TERMINATED)) != 0) 2673 return true; 2674 else if (rc > 0) 2675 return false; 2676 else { 2677 WorkQueue[] ws; WorkQueue v; 2678 if ((ws = workQueues) != null) { 2679 for (int i = 1; i < ws.length; i += 2) { 2680 if ((v = ws[i]) != null) { 2681 if ((v.source & QUIET) == 0) 2682 return false; 2683 --tc; 2684 } 2685 } 2686 } 2687 if (tc == 0 && ctl == c) 2688 return true; 2689 } 2690 } 2691 } 2692 2693 /** 2694 * Returns an estimate of the total number of tasks stolen from 2695 * one thread's work queue by another. The reported value 2696 * underestimates the actual total number of steals when the pool 2697 * is not quiescent. This value may be useful for monitoring and 2698 * tuning fork/join programs: in general, steal counts should be 2699 * high enough to keep threads busy, but low enough to avoid 2700 * overhead and contention across threads. 2701 * 2702 * @return the number of steals 2703 */ 2704 public long getStealCount() { 2705 long count = stealCount; 2706 WorkQueue[] ws; WorkQueue w; 2707 if ((ws = workQueues) != null) { 2708 for (int i = 1; i < ws.length; i += 2) { 2709 if ((w = ws[i]) != null) 2710 count += (long)w.nsteals & 0xffffffffL; 2711 } 2712 } 2713 return count; 2714 } 2715 2716 /** 2717 * Returns an estimate of the total number of tasks currently held 2718 * in queues by worker threads (but not including tasks submitted 2719 * to the pool that have not begun executing). This value is only 2720 * an approximation, obtained by iterating across all threads in 2721 * the pool. This method may be useful for tuning task 2722 * granularities. 
2723 * 2724 * @return the number of queued tasks 2725 */ 2726 public long getQueuedTaskCount() { 2727 long count = 0; 2728 WorkQueue[] ws; WorkQueue w; 2729 if ((ws = workQueues) != null) { 2730 for (int i = 1; i < ws.length; i += 2) { 2731 if ((w = ws[i]) != null) 2732 count += w.queueSize(); 2733 } 2734 } 2735 return count; 2736 } 2737 2738 /** 2739 * Returns an estimate of the number of tasks submitted to this 2740 * pool that have not yet begun executing. This method may take 2741 * time proportional to the number of submissions. 2742 * 2743 * @return the number of queued submissions 2744 */ 2745 public int getQueuedSubmissionCount() { 2746 int count = 0; 2747 WorkQueue[] ws; WorkQueue w; 2748 if ((ws = workQueues) != null) { 2749 for (int i = 0; i < ws.length; i += 2) { 2750 if ((w = ws[i]) != null) 2751 count += w.queueSize(); 2752 } 2753 } 2754 return count; 2755 } 2756 2757 /** 2758 * Returns {@code true} if there are any tasks submitted to this 2759 * pool that have not yet begun executing. 2760 * 2761 * @return {@code true} if there are any queued submissions 2762 */ 2763 public boolean hasQueuedSubmissions() { 2764 WorkQueue[] ws; WorkQueue w; 2765 if ((ws = workQueues) != null) { 2766 for (int i = 0; i < ws.length; i += 2) { 2767 if ((w = ws[i]) != null && !w.isEmpty()) 2768 return true; 2769 } 2770 } 2771 return false; 2772 } 2773 2774 /** 2775 * Removes and returns the next unexecuted submission if one is 2776 * available. This method may be useful in extensions to this 2777 * class that re-assign work in systems with multiple pools. 2778 * 2779 * @return the next submission, or {@code null} if none 2780 */ 2781 protected ForkJoinTask<?> pollSubmission() { 2782 return pollScan(true); 2783 } 2784 2785 /** 2786 * Removes all available unexecuted submitted and forked tasks 2787 * from scheduling queues and adds them to the given collection, 2788 * without altering their execution status. These may include 2789 * artificially generated or wrapped tasks. 
This method is 2790 * designed to be invoked only when the pool is known to be 2791 * quiescent. Invocations at other times may not remove all 2792 * tasks. A failure encountered while attempting to add elements 2793 * to collection {@code c} may result in elements being in 2794 * neither, either or both collections when the associated 2795 * exception is thrown. The behavior of this operation is 2796 * undefined if the specified collection is modified while the 2797 * operation is in progress. 2798 * 2799 * @param c the collection to transfer elements into 2800 * @return the number of elements transferred 2801 */ 2802 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) { 2803 int count = 0; 2804 WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t; 2805 if ((ws = workQueues) != null) { 2806 for (int i = 0; i < ws.length; ++i) { 2807 if ((w = ws[i]) != null) { 2808 while ((t = w.poll()) != null) { 2809 c.add(t); 2810 ++count; 2811 } 2812 } 2813 } 2814 } 2815 return count; 2816 } 2817 2818 /** 2819 * Returns a string identifying this pool, as well as its state, 2820 * including indications of run state, parallelism level, and 2821 * worker and task counts. 
2822 * 2823 * @return a string identifying this pool, as well as its state 2824 */ 2825 public String toString() { 2826 // Use a single pass through workQueues to collect counts 2827 long qt = 0L, qs = 0L; int rc = 0; 2828 long st = stealCount; 2829 WorkQueue[] ws; WorkQueue w; 2830 if ((ws = workQueues) != null) { 2831 for (int i = 0; i < ws.length; ++i) { 2832 if ((w = ws[i]) != null) { 2833 int size = w.queueSize(); 2834 if ((i & 1) == 0) 2835 qs += size; 2836 else { 2837 qt += size; 2838 st += (long)w.nsteals & 0xffffffffL; 2839 if (w.isApparentlyUnblocked()) 2840 ++rc; 2841 } 2842 } 2843 } 2844 } 2845 2846 int md = mode; 2847 int pc = (md & SMASK); 2848 long c = ctl; 2849 int tc = pc + (short)(c >>> TC_SHIFT); 2850 int ac = pc + (int)(c >> RC_SHIFT); 2851 if (ac < 0) // ignore transient negative 2852 ac = 0; 2853 String level = ((md & TERMINATED) != 0 ? "Terminated" : 2854 (md & STOP) != 0 ? "Terminating" : 2855 (md & SHUTDOWN) != 0 ? "Shutting down" : 2856 "Running"); 2857 return super.toString() + 2858 "[" + level + 2859 ", parallelism = " + pc + 2860 ", size = " + tc + 2861 ", active = " + ac + 2862 ", running = " + rc + 2863 ", steals = " + st + 2864 ", tasks = " + qt + 2865 ", submissions = " + qs + 2866 "]"; 2867 } 2868 2869 /** 2870 * Possibly initiates an orderly shutdown in which previously 2871 * submitted tasks are executed, but no new tasks will be 2872 * accepted. Invocation has no effect on execution state if this 2873 * is the {@link #commonPool()}, and no additional effect if 2874 * already shut down. Tasks that are in the process of being 2875 * submitted concurrently during the course of this method may or 2876 * may not be rejected. 
2877 * 2878 * @throws SecurityException if a security manager exists and 2879 * the caller is not permitted to modify threads 2880 * because it does not hold {@link 2881 * java.lang.RuntimePermission}{@code ("modifyThread")} 2882 */ 2883 public void shutdown() { 2884 checkPermission(); 2885 tryTerminate(false, true); 2886 } 2887 2888 /** 2889 * Possibly attempts to cancel and/or stop all tasks, and reject 2890 * all subsequently submitted tasks. Invocation has no effect on 2891 * execution state if this is the {@link #commonPool()}, and no 2892 * additional effect if already shut down. Otherwise, tasks that 2893 * are in the process of being submitted or executed concurrently 2894 * during the course of this method may or may not be 2895 * rejected. This method cancels both existing and unexecuted 2896 * tasks, in order to permit termination in the presence of task 2897 * dependencies. So the method always returns an empty list 2898 * (unlike the case for some other Executors). 2899 * 2900 * @return an empty list 2901 * @throws SecurityException if a security manager exists and 2902 * the caller is not permitted to modify threads 2903 * because it does not hold {@link 2904 * java.lang.RuntimePermission}{@code ("modifyThread")} 2905 */ 2906 public List<Runnable> shutdownNow() { 2907 checkPermission(); 2908 tryTerminate(true, true); 2909 return Collections.emptyList(); 2910 } 2911 2912 /** 2913 * Returns {@code true} if all tasks have completed following shut down. 2914 * 2915 * @return {@code true} if all tasks have completed following shut down 2916 */ 2917 public boolean isTerminated() { 2918 return (mode & TERMINATED) != 0; 2919 } 2920 2921 /** 2922 * Returns {@code true} if the process of termination has 2923 * commenced but not yet completed. This method may be useful for 2924 * debugging. 
A return of {@code true} reported a sufficient 2925 * period after shutdown may indicate that submitted tasks have 2926 * ignored or suppressed interruption, or are waiting for I/O, 2927 * causing this executor not to properly terminate. (See the 2928 * advisory notes for class {@link ForkJoinTask} stating that 2929 * tasks should not normally entail blocking operations. But if 2930 * they do, they must abort them on interrupt.) 2931 * 2932 * @return {@code true} if terminating but not yet terminated 2933 */ 2934 public boolean isTerminating() { 2935 int md = mode; 2936 return (md & STOP) != 0 && (md & TERMINATED) == 0; 2937 } 2938 2939 /** 2940 * Returns {@code true} if this pool has been shut down. 2941 * 2942 * @return {@code true} if this pool has been shut down 2943 */ 2944 public boolean isShutdown() { 2945 return (mode & SHUTDOWN) != 0; 2946 } 2947 2948 /** 2949 * Blocks until all tasks have completed execution after a 2950 * shutdown request, or the timeout occurs, or the current thread 2951 * is interrupted, whichever happens first. Because the {@link 2952 * #commonPool()} never terminates until program shutdown, when 2953 * applied to the common pool, this method is equivalent to {@link 2954 * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}. 
2955 * 2956 * @param timeout the maximum time to wait 2957 * @param unit the time unit of the timeout argument 2958 * @return {@code true} if this executor terminated and 2959 * {@code false} if the timeout elapsed before termination 2960 * @throws InterruptedException if interrupted while waiting 2961 */ 2962 public boolean awaitTermination(long timeout, TimeUnit unit) 2963 throws InterruptedException { 2964 if (Thread.interrupted()) 2965 throw new InterruptedException(); 2966 if (this == common) { 2967 awaitQuiescence(timeout, unit); 2968 return false; 2969 } 2970 long nanos = unit.toNanos(timeout); 2971 if (isTerminated()) 2972 return true; 2973 if (nanos <= 0L) 2974 return false; 2975 long deadline = System.nanoTime() + nanos; 2976 synchronized (this) { 2977 for (;;) { 2978 if (isTerminated()) 2979 return true; 2980 if (nanos <= 0L) 2981 return false; 2982 long millis = TimeUnit.NANOSECONDS.toMillis(nanos); 2983 wait(millis > 0L ? millis : 1L); 2984 nanos = deadline - System.nanoTime(); 2985 } 2986 } 2987 } 2988 2989 /** 2990 * If called by a ForkJoinTask operating in this pool, equivalent 2991 * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise, 2992 * waits and/or attempts to assist performing tasks until this 2993 * pool {@link #isQuiescent} or the indicated timeout elapses. 2994 * 2995 * @param timeout the maximum time to wait 2996 * @param unit the time unit of the timeout argument 2997 * @return {@code true} if quiescent; {@code false} if the 2998 * timeout elapsed. 
2999 */ 3000 public boolean awaitQuiescence(long timeout, TimeUnit unit) { 3001 long nanos = unit.toNanos(timeout); 3002 ForkJoinWorkerThread wt; 3003 Thread thread = Thread.currentThread(); 3004 if ((thread instanceof ForkJoinWorkerThread) && 3005 (wt = (ForkJoinWorkerThread)thread).pool == this) { 3006 helpQuiescePool(wt.workQueue); 3007 return true; 3008 } 3009 else { 3010 for (long startTime = System.nanoTime();;) { 3011 ForkJoinTask<?> t; 3012 if ((t = pollScan(false)) != null) 3013 t.doExec(); 3014 else if (isQuiescent()) 3015 return true; 3016 else if ((System.nanoTime() - startTime) > nanos) 3017 return false; 3018 else 3019 Thread.yield(); // cannot block 3020 } 3021 } 3022 } 3023 3024 /** 3025 * Waits and/or attempts to assist performing tasks indefinitely 3026 * until the {@link #commonPool()} {@link #isQuiescent}. 3027 */ 3028 static void quiesceCommonPool() { 3029 common.awaitQuiescence(Long.MAX_VALUE, TimeUnit.NANOSECONDS); 3030 } 3031 3032 /** 3033 * Interface for extending managed parallelism for tasks running 3034 * in {@link ForkJoinPool}s. 3035 * 3036 * <p>A {@code ManagedBlocker} provides two methods. Method 3037 * {@link #isReleasable} must return {@code true} if blocking is 3038 * not necessary. Method {@link #block} blocks the current thread 3039 * if necessary (perhaps internally invoking {@code isReleasable} 3040 * before actually blocking). These actions are performed by any 3041 * thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}. 3042 * The unusual methods in this API accommodate synchronizers that 3043 * may, but don't usually, block for long periods. Similarly, they 3044 * allow more efficient internal handling of cases in which 3045 * additional workers may be, but usually are not, needed to 3046 * ensure sufficient parallelism. Toward this end, 3047 * implementations of method {@code isReleasable} must be amenable 3048 * to repeated invocation. 
3049 * 3050 * <p>For example, here is a ManagedBlocker based on a 3051 * ReentrantLock: 3052 * <pre> {@code 3053 * class ManagedLocker implements ManagedBlocker { 3054 * final ReentrantLock lock; 3055 * boolean hasLock = false; 3056 * ManagedLocker(ReentrantLock lock) { this.lock = lock; } 3057 * public boolean block() { 3058 * if (!hasLock) 3059 * lock.lock(); 3060 * return true; 3061 * } 3062 * public boolean isReleasable() { 3063 * return hasLock || (hasLock = lock.tryLock()); 3064 * } 3065 * }}</pre> 3066 * 3067 * <p>Here is a class that possibly blocks waiting for an 3068 * item on a given queue: 3069 * <pre> {@code 3070 * class QueueTaker<E> implements ManagedBlocker { 3071 * final BlockingQueue<E> queue; 3072 * volatile E item = null; 3073 * QueueTaker(BlockingQueue<E> q) { this.queue = q; } 3074 * public boolean block() throws InterruptedException { 3075 * if (item == null) 3076 * item = queue.take(); 3077 * return true; 3078 * } 3079 * public boolean isReleasable() { 3080 * return item != null || (item = queue.poll()) != null; 3081 * } 3082 * public E getItem() { // call after pool.managedBlock completes 3083 * return item; 3084 * } 3085 * }}</pre> 3086 */ 3087 public static interface ManagedBlocker { 3088 /** 3089 * Possibly blocks the current thread, for example waiting for 3090 * a lock or condition. 3091 * 3092 * @return {@code true} if no additional blocking is necessary 3093 * (i.e., if isReleasable would return true) 3094 * @throws InterruptedException if interrupted while waiting 3095 * (the method is not required to do so, but is allowed to) 3096 */ 3097 boolean block() throws InterruptedException; 3098 3099 /** 3100 * Returns {@code true} if blocking is unnecessary. 3101 * @return {@code true} if blocking is unnecessary 3102 */ 3103 boolean isReleasable(); 3104 } 3105 3106 /** 3107 * Runs the given possibly blocking task. 
When {@linkplain
     * ForkJoinTask#inForkJoinPool() running in a ForkJoinPool}, this
     * method possibly arranges for a spare thread to be activated if
     * necessary to ensure sufficient parallelism while the current
     * thread is blocked in {@link ManagedBlocker#block blocker.block()}.
     *
     * <p>This method repeatedly calls {@code blocker.isReleasable()} and
     * {@code blocker.block()} until either method returns {@code true}.
     * Every call to {@code blocker.block()} is preceded by a call to
     * {@code blocker.isReleasable()} that returned {@code false}.
     *
     * <p>If not running in a ForkJoinPool, this method is
     * behaviorally equivalent to
     * <pre> {@code
     * while (!blocker.isReleasable())
     *   if (blocker.block())
     *     break;}</pre>
     *
     * If running in a ForkJoinPool, the pool may first be expanded to
     * ensure sufficient parallelism available during the call to
     * {@code blocker.block()}.
     *
     * @param blocker the blocker task
     * @throws InterruptedException if {@code blocker.block()} did so
     */
    public static void managedBlock(ManagedBlocker blocker)
        throws InterruptedException {
        ForkJoinPool p;
        ForkJoinWorkerThread wt;
        WorkQueue w;
        Thread t = Thread.currentThread();
        // Pool-aware path: taken only when the caller is a worker thread
        // that has both a pool and a work queue.
        if ((t instanceof ForkJoinWorkerThread) &&
            (p = (wt = (ForkJoinWorkerThread)t).pool) != null &&
            (w = wt.workQueue) != null) {
            int block;
            while (!blocker.isReleasable()) {
                // A zero result from tryCompensate means "not yet":
                // loop back and re-check isReleasable before retrying.
                if ((block = p.tryCompensate(w)) != 0) {
                    try {
                        // Alternate isReleasable/block until either
                        // returns true.
                        do {} while (!blocker.isReleasable() &&
                                     !blocker.block());
                    } finally {
                        // Runs even if block() throws. RC_UNIT is added to
                        // ctl only for a positive tryCompensate result.
                        // NOTE(review): presumably this undoes a count
                        // adjustment made by tryCompensate -- confirm there.
                        CTL.getAndAdd(p, (block > 0) ? RC_UNIT : 0L);
                    }
                    break;
                }
            }
        }
        else {
            // Not a pool worker: plain isReleasable/block loop.
            do {} while (!blocker.isReleasable() &&
                         !blocker.block());
        }
    }

    /**
     * If the given executor is a ForkJoinPool, poll and execute
     * AsynchronousCompletionTasks from worker's queue until none are
     * available or blocker is released.
     *
     * <p>Does nothing if {@code blocker} is null or {@code e} is not a
     * {@code ForkJoinPool}.
     */
    static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) {
        if (blocker != null && (e instanceof ForkJoinPool)) {
            WorkQueue w; ForkJoinWorkerThread wt; WorkQueue[] ws; int r, n;
            ForkJoinPool p = (ForkJoinPool)e;
            Thread thread = Thread.currentThread();
            // Choose the queue to help from: the caller's own queue when it
            // is a worker of pool p; otherwise a slot of p.workQueues picked
            // from the caller's probe masked with SQMASK (presumably a
            // submission-queue slot -- confirm against SQMASK); null when
            // the probe is zero or there are no queues.
            if (thread instanceof ForkJoinWorkerThread &&
                (wt = (ForkJoinWorkerThread)thread).pool == p)
                w = wt.workQueue;
            else if ((r = ThreadLocalRandom.getProbe()) != 0 &&
                     (ws = p.workQueues) != null && (n = ws.length) > 0)
                w = ws[(n - 1) & r & SQMASK];
            else
                w = null;
            if (w != null) {
                for (;;) {
                    int b = w.base, s = w.top, d, al; ForkJoinTask<?>[] a;
                    // Continue only while an array exists and base is
                    // behind top (d < 0), i.e. the queue looks non-empty.
                    if ((a = w.array) != null && (d = b - s) < 0 &&
                        (al = a.length) > 0) {
                        int index = (al - 1) & b;
                        ForkJoinTask<?> t = (ForkJoinTask<?>)
                            QA.getAcquire(a, index);
                        if (blocker.isReleasable())
                            break;
                        // Act on the slot only if base is unchanged since it
                        // was read; b now holds the prospective next base.
                        else if (b++ == w.base) {
                            if (t == null) {
                                // Empty slot: give up if only one element
                                // was apparent, otherwise re-read and retry.
                                if (d == -1)
                                    break;
                            }
                            // Only asynchronous completion tasks are run
                            // here; any other task ends the helping.
                            else if (!(t instanceof CompletableFuture.
                                       AsynchronousCompletionTask))
                                break;
                            // Claim the task by nulling its slot; on success
                            // store the advanced base and run the task.
                            else if (QA.compareAndSet(a, index, t, null)) {
                                w.base = b;
                                t.doExec();
                            }
                        }
                    }
                    else
                        break;
                }
            }
        }
    }

    // AbstractExecutorService overrides. These rely on undocumented
    // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
    // implement RunnableFuture.
/**
     * Wraps a runnable and its fixed result in a
     * {@code ForkJoinTask.AdaptedRunnable}.
     */
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new ForkJoinTask.AdaptedRunnable<T>(runnable, value);
    }

    /**
     * Wraps a callable in a {@code ForkJoinTask.AdaptedCallable}.
     */
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new ForkJoinTask.AdaptedCallable<T>(callable);
    }

    // VarHandle mechanics
    // CTL and MODE give access to the "ctl" and "mode" fields of this
    // class; QA gives access to elements of ForkJoinTask[] arrays.
    private static final VarHandle CTL;
    private static final VarHandle MODE;
    private static final VarHandle QA;

    static {
        try {
            MethodHandles.Lookup l = MethodHandles.lookup();
            CTL = l.findVarHandle(ForkJoinPool.class, "ctl", long.class);
            MODE = l.findVarHandle(ForkJoinPool.class, "mode", int.class);
            QA = MethodHandles.arrayElementVarHandle(ForkJoinTask[].class);
        } catch (ReflectiveOperationException e) {
            // A lookup failure is rethrown as an Error, so class
            // initialization fails.
            throw new Error(e);
        }

        // Reduce the risk of rare disastrous classloading in first call to
        // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
        Class<?> ensureLoaded = LockSupport.class;

        // The spare-thread limit for the common pool may be overridden by a
        // system property; any exception while reading or parsing it is
        // deliberately ignored, leaving the default in place.
        int commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
        try {
            String p = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.maximumSpares");
            if (p != null)
                commonMaxSpares = Integer.parseInt(p);
        } catch (Exception ignore) {}
        COMMON_MAX_SPARES = commonMaxSpares;

        // NOTE(review): these statics are assigned before the common pool
        // is constructed below, presumably because that constructor reads
        // them -- keep this order unless verified otherwise.
        defaultForkJoinWorkerThreadFactory =
            new DefaultForkJoinWorkerThreadFactory();
        modifyThreadPermission = new RuntimePermission("modifyThread");

        // The common pool is built inside a privileged action via the
        // ForkJoinPool(byte) constructor.
        common = AccessController.doPrivileged(new PrivilegedAction<>() {
            public ForkJoinPool run() {
                return new ForkJoinPool((byte)0); }});

        // Cached parallelism of the common pool, reported as at least 1.
        COMMON_PARALLELISM = Math.max(common.mode & SMASK, 1);
    }

    /**
     * Factory for innocuous worker threads.
     */
    private static final class InnocuousForkJoinWorkerThreadFactory
        implements ForkJoinWorkerThreadFactory {

        /**
         * An ACC to restrict permissions for the factory itself.
         * The constructed workers have no permissions set.
         */
        private static final AccessControlContext ACC = contextWithPermissions(
            modifyThreadPermission,
            new RuntimePermission("enableContextClassLoaderOverride"),
            new RuntimePermission("modifyThreadGroup"),
            new RuntimePermission("getClassLoader"),
            new RuntimePermission("setContextClassLoader"));

        /**
         * Creates an {@code InnocuousForkJoinWorkerThread} for the given
         * pool inside a privileged action restricted by {@link #ACC}.
         */
        public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            return AccessController.doPrivileged(
                new PrivilegedAction<>() {
                    public ForkJoinWorkerThread run() {
                        return new ForkJoinWorkerThread.
                            InnocuousForkJoinWorkerThread(pool); }},
                ACC);
        }
    }
}