1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 
28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent; 37 38 import java.lang.Thread.UncaughtExceptionHandler; 39 import java.lang.invoke.MethodHandles; 40 import java.lang.invoke.VarHandle; 41 import java.security.AccessController; 42 import java.security.AccessControlContext; 43 import java.security.Permission; 44 import java.security.Permissions; 45 import java.security.PrivilegedAction; 46 import java.security.ProtectionDomain; 47 import java.util.ArrayList; 48 import java.util.Collection; 49 import java.util.Collections; 50 import java.util.List; 51 import java.util.function.Predicate; 52 import java.util.concurrent.locks.LockSupport; 53 54 /** 55 * An {@link ExecutorService} for running {@link ForkJoinTask}s. 56 * A {@code ForkJoinPool} provides the entry point for submissions 57 * from non-{@code ForkJoinTask} clients, as well as management and 58 * monitoring operations. 59 * 60 * <p>A {@code ForkJoinPool} differs from other kinds of {@link 61 * ExecutorService} mainly by virtue of employing 62 * <em>work-stealing</em>: all threads in the pool attempt to find and 63 * execute tasks submitted to the pool and/or created by other active 64 * tasks (eventually blocking waiting for work if none exist). This 65 * enables efficient processing when most tasks spawn other subtasks 66 * (as do most {@code ForkJoinTask}s), as well as when many small 67 * tasks are submitted to the pool from external clients. Especially 68 * when setting <em>asyncMode</em> to true in constructors, {@code 69 * ForkJoinPool}s may also be appropriate for use with event-style 70 * tasks that are never joined. All worker threads are initialized 71 * with {@link Thread#isDaemon} set {@code true}. 
72 * 73 * <p>A static {@link #commonPool()} is available and appropriate for 74 * most applications. The common pool is used by any ForkJoinTask that 75 * is not explicitly submitted to a specified pool. Using the common 76 * pool normally reduces resource usage (its threads are slowly 77 * reclaimed during periods of non-use, and reinstated upon subsequent 78 * use). 79 * 80 * <p>For applications that require separate or custom pools, a {@code 81 * ForkJoinPool} may be constructed with a given target parallelism 82 * level; by default, equal to the number of available processors. 83 * The pool attempts to maintain enough active (or available) threads 84 * by dynamically adding, suspending, or resuming internal worker 85 * threads, even if some tasks are stalled waiting to join others. 86 * However, no such adjustments are guaranteed in the face of blocked 87 * I/O or other unmanaged synchronization. The nested {@link 88 * ManagedBlocker} interface enables extension of the kinds of 89 * synchronization accommodated. The default policies may be 90 * overridden using a constructor with parameters corresponding to 91 * those documented in class {@link ThreadPoolExecutor}. 92 * 93 * <p>In addition to execution and lifecycle control methods, this 94 * class provides status check methods (for example 95 * {@link #getStealCount}) that are intended to aid in developing, 96 * tuning, and monitoring fork/join applications. Also, method 97 * {@link #toString} returns indications of pool state in a 98 * convenient form for informal monitoring. 99 * 100 * <p>As is the case with other ExecutorServices, there are three 101 * main task execution methods summarized in the following table. 102 * These are designed to be used primarily by clients not already 103 * engaged in fork/join computations in the current pool. 
The main 104 * forms of these methods accept instances of {@code ForkJoinTask}, 105 * but overloaded forms also allow mixed execution of plain {@code 106 * Runnable}- or {@code Callable}- based activities as well. However, 107 * tasks that are already executing in a pool should normally instead 108 * use the within-computation forms listed in the table unless using 109 * async event-style tasks that are not usually joined, in which case 110 * there is little difference among choice of methods. 111 * 112 * <table class="plain"> 113 * <caption>Summary of task execution methods</caption> 114 * <tr> 115 * <td></td> 116 * <th scope="col"> Call from non-fork/join clients</th> 117 * <th scope="col"> Call from within fork/join computations</th> 118 * </tr> 119 * <tr> 120 * <th scope="row" style="text-align:left"> Arrange async execution</th> 121 * <td> {@link #execute(ForkJoinTask)}</td> 122 * <td> {@link ForkJoinTask#fork}</td> 123 * </tr> 124 * <tr> 125 * <th scope="row" style="text-align:left"> Await and obtain result</th> 126 * <td> {@link #invoke(ForkJoinTask)}</td> 127 * <td> {@link ForkJoinTask#invoke}</td> 128 * </tr> 129 * <tr> 130 * <th scope="row" style="text-align:left"> Arrange exec and obtain Future</th> 131 * <td> {@link #submit(ForkJoinTask)}</td> 132 * <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td> 133 * </tr> 134 * </table> 135 * 136 * <p>The parameters used to construct the common pool may be controlled by 137 * setting the following {@linkplain System#getProperty system properties}: 138 * <ul> 139 * <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.parallelism} 140 * - the parallelism level, a non-negative integer 141 * <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.threadFactory} 142 * - the class name of a {@link ForkJoinWorkerThreadFactory}. 143 * The {@linkplain ClassLoader#getSystemClassLoader() system class loader} 144 * is used to load this class. 
145 * <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.exceptionHandler} 146 * - the class name of a {@link UncaughtExceptionHandler}. 147 * The {@linkplain ClassLoader#getSystemClassLoader() system class loader} 148 * is used to load this class. 149 * <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.maximumSpares} 150 * - the maximum number of allowed extra threads to maintain target 151 * parallelism (default 256). 152 * </ul> 153 * If no thread factory is supplied via a system property, then the 154 * common pool uses a factory that uses the system class loader as the 155 * {@linkplain Thread#getContextClassLoader() thread context class loader}. 156 * In addition, if a {@link SecurityManager} is present, then 157 * the common pool uses a factory supplying threads that have no 158 * {@link Permissions} enabled. 159 * 160 * Upon any error in establishing these settings, default parameters 161 * are used. It is possible to disable or limit the use of threads in 162 * the common pool by setting the parallelism property to zero, and/or 163 * using a factory that may return {@code null}. However doing so may 164 * cause unjoined tasks to never be executed. 165 * 166 * <p><b>Implementation notes</b>: This implementation restricts the 167 * maximum number of running threads to 32767. Attempts to create 168 * pools with greater than the maximum number result in 169 * {@code IllegalArgumentException}. 170 * 171 * <p>This implementation rejects submitted tasks (that is, by throwing 172 * {@link RejectedExecutionException}) only when the pool is shut down 173 * or internal resources have been exhausted. 174 * 175 * @since 1.7 176 * @author Doug Lea 177 */ 178 public class ForkJoinPool extends AbstractExecutorService { 179 180 /* 181 * Implementation Overview 182 * 183 * This class and its nested classes provide the main 184 * functionality and control for a set of worker threads: 185 * Submissions from non-FJ threads enter into submission queues. 
186 * Workers take these tasks and typically split them into subtasks 187 * that may be stolen by other workers. Work-stealing based on 188 * randomized scans generally leads to better throughput than 189 * "work dealing" in which producers assign tasks to idle threads, 190 * in part because threads that have finished other tasks before 191 * the signalled thread wakes up (which can be a long time) can 192 * take the task instead. Preference rules give first priority to 193 * processing tasks from their own queues (LIFO or FIFO, depending 194 * on mode), then to randomized FIFO steals of tasks in other 195 * queues. This framework began as a vehicle for supporting 196 * tree-structured parallelism using work-stealing. Over time, 197 * its scalability advantages led to extensions and changes to 198 * better support more diverse usage contexts. Because most 199 * internal methods and nested classes are interrelated, their 200 * main rationale and descriptions are presented here; individual 201 * methods and nested classes contain only brief comments about 202 * details. 203 * 204 * WorkQueues 205 * ========== 206 * 207 * Most operations occur within work-stealing queues (in nested 208 * class WorkQueue). These are special forms of Deques that 209 * support only three of the four possible end-operations -- push, 210 * pop, and poll (aka steal), under the further constraints that 211 * push and pop are called only from the owning thread (or, as 212 * extended here, under a lock), while poll may be called from 213 * other threads. (If you are unfamiliar with them, you probably 214 * want to read Herlihy and Shavit's book "The Art of 215 * Multiprocessor Programming", chapter 16 describing these in 216 * more detail before proceeding.)
The main work-stealing queue 217 * design is roughly similar to those in the papers "Dynamic 218 * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005 219 * (http://research.sun.com/scalable/pubs/index.html) and 220 * "Idempotent work stealing" by Michael, Saraswat, and Vechev, 221 * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186). 222 * The main differences ultimately stem from GC requirements that 223 * we null out taken slots as soon as we can, to maintain as small 224 * a footprint as possible even in programs generating huge 225 * numbers of tasks. To accomplish this, we shift the CAS 226 * arbitrating pop vs poll (steal) from being on the indices 227 * ("base" and "top") to the slots themselves. 228 * 229 * Adding tasks then takes the form of a classic array push(task) 230 * in a circular buffer: 231 * q.array[q.top++ % length] = task; 232 * 233 * (The actual code needs to null-check and size-check the array, 234 * uses masking, not mod, for indexing a power-of-two-sized array, 235 * adds a release fence for publication, and possibly signals 236 * waiting workers to start scanning -- see below.) Both a 237 * successful pop and poll mainly entail a CAS of a slot from 238 * non-null to null. 239 * 240 * The pop operation (always performed by owner) is: 241 * if ((the task at top slot is not null) and 242 * (CAS slot to null)) 243 * decrement top and return task; 244 * 245 * And the poll operation (usually by a stealer) is 246 * if ((the task at base slot is not null) and 247 * (CAS slot to null)) 248 * increment base and return task; 249 * 250 * There are several variants of each of these. Most uses occur 251 * within operations that also interleave contention or emptiness 252 * tracking or inspection of elements before extracting them, so 253 * must interleave these with the above code. 
When performed by 254 * owner, getAndSet is used instead of CAS (see for example method 255 * nextLocalTask) which is usually more efficient, and possible 256 * because the top index cannot independently change during the 257 * operation. 258 * 259 * Memory ordering. See "Correct and Efficient Work-Stealing for 260 * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013 261 * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an 262 * analysis of memory ordering requirements in work-stealing 263 * algorithms similar to (but different than) the one used here. 264 * Extracting tasks in array slots via (fully fenced) CAS provides 265 * primary synchronization. The base and top indices imprecisely 266 * guide where to extract from. We do not usually require strict 267 * orderings of array and index updates. Many index accesses use 268 * plain mode, with ordering constrained by surrounding context 269 * (usually with respect to element CASes or the two WorkQueue 270 * volatile fields source and phase). When not otherwise already 271 * constrained, reads of "base" by queue owners use acquire-mode, 272 * and some externally callable methods preface accesses with 273 * acquire fences. Additionally, to ensure that index update 274 * writes are not coalesced or postponed in loops etc, "opaque" 275 * mode is used in a few cases where timely writes are not 276 * otherwise ensured. The "locked" versions of push- and pop- 277 * based methods for shared queues differ from owned versions 278 * because locking already forces some of the ordering. 279 * 280 * Because indices and slot contents cannot always be consistent, 281 * a check that base == top indicates (momentary) emptiness, but 282 * otherwise may err on the side of possibly making the queue 283 * appear nonempty when a push, pop, or poll have not fully 284 * committed, or making it appear empty when an update of top has 285 * not yet been visibly written. 
(Method isEmpty() checks the 286 * case of a partially completed removal of the last element.) 287 * Because of this, the poll operation, considered individually, 288 * is not wait-free. One thief cannot successfully continue until 289 * another in-progress one (or, if previously empty, a push) 290 * visibly completes. This can stall threads when required to 291 * consume from a given queue (see method poll()). However, in 292 * the aggregate, we ensure at least probabilistic 293 * non-blockingness. If an attempted steal fails, a scanning 294 * thief chooses a different random victim target to try next. So, 295 * in order for one thief to progress, it suffices for any 296 * in-progress poll or new push on any empty queue to complete. 297 * 298 * This approach also enables support of a user mode in which 299 * local task processing is in FIFO, not LIFO order, simply by 300 * using poll rather than pop. This can be useful in 301 * message-passing frameworks in which tasks are never joined. 302 * 303 * WorkQueues are also used in a similar way for tasks submitted 304 * to the pool. We cannot mix these tasks in the same queues used 305 * by workers. Instead, we randomly associate submission queues 306 * with submitting threads, using a form of hashing. The 307 * ThreadLocalRandom probe value serves as a hash code for 308 * choosing existing queues, and may be randomly repositioned upon 309 * contention with other submitters. In essence, submitters act 310 * like workers except that they are restricted to executing local 311 * tasks that they submitted. Insertion of tasks in shared mode 312 * requires a lock but we use only a simple spinlock (using field 313 * phase), because submitters encountering a busy queue move to a 314 * different position to use or create other queues -- they block 315 * only when creating and registering new queues. 
Because it is 316 * used only as a spinlock, unlocking requires only a "releasing" 317 * store (using setRelease) unless otherwise signalling. 318 * 319 * Management 320 * ========== 321 * 322 * The main throughput advantages of work-stealing stem from 323 * decentralized control -- workers mostly take tasks from 324 * themselves or each other, at rates that can exceed a billion 325 * per second. The pool itself creates, activates (enables 326 * scanning for and running tasks), deactivates, blocks, and 327 * terminates threads, all with minimal central information. 328 * There are only a few properties that we can globally track or 329 * maintain, so we pack them into a small number of variables, 330 * often maintaining atomicity without blocking or locking. 331 * Nearly all essentially atomic control state is held in a few 332 * volatile variables that are by far most often read (not 333 * written) as status and consistency checks. We pack as much 334 * information into them as we can. 335 * 336 * Field "ctl" contains 64 bits holding information needed to 337 * atomically decide to add, enqueue (on an event queue), and 338 * dequeue and release workers. To enable this packing, we 339 * restrict maximum parallelism to (1<<15)-1 (which is far in 340 * excess of normal operating range) to allow ids, counts, and 341 * their negations (used for thresholding) to fit into 16bit 342 * subfields. 343 * 344 * Field "mode" holds configuration parameters as well as lifetime 345 * status, atomically and monotonically setting SHUTDOWN, STOP, 346 * and finally TERMINATED bits. 347 * 348 * Field "workQueues" holds references to WorkQueues. It is 349 * updated (only during worker creation and termination) under 350 * lock (using field workerNamePrefix as lock), but is otherwise 351 * concurrently readable, and accessed directly. 
We also ensure 352 * that uses of the array reference itself never become too stale 353 * in case of resizing, by arranging that (re-)reads are separated 354 * by at least one acquiring read access. To simplify index-based 355 * operations, the array size is always a power of two, and all 356 * readers must tolerate null slots. Worker queues are at odd 357 * indices. Shared (submission) queues are at even indices, up to 358 * a maximum of 64 slots, to limit growth even if the array needs 359 * to expand to add more workers. Grouping them together in this 360 * way simplifies and speeds up task scanning. 361 * 362 * All worker thread creation is on-demand, triggered by task 363 * submissions, replacement of terminated workers, and/or 364 * compensation for blocked workers. However, all other support 365 * code is set up to work with other policies. To ensure that we 366 * do not hold on to worker references that would prevent GC, all 367 * accesses to workQueues are via indices into the workQueues 368 * array (which is one source of some of the messy code 369 * constructions here). In essence, the workQueues array serves as 370 * a weak reference mechanism. Thus for example the stack top 371 * subfield of ctl stores indices, not references. 372 * 373 * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we 374 * cannot let workers spin indefinitely scanning for tasks when 375 * none can be found immediately, and we cannot start/resume 376 * workers unless there appear to be tasks available. On the 377 * other hand, we must quickly prod them into action when new 378 * tasks are submitted or generated. In many usages, ramp-up time 379 * is the main limiting factor in overall performance, which is 380 * compounded at program start-up by JIT compilation and 381 * allocation. So we streamline this as much as possible. 
382 * 383 * The "ctl" field atomically maintains total worker and 384 * "released" worker counts, plus the head of the available worker 385 * queue (actually stack, represented by the lower 32bit subfield 386 * of ctl). Released workers are those known to be scanning for 387 * and/or running tasks. Unreleased ("available") workers are 388 * recorded in the ctl stack. These workers are made available for 389 * signalling by enqueuing in ctl (see method runWorker). The 390 * "queue" is a form of Treiber stack. This is ideal for 391 * activating threads in most-recently used order, and improves 392 * performance and locality, outweighing the disadvantages of 393 * being prone to contention and inability to release a worker 394 * unless it is topmost on stack. To avoid missed signal problems 395 * inherent in any wait/signal design, available workers rescan 396 * for (and if found run) tasks after enqueuing. Normally their 397 * release status will be updated while doing so, but the released 398 * worker ctl count may underestimate the number of active 399 * threads. (However, it is still possible to determine quiescence 400 * via a validation traversal -- see isQuiescent). After an 401 * unsuccessful rescan, available workers are blocked until 402 * signalled (see signalWork). The top stack state holds the 403 * value of the "phase" field of the worker: its index and status, 404 * plus a version counter that, in addition to the count subfields 405 * (also serving as version stamps) provide protection against 406 * Treiber stack ABA effects. 407 * 408 * Creating workers. To create a worker, we pre-increment counts 409 * (serving as a reservation), and attempt to construct a 410 * ForkJoinWorkerThread via its factory. Upon construction, the 411 * new thread invokes registerWorker, where it constructs a 412 * WorkQueue and is assigned an index in the workQueues array 413 * (expanding the array if necessary). The thread is then started. 
414 * Upon any exception across these steps, or null return from 415 * factory, deregisterWorker adjusts counts and records 416 * accordingly. If a null return, the pool continues running with 417 * fewer than the target number of workers. If exceptional, the 418 * exception is propagated, generally to some external caller. 419 * Worker index assignment avoids the bias in scanning that would 420 * occur if entries were sequentially packed starting at the front 421 * of the workQueues array. We treat the array as a simple 422 * power-of-two hash table, expanding as needed. The seedIndex 423 * increment ensures no collisions until a resize is needed or a 424 * worker is deregistered and replaced, and thereafter keeps 425 * probability of collision low. We cannot use 426 * ThreadLocalRandom.getProbe() for similar purposes here because 427 * the thread has not started yet, but do so for creating 428 * submission queues for existing external threads (see 429 * externalPush). 430 * 431 * WorkQueue field "phase" is used by both workers and the pool to 432 * manage and track whether a worker is UNSIGNALLED (possibly 433 * blocked waiting for a signal). When a worker is enqueued its 434 * phase field is set. Note that phase field updates lag queue CAS 435 * releases so usage requires care -- seeing a negative phase does 436 * not guarantee that the worker is available. When queued, the 437 * lower 16 bits of phase must hold its pool index. So we 438 * place the index there upon initialization and otherwise keep it 439 * there or restore it when necessary. 440 * 441 * The ctl field also serves as the basis for memory 442 * synchronization surrounding activation. This uses a more 443 * efficient version of a Dekker-like rule that task producers and 444 * consumers sync with each other by both writing/CASing ctl (even 445 * if to its current value). This would be extremely costly.
So 446 * we relax it in several ways: (1) Producers only signal when 447 * their queue is possibly empty at some point during a push 448 * operation. (2) Other workers propagate this signal 449 * when they find tasks in a queue with size greater than one. (3) 450 * Workers only enqueue after scanning (see below) and not finding 451 * any tasks. (4) Rather than CASing ctl to its current value in 452 * the common case where no action is required, we reduce write 453 * contention by equivalently prefacing signalWork when called by 454 * an external task producer using a memory access with 455 * full-volatile semantics or a "fullFence". 456 * 457 * Almost always, too many signals are issued, in part because a 458 * task producer cannot tell if some existing worker is in the 459 * midst of finishing one task (or already scanning) and ready to 460 * take another without being signalled. So the producer might 461 * instead activate a different worker that does not find any 462 * work, and then inactivates. This scarcely matters in 463 * steady-state computations involving all workers, but can create 464 * contention and bookkeeping bottlenecks during ramp-up, 465 * ramp-down, and small computations involving only a few workers. 466 * 467 * Scanning. Method scan (from runWorker) performs top-level 468 * scanning for tasks. (Similar scans appear in helpQuiesce and 469 * pollScan.) Each scan traverses and tries to poll from each 470 * queue starting at a random index. Scans are not performed in 471 * ideal random permutation order, to reduce cacheline 472 * contention. The pseudorandom generator need not have 473 * high-quality statistical properties in the long term, but just 474 * within computations; We use Marsaglia XorShifts (often via 475 * ThreadLocalRandom.nextSecondarySeed), which are cheap and 476 * suffice. 
Scanning also includes contention reduction: When 477 * scanning workers fail to extract an apparently existing task, 478 * they soon restart at a different pseudorandom index. This form 479 * of backoff improves throughput when many threads are trying to 480 * take tasks from few queues, which can be common in some usages. 481 * Scans do not otherwise explicitly take into account core 482 * affinities, loads, cache localities, etc. However, they do 483 * exploit temporal locality (which usually approximates these) by 484 * preferring to re-poll from the same queue after a successful 485 * poll before trying others (see method topLevelExec). However, 486 * this preference is bounded (see TOP_BOUND_SHIFT) as a safeguard 487 * against infinitely unfair looping under unbounded user task 488 * recursion, and also to reduce long-term contention when many 489 * threads poll few queues holding many small tasks. The bound is 490 * high enough to avoid much impact on locality and scheduling 491 * overhead. 492 * 493 * Trimming workers. To release resources after periods of lack of 494 * use, a worker starting to wait when the pool is quiescent will 495 * time out and terminate (see method runWorker) if the pool has 496 * remained quiescent for the period given by field keepAlive. 497 * 498 * Shutdown and Termination. A call to shutdownNow invokes 499 * tryTerminate to atomically set a runState bit. The calling 500 * thread, as well as every other worker thereafter terminating, 501 * helps terminate others by cancelling their unprocessed tasks, 502 * and waking them up, doing so repeatedly until stable. Calls to 503 * non-abrupt shutdown() preface this by checking whether 504 * termination should commence by sweeping through queues (until 505 * stable) to ensure lack of in-flight submissions and workers 506 * about to process them before triggering the "STOP" phase of 507 * termination.
508 * 509 * Joining Tasks 510 * ============= 511 * 512 * Any of several actions may be taken when one worker is waiting 513 * to join a task stolen (or always held) by another. Because we 514 * are multiplexing many tasks on to a pool of workers, we can't 515 * always just let them block (as in Thread.join). We also cannot 516 * just reassign the joiner's run-time stack with another and 517 * replace it later, which would be a form of "continuation", that 518 * even if possible is not necessarily a good idea since we may 519 * need both an unblocked task and its continuation to progress. 520 * Instead we combine two tactics: 521 * 522 * Helping: Arranging for the joiner to execute some task that it 523 * would be running if the steal had not occurred. 524 * 525 * Compensating: Unless there are already enough live threads, 526 * method tryCompensate() may create or re-activate a spare 527 * thread to compensate for blocked joiners until they unblock. 528 * 529 * A third form (implemented in tryRemoveAndExec) amounts to 530 * helping a hypothetical compensator: If we can readily tell that 531 * a possible action of a compensator is to steal and execute the 532 * task being joined, the joining thread can do so directly, 533 * without the need for a compensation thread. 534 * 535 * The ManagedBlocker extension API can't use helping so relies 536 * only on compensation in method awaitBlocker. 537 * 538 * The algorithm in awaitJoin entails a form of "linear helping". 539 * Each worker records (in field source) the id of the queue from 540 * which it last stole a task. The scan in method awaitJoin uses 541 * these markers to try to find a worker to help (i.e., steal back 542 * a task from and execute it) that could hasten completion of the 543 * actively joined task. Thus, the joiner executes a task that 544 * would be on its own local deque if the to-be-joined task had 545 * not been stolen. 
This is a conservative variant of the approach 546 * described in Wagner & Calder "Leapfrogging: a portable 547 * technique for implementing efficient futures" SIGPLAN Notices, 548 * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs 549 * mainly in that we only record queue ids, not full dependency 550 * links. This requires a linear scan of the workQueues array to 551 * locate stealers, but isolates cost to when it is needed, rather 552 * than adding to per-task overhead. Searches can fail to locate 553 * stealers when GC stalls and the like delay recording sources. 554 * Further, even when accurately identified, stealers might not 555 * ever produce a task that the joiner can in turn help with. So, 556 * compensation is tried upon failure to find tasks to run. 557 * 558 * Compensation does not by default aim to keep exactly the target 559 * parallelism number of unblocked threads running at any given 560 * time. Some previous versions of this class employed immediate 561 * compensations for any blocked join. However, in practice, the 562 * vast majority of blockages are transient byproducts of GC and 563 * other JVM or OS activities that are made worse by replacement 564 * when they cause longer-term oversubscription. Rather than 565 * impose arbitrary policies, we allow users to override the 566 * default of only adding threads upon apparent starvation. The 567 * compensation mechanism may also be bounded. Bounds for the 568 * commonPool (see COMMON_MAX_SPARES) better enable JVMs to cope 569 * with programming errors and abuse before running out of 570 * resources to do so. 571 * 572 * Common Pool 573 * =========== 574 * 575 * The static common pool always exists after static 576 * initialization. Since it (or any other created pool) need 577 * never be used, we minimize initial construction overhead and 578 * footprint to the setup of about a dozen fields.
579 * 580 * When external threads submit to the common pool, they can 581 * perform subtask processing (see externalHelpComplete and 582 * related methods) upon joins. This caller-helps policy makes it 583 * sensible to set common pool parallelism level to one (or more) 584 * less than the total number of available cores, or even zero for 585 * pure caller-runs. We do not need to record whether external 586 * submissions are to the common pool -- if not, external help 587 * methods return quickly. These submitters would otherwise be 588 * blocked waiting for completion, so the extra effort (with 589 * liberally sprinkled task status checks) in inapplicable cases 590 * amounts to an odd form of limited spin-wait before blocking in 591 * ForkJoinTask.join. 592 * 593 * As a more appropriate default in managed environments, unless 594 * overridden by system properties, we use workers of subclass 595 * InnocuousForkJoinWorkerThread when there is a SecurityManager 596 * present. These workers have no permissions set, do not belong 597 * to any user-defined ThreadGroup, and erase all ThreadLocals 598 * after executing any top-level task (see 599 * WorkQueue.afterTopLevelExec). The associated mechanics (mainly 600 * in ForkJoinWorkerThread) may be JVM-dependent and must access 601 * particular Thread class fields to achieve this effect. 602 * 603 * Memory placement 604 * ================ 605 * 606 * Performance can be very sensitive to placement of instances of 607 * ForkJoinPool and WorkQueues and their queue arrays. To reduce 608 * false-sharing impact, the @Contended annotation isolates 609 * adjacent WorkQueue instances, as well as the ForkJoinPool.ctl 610 * field. WorkQueue arrays are allocated (by their threads) with 611 * larger initial sizes than most ever need, mostly to reduce 612 * false sharing with current garbage collectors that use cardmark 613 * tables. 614 * 615 * Style notes 616 * =========== 617 * 618 * Memory ordering relies mainly on VarHandles. 
This can be 619 * awkward and ugly, but also reflects the need to control 620 * outcomes across the unusual cases that arise in very racy code 621 * with very few invariants. All fields are read into locals 622 * before use, and null-checked if they are references. Array 623 * accesses using masked indices include checks (that are always 624 * true) that the array length is non-zero to avoid compilers 625 * inserting more expensive traps. This is usually done in a 626 * "C"-like style of listing declarations at the heads of methods 627 * or blocks, and using inline assignments on first encounter. 628 * Nearly all explicit checks lead to bypass/return, not exception 629 * throws, because they may legitimately arise due to 630 * cancellation/revocation during shutdown. 631 * 632 * There is a lot of representation-level coupling among classes 633 * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The 634 * fields of WorkQueue maintain data structures managed by 635 * ForkJoinPool, so are directly accessed. There is little point 636 * trying to reduce this, since any associated future changes in 637 * representations will need to be accompanied by algorithmic 638 * changes anyway. Several methods intrinsically sprawl because 639 * they must accumulate sets of consistent reads of fields held in 640 * local variables. Some others are artificially broken up to 641 * reduce producer/consumer imbalances due to dynamic compilation. 642 * There are also other coding oddities (including several 643 * unnecessary-looking hoisted null checks) that help some methods 644 * perform reasonably even when interpreted (not compiled). 
*
     * The order of declarations in this file is (with a few exceptions):
     * (1) Static utility functions
     * (2) Nested (static) classes
     * (3) Static fields
     * (4) Fields, along with constants used when unpacking some of them
     * (5) Internal control methods
     * (6) Callbacks and other support for ForkJoinTask methods
     * (7) Exported methods
     * (8) Static block initializing statics in minimally dependent order
     */

    // Static utilities

    /**
     * If there is a security manager, makes sure caller has
     * permission to modify threads.
     */
    private static void checkPermission() {
        SecurityManager security = System.getSecurityManager();
        if (security != null)
            security.checkPermission(modifyThreadPermission);
    }

    // Nested classes

    /**
     * Factory for creating new {@link ForkJoinWorkerThread}s.
     * A {@code ForkJoinWorkerThreadFactory} must be defined and used
     * for {@code ForkJoinWorkerThread} subclasses that extend base
     * functionality or initialize threads with different contexts.
     */
    public static interface ForkJoinWorkerThreadFactory {
        /**
         * Returns a new worker thread operating in the given pool.
         * Returning null or throwing an exception may result in tasks
         * never being executed. If this method throws an exception,
         * it is relayed to the caller of the method (for example
         * {@code execute}) causing attempted thread creation. If this
         * method returns null or throws an exception, it is not
         * retried until the next attempted creation (for example
         * another call to {@code execute}).
         *
         * @param pool the pool this thread works in
         * @return the new worker thread, or {@code null} if the request
         * to create a thread is rejected
         * @throws NullPointerException if the pool is null
         */
        public ForkJoinWorkerThread newThread(ForkJoinPool pool);
    }

    /**
     * Builds an AccessControlContext consisting of a single
     * ProtectionDomain (with a null CodeSource) that holds exactly
     * the given permissions.
     *
     * @param perms the permissions to place in the context
     * @return the new context
     */
    static AccessControlContext contextWithPermissions(Permission ... perms) {
        Permissions permissions = new Permissions();
        for (Permission perm : perms)
            permissions.add(perm);
        return new AccessControlContext(
            new ProtectionDomain[] { new ProtectionDomain(null, permissions) });
    }

    /**
     * Default ForkJoinWorkerThreadFactory implementation; creates a
     * new ForkJoinWorkerThread using the system class loader as the
     * thread context class loader.
     */
    private static final class DefaultForkJoinWorkerThreadFactory
        implements ForkJoinWorkerThreadFactory {
        // Context under which the thread is constructed: only the two
        // class-loader-related runtime permissions listed here.
        private static final AccessControlContext ACC = contextWithPermissions(
            new RuntimePermission("getClassLoader"),
            new RuntimePermission("setContextClassLoader"));

        /**
         * Constructs the worker inside a privileged action restricted
         * to ACC.
         *
         * @param pool the pool the new thread works in
         * @return the new worker thread
         */
        public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            return AccessController.doPrivileged(
                new PrivilegedAction<>() {
                    public ForkJoinWorkerThread run() {
                        return new ForkJoinWorkerThread(
                            pool, ClassLoader.getSystemClassLoader()); }},
                ACC);
        }
    }

    // Constants shared across ForkJoinPool and WorkQueue

    // Bounds
    static final int SWIDTH       = 16;            // width of short
    static final int SMASK        = 0xffff;        // short bits == max index
    static final int MAX_CAP      = 0x7fff;        // max #workers - 1
    static final int SQMASK       = 0x007e;        // max 64 (even) slots

    // Masks and units for WorkQueue.phase and ctl sp subfield
    static final int UNSIGNALLED  = 1 << 31;       // must be negative
    static final int SS_SEQ       = 1 << 16;       // version count
    static final int QLOCK        = 1;             // must be 1

    // Mode bits and sentinels, some also used in WorkQueue id and source fields
    static final int OWNED        = 1;             // queue has owner thread
    static final int FIFO         = 1 << 16;       // fifo queue or access mode
    static final int SHUTDOWN     = 1 << 18;
    static final int TERMINATED   = 1 << 19;
    static final int STOP         = 1 << 31;       // must be negative
    static final int QUIET        = 1 << 30;       // not scanning or working
    static final int DORMANT      = QUIET | UNSIGNALLED;

    /**
     * Initial capacity of work-stealing queue array.
     * Must be a power of two, at least 2.
     */
    static final int INITIAL_QUEUE_CAPACITY = 1 << 13;

    /**
     * Maximum capacity for queue arrays. Must be a power of two less
     * than or equal to 1 << (31 - width of array entry) to ensure
     * lack of wraparound of index calculations, but defined to a
     * value a bit less than this to help users trap runaway programs
     * before saturating systems.
     */
    static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M

    /**
     * The maximum number of top-level polls per worker before
     * checking other queues, expressed as a bit shift. See above for
     * rationale.
     */
    static final int TOP_BOUND_SHIFT = 10;

    /**
     * Queues supporting work-stealing as well as external task
     * submission. See above for descriptions and algorithms.
*/
    @jdk.internal.vm.annotation.Contended
    static final class WorkQueue {
        volatile int source;       // source queue id, or sentinel
        int id;                    // pool index, mode, tag
        int base;                  // index of next slot for poll
        int top;                   // index of next slot for push
        volatile int phase;        // versioned, negative: queued, 1: locked
        int stackPred;             // pool stack (ctl) predecessor link
        int nsteals;               // number of steals
        ForkJoinTask<?>[] array;   // the queued tasks; power of 2 size
        final ForkJoinPool pool;   // the containing pool (may be null)
        final ForkJoinWorkerThread owner; // owning thread or null if shared

        /**
         * Creates a queue for the given pool and owner. The task array
         * is not allocated here.
         *
         * @param pool the containing pool (may be null)
         * @param owner the owning thread, or null if shared
         */
        WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
            this.pool = pool;
            this.owner = owner;
            // Place indices in the center of array (that is not yet allocated)
            base = top = INITIAL_QUEUE_CAPACITY >>> 1;
        }

        /**
         * Tries to lock shared queue by CASing phase field.
         *
         * @return true if phase was changed from 0 to 1
         */
        final boolean tryLockPhase() {
            return PHASE.compareAndSet(this, 0, 1);
        }

        /**
         * Releases the phase lock by writing 0 to phase in release mode.
         */
        final void releasePhaseLock() {
            PHASE.setRelease(this, 0);
        }

        /**
         * Returns an exportable index (used by ForkJoinWorkerThread).
         */
        final int getPoolIndex() {
            return (id & 0xffff) >>> 1; // ignore odd/even tag bit
        }

        /**
         * Returns the approximate number of tasks in the queue.
         */
        final int queueSize() {
            int n = (int)BASE.getAcquire(this) - top;
            return (n >= 0) ? 0 : -n; // ignore transient negative
        }

        /**
         * Provides a more accurate estimate of whether this queue has
         * any tasks than does queueSize, by checking whether a
         * near-empty queue has at least one unclaimed task.
         */
        final boolean isEmpty() {
            ForkJoinTask<?>[] a; int n, cap, b;
            VarHandle.acquireFence(); // needed by external callers
            return ((n = (b = base) - top) >= 0 || // possibly one task
                    (n == -1 && ((a = array) == null ||
                                 (cap = a.length) == 0 ||
                                 a[(cap - 1) & b] == null)));
        }

        /**
         * Pushes a task. Call only by owner in unshared queues.
         * Does nothing if the array is null or empty.
         *
         * @param task the task. Caller must ensure non-null.
         * @throws RejectedExecutionException if array cannot be resized
         */
        final void push(ForkJoinTask<?> task) {
            ForkJoinTask<?>[] a;
            int s = top, d = s - base, cap, m;
            ForkJoinPool p = pool;
            if ((a = array) != null && (cap = a.length) > 0) {
                QA.setRelease(a, (m = cap - 1) & s, task);
                top = s + 1;
                if (d == m)               // task just stored filled the last free slot
                    growArray(false);
                else if (QA.getAcquire(a, m & (s - 1)) == null && p != null) {
                    VarHandle.fullFence();  // was empty
                    p.signalWork(null);
                }
            }
        }

        /**
         * Version of push for shared queues. Call only with phase lock held.
         * The lock is released (phase set to 0) before returning, either
         * here or within growArray.
         *
         * @param task the task to add
         * @return true if should signal work
         */
        final boolean lockedPush(ForkJoinTask<?> task) {
            ForkJoinTask<?>[] a;
            boolean signal = false;
            int s = top, d = s - base, cap, m;
            if ((a = array) != null && (cap = a.length) > 0) {
                a[(m = (cap - 1)) & s] = task;
                top = s + 1;
                if (d == m)
                    growArray(true);
                else {
                    phase = 0; // full volatile unlock
                    if (((s - base) & ~1) == 0) // size 0 or 1
                        signal = true;
                }
            }
            return signal;
        }

        /**
         * Doubles the capacity of array. Call either by owner or with
         * lock held -- it is OK for base, but not top, to move while
         * resizings are in progress.
         *
         * @param locked if true, phase is reset to 0 on exit (including
         * exceptional exit)
         * @throws RejectedExecutionException if no new array was
         * installed (capacity limit reached or allocation failed)
         */
        final void growArray(boolean locked) {
            ForkJoinTask<?>[] newA = null;
            try {
                ForkJoinTask<?>[] oldA; int oldSize, newSize;
                if ((oldA = array) != null && (oldSize = oldA.length) > 0 &&
                    (newSize = oldSize << 1) <= MAXIMUM_QUEUE_CAPACITY &&
                    newSize > 0) {
                    try {
                        newA = new ForkJoinTask<?>[newSize];
                    } catch (OutOfMemoryError ex) {
                        // deliberately ignored: newA stays null and is
                        // reported below as RejectedExecutionException
                    }
                    if (newA != null) { // poll from old array, push to new
                        int oldMask = oldSize - 1, newMask = newSize - 1;
                        for (int s = top - 1, k = oldMask; k >= 0; --k) {
                            ForkJoinTask<?> x = (ForkJoinTask<?>)
                                QA.getAndSet(oldA, s & oldMask, null);
                            if (x != null)
                                newA[s-- & newMask] = x;
                            else
                                break;      // slot already emptied; stop copying
                        }
                        array = newA;
                        VarHandle.releaseFence();
                    }
                }
            } finally {
                if (locked)
                    phase = 0;
            }
            if (newA == null)
                throw new RejectedExecutionException("Queue capacity exceeded");
        }

        /**
         * Takes next task, if one exists, in FIFO order.
         *
         * @return the task taken from the base slot, or null if the
         * queue appears empty
         */
        final ForkJoinTask<?> poll() {
            int b, k, cap; ForkJoinTask<?>[] a;
            while ((a = array) != null && (cap = a.length) > 0 &&
                   top - (b = base) > 0) {
                ForkJoinTask<?> t = (ForkJoinTask<?>)
                    QA.getAcquire(a, k = (cap - 1) & b);
                if (base == b++) {        // base unchanged since slot was read
                    if (t == null)
                        Thread.yield(); // await index advance
                    else if (QA.compareAndSet(a, k, t, null)) {
                        BASE.setOpaque(this, b);
                        return t;
                    }
                }
            }
            return null;
        }

        /**
         * Takes next task, if one exists, in order specified by mode.
         */
        final ForkJoinTask<?> nextLocalTask() {
            ForkJoinTask<?> t = null;
            int md = id, b, s, d, cap; ForkJoinTask<?>[] a;
            if ((a = array) != null && (cap = a.length) > 0 &&
                (d = (s = top) - (b = base)) > 0) {
                if ((md & FIFO) == 0 || d == 1) {   // take from top
                    if ((t = (ForkJoinTask<?>)
                         QA.getAndSet(a, (cap - 1) & --s, null)) != null)
                        TOP.setOpaque(this, s);
                }
                else if ((t = (ForkJoinTask<?>)     // take from base
                          QA.getAndSet(a, (cap - 1) & b++, null)) != null) {
                    BASE.setOpaque(this, b);
                }
                else // on contention in FIFO mode, use regular poll
                    t = poll();
            }
            return t;
        }

        /**
         * Returns next task, if one exists, in order specified by mode.
         * The task is not removed.
         */
        final ForkJoinTask<?> peek() {
            int cap; ForkJoinTask<?>[] a;
            return ((a = array) != null && (cap = a.length) > 0) ?
                a[(cap - 1) & ((id & FIFO) != 0 ? base : top - 1)] : null;
        }

        /**
         * Pops the given task only if it is at the current top.
         *
         * @param task the task expected at top
         * @return true if the task was removed
         */
        final boolean tryUnpush(ForkJoinTask<?> task) {
            boolean popped = false;
            int s, cap; ForkJoinTask<?>[] a;
            if ((a = array) != null && (cap = a.length) > 0 &&
                (s = top) != base &&
                (popped = QA.compareAndSet(a, (cap - 1) & --s, task, null)))
                TOP.setOpaque(this, s);
            return popped;
        }

        /**
         * Shared version of tryUnpush.
         *
         * @param task the task expected at top
         * @return true if the task was removed
         */
        final boolean tryLockedUnpush(ForkJoinTask<?> task) {
            boolean popped = false;
            int s = top - 1, k, cap; ForkJoinTask<?>[] a;
            if ((a = array) != null && (cap = a.length) > 0 &&
                a[k = (cap - 1) & s] == task && tryLockPhase()) {
                // recheck top and array now that the lock is held
                if (top == s + 1 && array == a &&
                    (popped = QA.compareAndSet(a, k, task, null)))
                    top = s;
                releasePhaseLock();
            }
            return popped;
        }

        /**
         * Removes and cancels all known tasks, ignoring any exceptions.
         */
        final void cancelAll() {
            for (ForkJoinTask<?> t; (t = poll()) != null; )
                ForkJoinTask.cancelIgnoringExceptions(t);
        }

        // Specialized execution methods

        /**
         * Runs the given (stolen) task if nonnull, as well as
         * remaining local tasks and others available from the given
         * queue, up to bound n (to avoid infinite unfairness).
         *
         * @param t the initial task, or null
         * @param q the queue to poll for further tasks, or null
         * @param n bound on consecutive local tasks taken before
         * trying q again
         */
        final void topLevelExec(ForkJoinTask<?> t, WorkQueue q, int n) {
            int nstolen = 1;
            for (int j = 0;;) {
                if (t != null)
                    t.doExec();
                if (j++ <= n)
                    t = nextLocalTask();
                else {
                    j = 0;
                    t = null;
                }
                if (t == null) {
                    if (q != null && (t = q.poll()) != null) {
                        ++nstolen;
                        j = 0;
                    }
                    else if (j != 0)
                        break;
                }
            }
            ForkJoinWorkerThread thread = owner;
            nsteals += nstolen;
            source = 0;
            if (thread != null)
                thread.afterTopLevelExec();
        }

        /**
         * If present, removes task from queue and executes it.
         *
         * @param task the task to look for, scanning downward from top
         */
        final void tryRemoveAndExec(ForkJoinTask<?> task) {
            ForkJoinTask<?>[] a; int s, cap;
            if ((a = array) != null && (cap = a.length) > 0 &&
                (s = top) - base > 0) { // traverse from top
                for (int m = cap - 1, ns = s - 1, i = ns; ; --i) {
                    int index = i & m;
                    ForkJoinTask<?> t = (ForkJoinTask<?>)QA.get(a, index);
                    if (t == null)
                        break;
                    else if (t == task) {
                        if (QA.compareAndSet(a, index, t, null)) {
                            top = ns;   // safely shift down
                            for (int j = i; j != ns; ++j) {
                                ForkJoinTask<?> f;
                                int pindex = (j + 1) & m;
                                f = (ForkJoinTask<?>)QA.get(a, pindex);
                                QA.setVolatile(a, pindex, null);
                                int jindex = j & m;
                                QA.setRelease(a, jindex, f);
                            }
                            VarHandle.releaseFence();
                            t.doExec();
                        }
                        break;
                    }
                }
            }
        }

        /**
         * Tries to pop and run tasks within the target's computation
         * until done, not found, or limit exceeded.
         *
         * @param task root of CountedCompleter computation
         * @param limit max runs, or zero for no limit
         * @param shared true if must lock to extract task
         * @return task status on exit
         */
        final int helpCC(CountedCompleter<?> task, int limit, boolean shared) {
            int status = 0;
            if (task != null && (status = task.status) >= 0) {
                int s, k, cap; ForkJoinTask<?>[] a;
                while ((a = array) != null && (cap = a.length) > 0 &&
                       (s = top) - base > 0) {
                    CountedCompleter<?> v = null;
                    ForkJoinTask<?> o = a[k = (cap - 1) & (s - 1)];
                    if (o instanceof CountedCompleter) {
                        CountedCompleter<?> t = (CountedCompleter<?>)o;
                        // follow the completer chain from t looking for task
                        for (CountedCompleter<?> f = t;;) {
                            if (f != task) {
                                if ((f = f.completer) == null)
                                    break;
                            }
                            else if (shared) {
                                if (tryLockPhase()) {
                                    if (top == s && array == a &&
                                        QA.compareAndSet(a, k, t, null)) {
                                        top = s - 1;
                                        v = t;
                                    }
                                    releasePhaseLock();
                                }
                                break;
                            }
                            else {
                                if (QA.compareAndSet(a, k, t, null)) {
                                    top = s - 1;
                                    v = t;
                                }
                                break;
                            }
                        }
                    }
                    if (v != null)
                        v.doExec();
                    if ((status = task.status) < 0 || v == null ||
                        (limit != 0 && --limit == 0))
                        break;
                }
            }
            return status;
        }

        /**
         * Tries to poll and run AsynchronousCompletionTasks until
         * none found or blocker is released
         *
         * @param blocker the blocker
         */
        final void helpAsyncBlocker(ManagedBlocker blocker) {
            if (blocker != null) {
                int b, k, cap; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
                while ((a = array) != null && (cap = a.length) > 0 &&
                       top - (b = base) > 0) {
                    t = (ForkJoinTask<?>)QA.getAcquire(a, k = (cap - 1) & b);
                    if (blocker.isReleasable())
                        break;
                    else if (base == b++ && t != null) {
                        if (!(t instanceof CompletableFuture.
                              AsynchronousCompletionTask))
                            break;
                        else if (QA.compareAndSet(a, k, t, null)) {
                            BASE.setOpaque(this, b);
                            t.doExec();
                        }
                    }
                }
            }
        }

        /**
         * Returns true if owned and not known to be blocked.
         */
        final boolean isApparentlyUnblocked() {
            Thread wt; Thread.State s;
            return ((wt = owner) != null &&
                    (s = wt.getState()) != Thread.State.BLOCKED &&
                    s != Thread.State.WAITING &&
                    s != Thread.State.TIMED_WAITING);
        }

        // VarHandle mechanics.
        static final VarHandle PHASE;
        static final VarHandle BASE;
        static final VarHandle TOP;
        static {
            try {
                MethodHandles.Lookup l = MethodHandles.lookup();
                PHASE = l.findVarHandle(WorkQueue.class, "phase", int.class);
                BASE = l.findVarHandle(WorkQueue.class, "base", int.class);
                TOP = l.findVarHandle(WorkQueue.class, "top", int.class);
            } catch (ReflectiveOperationException e) {
                throw new ExceptionInInitializerError(e);
            }
        }
    }

    // static fields (initialized in static initializer below)

    /**
     * Creates a new ForkJoinWorkerThread. This factory is used unless
     * overridden in ForkJoinPool constructors.
     */
    public static final ForkJoinWorkerThreadFactory
        defaultForkJoinWorkerThreadFactory;

    /**
     * Permission required for callers of methods that may start or
     * kill threads.
     */
    static final RuntimePermission modifyThreadPermission;

    /**
     * Common (static) pool. Non-null for public use unless a static
     * construction exception, but internal usages null-check on use
     * to paranoically avoid potential initialization circularities
     * as well as to simplify generated code.
     */
    static final ForkJoinPool common;

    /**
     * Common pool parallelism.
* To allow simpler use and management
     * when common pool threads are disabled, we allow the underlying
     * common.parallelism field to be zero, but in that case still report
     * parallelism as 1 to reflect resulting caller-runs mechanics.
     */
    static final int COMMON_PARALLELISM;

    /**
     * Limit on spare thread construction in tryCompensate.
     */
    private static final int COMMON_MAX_SPARES;

    /**
     * Sequence number for creating workerNamePrefix.
     */
    private static int poolNumberSequence;

    /**
     * Returns the next sequence number. We don't expect this to
     * ever contend, so use simple builtin sync.
     */
    private static final synchronized int nextPoolId() {
        return ++poolNumberSequence;
    }

    // static configuration constants

    /**
     * Default idle timeout value (in milliseconds) for the thread
     * triggering quiescence to park waiting for new work
     */
    private static final long DEFAULT_KEEPALIVE = 60_000L;

    /**
     * Undershoot tolerance for idle timeouts
     */
    private static final long TIMEOUT_SLOP = 20L;

    /**
     * The default value for COMMON_MAX_SPARES. Overridable using the
     * "java.util.concurrent.ForkJoinPool.common.maximumSpares" system
     * property. The default value is far in excess of normal
     * requirements, but also far short of MAX_CAP and typical OS
     * thread limits, so allows JVMs to catch misuse/abuse before
     * running out of resources needed to do so.
     */
    private static final int DEFAULT_COMMON_MAX_SPARES = 256;

    /**
     * Increment for seed generators. See class ThreadLocal for
     * explanation.
     */
    private static final int SEED_INCREMENT = 0x9e3779b9;

    /*
     * Bits and masks for field ctl, packed with 4 16 bit subfields:
     * RC: Number of released (unqueued) workers minus target parallelism
     * TC: Number of total workers minus target parallelism
     * SS: version count and status of top waiting thread
     * ID: poolIndex of top of Treiber stack of waiters
     *
     * When convenient, we can extract the lower 32 stack top bits
     * (including version bits) as sp=(int)ctl. The offsets of counts
     * by the target parallelism and the positionings of fields makes
     * it possible to perform the most common checks via sign tests of
     * fields: When rc is negative, there are not enough unqueued
     * workers, when tc is negative, there are not enough total
     * workers. When sp is non-zero, there are waiting workers. To
     * deal with possibly negative fields, we use casts in and out of
     * "short" and/or signed shifts to maintain signedness.
     *
     * Because it occupies uppermost bits, we can add one release count
     * using getAndAdd of RC_UNIT, rather than CAS, when returning
     * from a blocked join. Other updates entail multiple subfields
     * and masking, requiring CAS.
     *
     * The limits packed in field "bounds" are also offset by the
     * parallelism level to make them comparable to the ctl rc and tc
     * fields.
     */

    // Lower and upper word masks
    private static final long SP_MASK    = 0xffffffffL;
    private static final long UC_MASK    = ~SP_MASK;

    // Release counts
    private static final int  RC_SHIFT   = 48;
    private static final long RC_UNIT    = 0x0001L << RC_SHIFT;
    private static final long RC_MASK    = 0xffffL << RC_SHIFT;

    // Total counts
    private static final int  TC_SHIFT   = 32;
    private static final long TC_UNIT    = 0x0001L << TC_SHIFT;
    private static final long TC_MASK    = 0xffffL << TC_SHIFT;
    private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign

    // Instance fields

    volatile long stealCount;            // collects worker nsteals
    final long keepAlive;                // milliseconds before dropping if idle
    int indexSeed;                       // next worker index
    final int bounds;                    // min, max threads packed as shorts
    volatile int mode;                   // parallelism, runstate, queue mode
    WorkQueue[] workQueues;              // main registry
    final String workerNamePrefix;       // for worker thread string; sync lock
    final ForkJoinWorkerThreadFactory factory;
    final UncaughtExceptionHandler ueh;  // per-worker UEH
    final Predicate<? super ForkJoinPool> saturate;

    @jdk.internal.vm.annotation.Contended("fjpctl") // segregate
    volatile long ctl;                   // main pool control

    // Creating, registering and deregistering workers

    /**
     * Tries to construct and start one worker. Assumes that total
     * count has already been incremented as a reservation. Invokes
     * deregisterWorker on any failure.
*
     * @return true if successful
     */
    private boolean createWorker() {
        ForkJoinWorkerThreadFactory fac = factory;
        Throwable ex = null;
        ForkJoinWorkerThread wt = null;
        try {
            if (fac != null && (wt = fac.newThread(this)) != null) {
                wt.start();
                return true;
            }
        } catch (Throwable rex) {
            ex = rex;   // passed on to deregisterWorker below
        }
        // reached when the factory is null, returned null, or threw
        deregisterWorker(wt, ex);
        return false;
    }

    /**
     * Tries to add one worker, incrementing ctl counts before doing
     * so, relying on createWorker to back out on failure.
     *
     * @param c incoming ctl value, with total count negative and no
     * idle workers. On CAS failure, c is refreshed and retried if
     * this holds (otherwise, a new worker is not needed).
     */
    private void tryAddWorker(long c) {
        do {
            // add one unit to both the RC and TC subfields; the lower
            // 32 bits of nc are zero
            long nc = ((RC_MASK & (c + RC_UNIT)) |
                       (TC_MASK & (c + TC_UNIT)));
            if (ctl == c && CTL.compareAndSet(this, c, nc)) {
                createWorker();
                break;
            }
        } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
    }

    /**
     * Callback from ForkJoinWorkerThread constructor to establish and
     * record its WorkQueue.
*
     * @param wt the worker thread
     * @return the worker's queue
     */
    final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
        UncaughtExceptionHandler handler;
        wt.setDaemon(true);                             // configure thread
        if ((handler = ueh) != null)
            wt.setUncaughtExceptionHandler(handler);
        int tid = 0;                                    // for thread name
        int idbits = mode & FIFO;
        String prefix = workerNamePrefix;
        WorkQueue w = new WorkQueue(this, wt);
        if (prefix != null) {
            synchronized (prefix) {                     // prefix string doubles as the lock
                WorkQueue[] ws = workQueues; int n;
                int s = indexSeed += SEED_INCREMENT;
                idbits |= (s & ~(SMASK | FIFO | DORMANT));
                if (ws != null && (n = ws.length) > 1) {
                    int m = n - 1;
                    tid = m & ((s << 1) | 1);           // odd-numbered indices
                    for (int probes = n >>> 1;;) {      // find empty slot
                        WorkQueue q;
                        if ((q = ws[tid]) == null || q.phase == QUIET)
                            break;
                        else if (--probes == 0) {
                            tid = n | 1;                // resize below
                            break;
                        }
                        else
                            tid = (tid + 2) & m;
                    }
                    w.phase = w.id = tid | idbits;      // now publishable

                    if (tid < n)
                        ws[tid] = w;
                    else {                              // expand array
                        int an = n << 1;
                        WorkQueue[] as = new WorkQueue[an];
                        as[tid] = w;
                        int am = an - 1;
                        // each pass handles one even slot and the odd
                        // slot that follows it
                        for (int j = 0; j < n; ++j) {
                            WorkQueue v;                // copy external queue
                            if ((v = ws[j]) != null)    // position may change
                                as[v.id & am & SQMASK] = v;
                            if (++j >= n)
                                break;
                            as[j] = ws[j];              // copy worker
                        }
                        workQueues = as;
                    }
                }
            }
            wt.setName(prefix.concat(Integer.toString(tid)));
        }
        return w;
    }

    /**
     * Final callback from terminating worker, as well as upon failure
     * to construct or start a worker. Removes record of worker from
     * array, and adjusts counts. If pool is shutting down, tries to
     * complete termination.
*
     * @param wt the worker thread, or null if construction failed
     * @param ex the exception causing failure, or null if none
     */
    final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
        WorkQueue w = null;
        int phase = 0;
        if (wt != null && (w = wt.workQueue) != null) {
            Object lock = workerNamePrefix;
            int wid = w.id;
            long ns = (long)w.nsteals & 0xffffffffL;   // treat nsteals as unsigned
            if (lock != null) {
                synchronized (lock) {
                    WorkQueue[] ws; int n, i;         // remove index from array
                    if ((ws = workQueues) != null && (n = ws.length) > 0 &&
                        ws[i = wid & (n - 1)] == w)
                        ws[i] = null;
                    stealCount += ns;
                }
            }
            phase = w.phase;
        }
        if (phase != QUIET) {                         // else pre-adjusted
            long c;                                   // decrement counts
            do {} while (!CTL.weakCompareAndSet
                         (this, c = ctl, ((RC_MASK & (c - RC_UNIT)) |
                                          (TC_MASK & (c - TC_UNIT)) |
                                          (SP_MASK & c))));
        }
        if (w != null)
            w.cancelAll();                            // cancel remaining tasks

        if (!tryTerminate(false, false) &&            // possibly replace worker
            w != null && w.array != null)             // avoid repeated failures
            signalWork(null);

        if (ex == null)                               // help clean on way out
            ForkJoinTask.helpExpungeStaleExceptions();
        else                                          // rethrow
            ForkJoinTask.rethrow(ex);
    }

    /**
     * Tries to create or release a worker if too few are running.
* @param q if non-null recheck if empty on CAS failure
     */
    final void signalWork(WorkQueue q) {
        for (;;) {
            long c; int sp; WorkQueue[] ws; int i; WorkQueue v;
            if ((c = ctl) >= 0L)                      // enough workers
                break;
            else if ((sp = (int)c) == 0) {            // no idle workers
                if ((c & ADD_WORKER) != 0L)           // too few workers
                    tryAddWorker(c);
                break;
            }
            else if ((ws = workQueues) == null)
                break;                                // unstarted/terminated
            else if (ws.length <= (i = sp & SMASK))
                break;                                // terminated
            else if ((v = ws[i]) == null)
                break;                                // terminating
            else {
                int np = sp & ~UNSIGNALLED;           // phase with UNSIGNALLED bit cleared
                int vp = v.phase;
                // new ctl: v's stackPred in the low word, one more
                // release count in the high word
                long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + RC_UNIT));
                Thread vt = v.owner;
                if (sp == vp && CTL.compareAndSet(this, c, nc)) {
                    v.phase = np;
                    if (vt != null && v.source < 0)
                        LockSupport.unpark(vt);
                    break;
                }
                else if (q != null && q.isEmpty())     // no need to retry
                    break;
            }
        }
    }

    /**
     * Tries to decrement counts (sometimes implicitly) and possibly
     * arrange for a compensating worker in preparation for blocking:
     * If not all core workers yet exist, creates one, else if any are
     * unreleased (possibly including caller) releases one, else if
     * fewer than the minimum allowed number of workers running,
     * checks to see that they are all active, and if so creates an
     * extra worker unless over maximum limit and policy is to
     * saturate. Most of these steps can fail due to interference, in
     * which case 0 is returned so caller will retry. A negative
     * return value indicates that the caller doesn't need to
     * re-adjust counts when later unblocked.
*
     * @param w the queue of the caller about to block
     * @return 1: block then adjust, -1: block without adjust, 0 : retry
     * @throws RejectedExecutionException if the thread limit is reached,
     * the saturate predicate (if any) does not accept, and the count
     * of blocked workers is not below the parallelism level
     */
    private int tryCompensate(WorkQueue w) {
        int t, n, sp;
        long c = ctl;
        WorkQueue[] ws = workQueues;
        if ((t = (short)(c >>> TC_SHIFT)) >= 0) {
            if (ws == null || (n = ws.length) <= 0 || w == null)
                return 0;                        // disabled
            else if ((sp = (int)c) != 0) {       // replace or release
                WorkQueue v = ws[sp & (n - 1)];
                int wp = w.phase;
                long uc = UC_MASK & ((wp < 0) ? c + RC_UNIT : c);
                int np = sp & ~UNSIGNALLED;
                if (v != null) {
                    int vp = v.phase;
                    Thread vt = v.owner;
                    long nc = ((long)v.stackPred & SP_MASK) | uc;
                    if (vp == sp && CTL.compareAndSet(this, c, nc)) {
                        v.phase = np;
                        if (vt != null && v.source < 0)
                            LockSupport.unpark(vt);
                        return (wp < 0) ? -1 : 1;
                    }
                }
                return 0;
            }
            else if ((int)(c >> RC_SHIFT) -      // reduce parallelism
                     (short)(bounds & SMASK) > 0) {
                long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c));
                return CTL.compareAndSet(this, c, nc) ? 1 : 0;
            }
            else {                               // validate
                int md = mode, pc = md & SMASK, tc = pc + t, bc = 0;
                boolean unstable = false;
                for (int i = 1; i < n; i += 2) { // odd (worker) slots only
                    WorkQueue q; Thread wt; Thread.State ts;
                    if ((q = ws[i]) != null) {
                        if (q.source == 0) {
                            unstable = true;
                            break;
                        }
                        else {
                            --tc;
                            if ((wt = q.owner) != null &&
                                ((ts = wt.getState()) == Thread.State.BLOCKED ||
                                 ts == Thread.State.WAITING))
                                ++bc;            // worker is blocking
                        }
                    }
                }
                if (unstable || tc != 0 || ctl != c)
                    return 0;                    // inconsistent
                else if (t + pc >= MAX_CAP || t >= (bounds >>> SWIDTH)) {
                    Predicate<? super ForkJoinPool> sat;
                    if ((sat = saturate) != null && sat.test(this))
                        return -1;
                    else if (bc < pc) {          // lagging
                        Thread.yield();          // for retry spins
                        return 0;
                    }
                    else
                        throw new RejectedExecutionException(
                            "Thread limit exceeded replacing blocked worker");
                }
            }
        }

        long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); // expand pool
        return CTL.compareAndSet(this, c, nc) && createWorker() ? 1 : 0;
    }

    /**
     * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
     * See above for explanation.
     *
     * @param w the calling worker's queue; its task array is
     * allocated here
     */
    final void runWorker(WorkQueue w) {
        int r = (w.id ^ ThreadLocalRandom.nextSecondarySeed()) | FIFO; // rng
        w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY]; // initialize
        for (;;) {
            int phase;
            if (scan(w, r)) {                     // scan until apparently empty
                r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // move (xorshift)
            }
            else if ((phase = w.phase) >= 0) {    // enqueue, then rescan
                long np = (w.phase = (phase + SS_SEQ) | UNSIGNALLED) & SP_MASK;
                long c, nc;
                do {
                    w.stackPred = (int)(c = ctl);
                    nc = ((c - RC_UNIT) & UC_MASK) | np;
                } while (!CTL.weakCompareAndSet(this, c, nc));
            }
            else {                                // already queued
                int pred = w.stackPred;
                Thread.interrupted();             // clear before park
                w.source = DORMANT;               // enable signal
                long c = ctl;
                int md = mode, rc = (md & SMASK) + (int)(c >> RC_SHIFT);
                if (md < 0)                       // terminating
                    break;
                else if (rc <= 0 && (md & SHUTDOWN) != 0 &&
                         tryTerminate(false, false))
                    break;                        // quiescent shutdown
                else if (w.phase < 0) {
                    if (rc <= 0 && pred != 0 && phase == (int)c) {
                        long nc = (UC_MASK & (c - TC_UNIT)) | (SP_MASK & pred);
                        long d = keepAlive + System.currentTimeMillis();
                        LockSupport.parkUntil(this, d);
                        if (ctl == c &&           // drop on timeout if all idle
                            d - System.currentTimeMillis() <= TIMEOUT_SLOP &&
                            CTL.compareAndSet(this, c, nc)) {
                            w.phase = QUIET;
                            break;
                        }
                    }
                    else {
                        LockSupport.park(this);
                        if (w.phase < 0)          // one spurious wakeup check
                            LockSupport.park(this);
                    }
                }
                w.source = 0;                     // disable signal
            }
        }
    }

    /**
     * Scans for and if found executes one or more top-level tasks from a queue.
     *
     * @param w the caller's queue
     * @param r random value selecting the starting index
     * @return true if found an apparently non-empty queue, and
     * possibly ran task(s).
     */
    private boolean scan(WorkQueue w, int r) {
        WorkQueue[] ws; int n;
        if ((ws = workQueues) != null && (n = ws.length) > 0 && w != null) {
            for (int m = n - 1, j = r & m;;) {
                WorkQueue q; int b;
                if ((q = ws[j]) != null && q.top != (b = q.base)) {
                    int qid = q.id;
                    ForkJoinTask<?>[] a; int cap, k; ForkJoinTask<?> t;
                    if ((a = q.array) != null && (cap = a.length) > 0) {
                        t = (ForkJoinTask<?>)QA.getAcquire(a, k = (cap - 1) & b);
                        if (q.base == b++ && t != null &&
                            QA.compareAndSet(a, k, t, null)) {
                            q.base = b;
                            w.source = qid;
                            if (a[(cap - 1) & b] != null)
                                signalWork(q);    // help signal if more tasks
                            w.topLevelExec(t, q,  // random fairness bound
                                           (r | (1 << TOP_BOUND_SHIFT)) & SMASK);
                        }
                    }
                    return true;
                }
                else if (--n > 0)                 // n counts remaining slots to probe
                    j = (j + 1) & m;
                else
                    break;
            }
        }
        return false;
    }

    /**
     * Helps and/or blocks until the given task is done or timeout.
     * First tries locally helping, then scans other queues for a task
     * produced by one of w's stealers; compensating and blocking if
     * none are found (rescanning if tryCompensate fails).
1685 * 1686 * @param w caller 1687 * @param task the task 1688 * @param deadline for timed waits, if nonzero 1689 * @return task status on exit 1690 */ 1691 final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) { 1692 int s = 0; 1693 int seed = ThreadLocalRandom.nextSecondarySeed(); 1694 if (w != null && task != null && 1695 (!(task instanceof CountedCompleter) || 1696 (s = w.helpCC((CountedCompleter<?>)task, 0, false)) >= 0)) { 1697 w.tryRemoveAndExec(task); 1698 int src = w.source, id = w.id; 1699 int r = (seed >>> 16) | 1, step = (seed & ~1) | 2; 1700 s = task.status; 1701 while (s >= 0) { 1702 WorkQueue[] ws; 1703 int n = (ws = workQueues) == null ? 0 : ws.length, m = n - 1; 1704 while (n > 0) { 1705 WorkQueue q; int b; 1706 if ((q = ws[r & m]) != null && q.source == id && 1707 q.top != (b = q.base)) { 1708 ForkJoinTask<?>[] a; int cap, k; 1709 int qid = q.id; 1710 if ((a = q.array) != null && (cap = a.length) > 0) { 1711 ForkJoinTask<?> t = (ForkJoinTask<?>) 1712 QA.getAcquire(a, k = (cap - 1) & b); 1713 if (q.source == id && q.base == b++ && 1714 t != null && QA.compareAndSet(a, k, t, null)) { 1715 q.base = b; 1716 w.source = qid; 1717 t.doExec(); 1718 w.source = src; 1719 } 1720 } 1721 break; 1722 } 1723 else { 1724 r += step; 1725 --n; 1726 } 1727 } 1728 if ((s = task.status) < 0) 1729 break; 1730 else if (n == 0) { // empty scan 1731 long ms, ns; int block; 1732 if (deadline == 0L) 1733 ms = 0L; // untimed 1734 else if ((ns = deadline - System.nanoTime()) <= 0L) 1735 break; // timeout 1736 else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L) 1737 ms = 1L; // avoid 0 for timed wait 1738 if ((block = tryCompensate(w)) != 0) { 1739 task.internalWait(ms); 1740 CTL.getAndAdd(this, (block > 0) ? RC_UNIT : 0L); 1741 } 1742 s = task.status; 1743 } 1744 } 1745 } 1746 return s; 1747 } 1748 1749 /** 1750 * Runs tasks until {@code isQuiescent()}. 
Rather than blocking 1751 * when tasks cannot be found, rescans until all others cannot 1752 * find tasks either. 1753 */ 1754 final void helpQuiescePool(WorkQueue w) { 1755 int prevSrc = w.source; 1756 int seed = ThreadLocalRandom.nextSecondarySeed(); 1757 int r = seed >>> 16, step = r | 1; 1758 for (int source = prevSrc, released = -1;;) { // -1 until known 1759 ForkJoinTask<?> localTask; WorkQueue[] ws; 1760 while ((localTask = w.nextLocalTask()) != null) 1761 localTask.doExec(); 1762 if (w.phase >= 0 && released == -1) 1763 released = 1; 1764 boolean quiet = true, empty = true; 1765 int n = (ws = workQueues) == null ? 0 : ws.length; 1766 for (int m = n - 1; n > 0; r += step, --n) { 1767 WorkQueue q; int b; 1768 if ((q = ws[r & m]) != null) { 1769 int qs = q.source; 1770 if (q.top != (b = q.base)) { 1771 quiet = empty = false; 1772 ForkJoinTask<?>[] a; int cap, k; 1773 int qid = q.id; 1774 if ((a = q.array) != null && (cap = a.length) > 0) { 1775 if (released == 0) { // increment 1776 released = 1; 1777 CTL.getAndAdd(this, RC_UNIT); 1778 } 1779 ForkJoinTask<?> t = (ForkJoinTask<?>) 1780 QA.getAcquire(a, k = (cap - 1) & b); 1781 if (q.base == b++ && t != null && 1782 QA.compareAndSet(a, k, t, null)) { 1783 q.base = b; 1784 w.source = qid; 1785 t.doExec(); 1786 w.source = source = prevSrc; 1787 } 1788 } 1789 break; 1790 } 1791 else if ((qs & QUIET) == 0) 1792 quiet = false; 1793 } 1794 } 1795 if (quiet) { 1796 if (released == 0) 1797 CTL.getAndAdd(this, RC_UNIT); 1798 w.source = prevSrc; 1799 break; 1800 } 1801 else if (empty) { 1802 if (source != QUIET) 1803 w.source = source = QUIET; 1804 if (released == 1) { // decrement 1805 released = 0; 1806 CTL.getAndAdd(this, RC_MASK & -RC_UNIT); 1807 } 1808 } 1809 } 1810 } 1811 1812 /** 1813 * Scans for and returns a polled task, if available. 1814 * Used only for untracked polls. 
1815 * 1816 * @param submissionsOnly if true, only scan submission queues 1817 */ 1818 private ForkJoinTask<?> pollScan(boolean submissionsOnly) { 1819 WorkQueue[] ws; int n; 1820 rescan: while ((mode & STOP) == 0 && (ws = workQueues) != null && 1821 (n = ws.length) > 0) { 1822 int m = n - 1; 1823 int r = ThreadLocalRandom.nextSecondarySeed(); 1824 int h = r >>> 16; 1825 int origin, step; 1826 if (submissionsOnly) { 1827 origin = (r & ~1) & m; // even indices and steps 1828 step = (h & ~1) | 2; 1829 } 1830 else { 1831 origin = r & m; 1832 step = h | 1; 1833 } 1834 boolean nonempty = false; 1835 for (int i = origin, oldSum = 0, checkSum = 0;;) { 1836 WorkQueue q; 1837 if ((q = ws[i]) != null) { 1838 int b; ForkJoinTask<?> t; 1839 if (q.top - (b = q.base) > 0) { 1840 nonempty = true; 1841 if ((t = q.poll()) != null) 1842 return t; 1843 } 1844 else 1845 checkSum += b + q.id; 1846 } 1847 if ((i = (i + step) & m) == origin) { 1848 if (!nonempty && oldSum == (oldSum = checkSum)) 1849 break rescan; 1850 checkSum = 0; 1851 nonempty = false; 1852 } 1853 } 1854 } 1855 return null; 1856 } 1857 1858 /** 1859 * Gets and removes a local or stolen task for the given worker. 1860 * 1861 * @return a task, if available 1862 */ 1863 final ForkJoinTask<?> nextTaskFor(WorkQueue w) { 1864 ForkJoinTask<?> t; 1865 if (w == null || (t = w.nextLocalTask()) == null) 1866 t = pollScan(false); 1867 return t; 1868 } 1869 1870 // External operations 1871 1872 /** 1873 * Adds the given task to a submission queue at submitter's 1874 * current queue, creating one if null or contended. 1875 * 1876 * @param task the task. Caller must ensure non-null. 
1877 */ 1878 final void externalPush(ForkJoinTask<?> task) { 1879 int r; // initialize caller's probe 1880 if ((r = ThreadLocalRandom.getProbe()) == 0) { 1881 ThreadLocalRandom.localInit(); 1882 r = ThreadLocalRandom.getProbe(); 1883 } 1884 for (;;) { 1885 WorkQueue q; 1886 int md = mode, n; 1887 WorkQueue[] ws = workQueues; 1888 if ((md & SHUTDOWN) != 0 || ws == null || (n = ws.length) <= 0) 1889 throw new RejectedExecutionException(); 1890 else if ((q = ws[(n - 1) & r & SQMASK]) == null) { // add queue 1891 int qid = (r | QUIET) & ~(FIFO | OWNED); 1892 Object lock = workerNamePrefix; 1893 ForkJoinTask<?>[] qa = 1894 new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY]; 1895 q = new WorkQueue(this, null); 1896 q.array = qa; 1897 q.id = qid; 1898 q.source = QUIET; 1899 if (lock != null) { // unless disabled, lock pool to install 1900 synchronized (lock) { 1901 WorkQueue[] vs; int i, vn; 1902 if ((vs = workQueues) != null && (vn = vs.length) > 0 && 1903 vs[i = qid & (vn - 1) & SQMASK] == null) 1904 vs[i] = q; // else another thread already installed 1905 } 1906 } 1907 } 1908 else if (!q.tryLockPhase()) // move if busy 1909 r = ThreadLocalRandom.advanceProbe(r); 1910 else { 1911 if (q.lockedPush(task)) 1912 signalWork(null); 1913 return; 1914 } 1915 } 1916 } 1917 1918 /** 1919 * Pushes a possibly-external submission. 1920 */ 1921 private <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) { 1922 Thread t; ForkJoinWorkerThread w; WorkQueue q; 1923 if (task == null) 1924 throw new NullPointerException(); 1925 if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) && 1926 (w = (ForkJoinWorkerThread)t).pool == this && 1927 (q = w.workQueue) != null) 1928 q.push(task); 1929 else 1930 externalPush(task); 1931 return task; 1932 } 1933 1934 /** 1935 * Returns common pool queue for an external thread. 
1936 */ 1937 static WorkQueue commonSubmitterQueue() { 1938 ForkJoinPool p = common; 1939 int r = ThreadLocalRandom.getProbe(); 1940 WorkQueue[] ws; int n; 1941 return (p != null && (ws = p.workQueues) != null && 1942 (n = ws.length) > 0) ? 1943 ws[(n - 1) & r & SQMASK] : null; 1944 } 1945 1946 /** 1947 * Performs tryUnpush for an external submitter. 1948 */ 1949 final boolean tryExternalUnpush(ForkJoinTask<?> task) { 1950 int r = ThreadLocalRandom.getProbe(); 1951 WorkQueue[] ws; WorkQueue w; int n; 1952 return ((ws = workQueues) != null && 1953 (n = ws.length) > 0 && 1954 (w = ws[(n - 1) & r & SQMASK]) != null && 1955 w.tryLockedUnpush(task)); 1956 } 1957 1958 /** 1959 * Performs helpComplete for an external submitter. 1960 */ 1961 final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) { 1962 int r = ThreadLocalRandom.getProbe(); 1963 WorkQueue[] ws; WorkQueue w; int n; 1964 return ((ws = workQueues) != null && (n = ws.length) > 0 && 1965 (w = ws[(n - 1) & r & SQMASK]) != null) ? 1966 w.helpCC(task, maxTasks, true) : 0; 1967 } 1968 1969 /** 1970 * Tries to steal and run tasks within the target's computation. 1971 * The maxTasks argument supports external usages; internal calls 1972 * use zero, allowing unbounded steps (external calls trap 1973 * non-positive values). 1974 * 1975 * @param w caller 1976 * @param maxTasks if non-zero, the maximum number of other tasks to run 1977 * @return task status on exit 1978 */ 1979 final int helpComplete(WorkQueue w, CountedCompleter<?> task, 1980 int maxTasks) { 1981 return (w == null) ? 0 : w.helpCC(task, maxTasks, false); 1982 } 1983 1984 /** 1985 * Returns a cheap heuristic guide for task partitioning when 1986 * programmers, frameworks, tools, or languages have little or no 1987 * idea about task granularity. In essence, by offering this 1988 * method, we ask users only about tradeoffs in overhead vs 1989 * expected throughput and its variance, rather than how finely to 1990 * partition tasks. 
1991 * 1992 * In a steady state strict (tree-structured) computation, each 1993 * thread makes available for stealing enough tasks for other 1994 * threads to remain active. Inductively, if all threads play by 1995 * the same rules, each thread should make available only a 1996 * constant number of tasks. 1997 * 1998 * The minimum useful constant is just 1. But using a value of 1 1999 * would require immediate replenishment upon each steal to 2000 * maintain enough tasks, which is infeasible. Further, 2001 * partitionings/granularities of offered tasks should minimize 2002 * steal rates, which in general means that threads nearer the top 2003 * of computation tree should generate more than those nearer the 2004 * bottom. In perfect steady state, each thread is at 2005 * approximately the same level of computation tree. However, 2006 * producing extra tasks amortizes the uncertainty of progress and 2007 * diffusion assumptions. 2008 * 2009 * So, users will want to use values larger (but not much larger) 2010 * than 1 to both smooth over transient shortages and hedge 2011 * against uneven progress; as traded off against the cost of 2012 * extra task overhead. We leave the user to pick a threshold 2013 * value to compare with the results of this call to guide 2014 * decisions, but recommend values such as 3. 2015 * 2016 * When all threads are active, it is on average OK to estimate 2017 * surplus strictly locally. In steady-state, if one thread is 2018 * maintaining say 2 surplus tasks, then so are others. So we can 2019 * just use estimated queue length. However, this strategy alone 2020 * leads to serious mis-estimates in some non-steady-state 2021 * conditions (ramp-up, ramp-down, other stalls). We can detect 2022 * many of these by further considering the number of "idle" 2023 * threads, that are known to have zero queued tasks, so 2024 * compensate by a factor of (#idle/#active) threads. 
2025 */ 2026 static int getSurplusQueuedTaskCount() { 2027 Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q; 2028 if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) && 2029 (pool = (wt = (ForkJoinWorkerThread)t).pool) != null && 2030 (q = wt.workQueue) != null) { 2031 int p = pool.mode & SMASK; 2032 int a = p + (int)(pool.ctl >> RC_SHIFT); 2033 int n = q.top - q.base; 2034 return n - (a > (p >>>= 1) ? 0 : 2035 a > (p >>>= 1) ? 1 : 2036 a > (p >>>= 1) ? 2 : 2037 a > (p >>>= 1) ? 4 : 2038 8); 2039 } 2040 return 0; 2041 } 2042 2043 // Termination 2044 2045 /** 2046 * Possibly initiates and/or completes termination. 2047 * 2048 * @param now if true, unconditionally terminate, else only 2049 * if no work and no active workers 2050 * @param enable if true, terminate when next possible 2051 * @return true if terminating or terminated 2052 */ 2053 private boolean tryTerminate(boolean now, boolean enable) { 2054 int md; // 3 phases: try to set SHUTDOWN, then STOP, then TERMINATED 2055 2056 while (((md = mode) & SHUTDOWN) == 0) { 2057 if (!enable || this == common) // cannot shutdown 2058 return false; 2059 else 2060 MODE.compareAndSet(this, md, md | SHUTDOWN); 2061 } 2062 2063 while (((md = mode) & STOP) == 0) { // try to initiate termination 2064 if (!now) { // check if quiescent & empty 2065 for (long oldSum = 0L;;) { // repeat until stable 2066 boolean running = false; 2067 long checkSum = ctl; 2068 WorkQueue[] ws = workQueues; 2069 if ((md & SMASK) + (int)(checkSum >> RC_SHIFT) > 0) 2070 running = true; 2071 else if (ws != null) { 2072 WorkQueue w; 2073 for (int i = 0; i < ws.length; ++i) { 2074 if ((w = ws[i]) != null) { 2075 int s = w.source, p = w.phase; 2076 int d = w.id, b = w.base; 2077 if (b != w.top || 2078 ((d & 1) == 1 && (s >= 0 || p >= 0))) { 2079 running = true; 2080 break; // working, scanning, or have work 2081 } 2082 checkSum += (((long)s << 48) + ((long)p << 32) + 2083 ((long)b << 16) + (long)d); 2084 } 2085 } 2086 } 
2087 if (((md = mode) & STOP) != 0) 2088 break; // already triggered 2089 else if (running) 2090 return false; 2091 else if (workQueues == ws && oldSum == (oldSum = checkSum)) 2092 break; 2093 } 2094 } 2095 if ((md & STOP) == 0) 2096 MODE.compareAndSet(this, md, md | STOP); 2097 } 2098 2099 while (((md = mode) & TERMINATED) == 0) { // help terminate others 2100 for (long oldSum = 0L;;) { // repeat until stable 2101 WorkQueue[] ws; WorkQueue w; 2102 long checkSum = ctl; 2103 if ((ws = workQueues) != null) { 2104 for (int i = 0; i < ws.length; ++i) { 2105 if ((w = ws[i]) != null) { 2106 ForkJoinWorkerThread wt = w.owner; 2107 w.cancelAll(); // clear queues 2108 if (wt != null) { 2109 try { // unblock join or park 2110 wt.interrupt(); 2111 } catch (Throwable ignore) { 2112 } 2113 } 2114 checkSum += ((long)w.phase << 32) + w.base; 2115 } 2116 } 2117 } 2118 if (((md = mode) & TERMINATED) != 0 || 2119 (workQueues == ws && oldSum == (oldSum = checkSum))) 2120 break; 2121 } 2122 if ((md & TERMINATED) != 0) 2123 break; 2124 else if ((md & SMASK) + (short)(ctl >>> TC_SHIFT) > 0) 2125 break; 2126 else if (MODE.compareAndSet(this, md, md | TERMINATED)) { 2127 synchronized (this) { 2128 notifyAll(); // for awaitTermination 2129 } 2130 break; 2131 } 2132 } 2133 return true; 2134 } 2135 2136 // Exported methods 2137 2138 // Constructors 2139 2140 /** 2141 * Creates a {@code ForkJoinPool} with parallelism equal to {@link 2142 * java.lang.Runtime#availableProcessors}, using defaults for all 2143 * other parameters (see {@link #ForkJoinPool(int, 2144 * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean, 2145 * int, int, int, Predicate, long, TimeUnit)}). 
2146 * 2147 * @throws SecurityException if a security manager exists and 2148 * the caller is not permitted to modify threads 2149 * because it does not hold {@link 2150 * java.lang.RuntimePermission}{@code ("modifyThread")} 2151 */ 2152 public ForkJoinPool() { 2153 this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), 2154 defaultForkJoinWorkerThreadFactory, null, false, 2155 0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS); 2156 } 2157 2158 /** 2159 * Creates a {@code ForkJoinPool} with the indicated parallelism 2160 * level, using defaults for all other parameters (see {@link 2161 * #ForkJoinPool(int, ForkJoinWorkerThreadFactory, 2162 * UncaughtExceptionHandler, boolean, int, int, int, Predicate, 2163 * long, TimeUnit)}). 2164 * 2165 * @param parallelism the parallelism level 2166 * @throws IllegalArgumentException if parallelism less than or 2167 * equal to zero, or greater than implementation limit 2168 * @throws SecurityException if a security manager exists and 2169 * the caller is not permitted to modify threads 2170 * because it does not hold {@link 2171 * java.lang.RuntimePermission}{@code ("modifyThread")} 2172 */ 2173 public ForkJoinPool(int parallelism) { 2174 this(parallelism, defaultForkJoinWorkerThreadFactory, null, false, 2175 0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS); 2176 } 2177 2178 /** 2179 * Creates a {@code ForkJoinPool} with the given parameters (using 2180 * defaults for others -- see {@link #ForkJoinPool(int, 2181 * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean, 2182 * int, int, int, Predicate, long, TimeUnit)}). 2183 * 2184 * @param parallelism the parallelism level. For default value, 2185 * use {@link java.lang.Runtime#availableProcessors}. 2186 * @param factory the factory for creating new threads. For default value, 2187 * use {@link #defaultForkJoinWorkerThreadFactory}. 
2188 * @param handler the handler for internal worker threads that 2189 * terminate due to unrecoverable errors encountered while executing 2190 * tasks. For default value, use {@code null}. 2191 * @param asyncMode if true, 2192 * establishes local first-in-first-out scheduling mode for forked 2193 * tasks that are never joined. This mode may be more appropriate 2194 * than default locally stack-based mode in applications in which 2195 * worker threads only process event-style asynchronous tasks. 2196 * For default value, use {@code false}. 2197 * @throws IllegalArgumentException if parallelism less than or 2198 * equal to zero, or greater than implementation limit 2199 * @throws NullPointerException if the factory is null 2200 * @throws SecurityException if a security manager exists and 2201 * the caller is not permitted to modify threads 2202 * because it does not hold {@link 2203 * java.lang.RuntimePermission}{@code ("modifyThread")} 2204 */ 2205 public ForkJoinPool(int parallelism, 2206 ForkJoinWorkerThreadFactory factory, 2207 UncaughtExceptionHandler handler, 2208 boolean asyncMode) { 2209 this(parallelism, factory, handler, asyncMode, 2210 0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS); 2211 } 2212 2213 /** 2214 * Creates a {@code ForkJoinPool} with the given parameters. 2215 * 2216 * @param parallelism the parallelism level. For default value, 2217 * use {@link java.lang.Runtime#availableProcessors}. 2218 * 2219 * @param factory the factory for creating new threads. For 2220 * default value, use {@link #defaultForkJoinWorkerThreadFactory}. 2221 * 2222 * @param handler the handler for internal worker threads that 2223 * terminate due to unrecoverable errors encountered while 2224 * executing tasks. For default value, use {@code null}. 2225 * 2226 * @param asyncMode if true, establishes local first-in-first-out 2227 * scheduling mode for forked tasks that are never joined. 
This 2228 * mode may be more appropriate than default locally stack-based 2229 * mode in applications in which worker threads only process 2230 * event-style asynchronous tasks. For default value, use {@code 2231 * false}. 2232 * 2233 * @param corePoolSize the number of threads to keep in the pool 2234 * (unless timed out after an elapsed keep-alive). Normally (and 2235 * by default) this is the same value as the parallelism level, 2236 * but may be set to a larger value to reduce dynamic overhead if 2237 * tasks regularly block. Using a smaller value (for example 2238 * {@code 0}) has the same effect as the default. 2239 * 2240 * @param maximumPoolSize the maximum number of threads allowed. 2241 * When the maximum is reached, attempts to replace blocked 2242 * threads fail. (However, because creation and termination of 2243 * different threads may overlap, and may be managed by the given 2244 * thread factory, this value may be transiently exceeded.) To 2245 * arrange the same value as is used by default for the common 2246 * pool, use {@code 256} plus the {@code parallelism} level. (By 2247 * default, the common pool allows a maximum of 256 spare 2248 * threads.) Using a value (for example {@code 2249 * Integer.MAX_VALUE}) larger than the implementation's total 2250 * thread limit has the same effect as using this limit (which is 2251 * the default). 2252 * 2253 * @param minimumRunnable the minimum allowed number of core 2254 * threads not blocked by a join or {@link ManagedBlocker}. To 2255 * ensure progress, when too few unblocked threads exist and 2256 * unexecuted tasks may exist, new threads are constructed, up to 2257 * the given maximumPoolSize. For the default value, use {@code 2258 * 1}, that ensures liveness. A larger value might improve 2259 * throughput in the presence of blocked activities, but might 2260 * not, due to increased overhead. 
A value of zero may be 2261 * acceptable when submitted tasks cannot have dependencies 2262 * requiring additional threads. 2263 * 2264 * @param saturate if non-null, a predicate invoked upon attempts 2265 * to create more than the maximum total allowed threads. By 2266 * default, when a thread is about to block on a join or {@link 2267 * ManagedBlocker}, but cannot be replaced because the 2268 * maximumPoolSize would be exceeded, a {@link 2269 * RejectedExecutionException} is thrown. But if this predicate 2270 * returns {@code true}, then no exception is thrown, so the pool 2271 * continues to operate with fewer than the target number of 2272 * runnable threads, which might not ensure progress. 2273 * 2274 * @param keepAliveTime the elapsed time since last use before 2275 * a thread is terminated (and then later replaced if needed). 2276 * For the default value, use {@code 60, TimeUnit.SECONDS}. 2277 * 2278 * @param unit the time unit for the {@code keepAliveTime} argument 2279 * 2280 * @throws IllegalArgumentException if parallelism is less than or 2281 * equal to zero, or is greater than implementation limit, 2282 * or if maximumPoolSize is less than parallelism, 2283 * of if the keepAliveTime is less than or equal to zero. 2284 * @throws NullPointerException if the factory is null 2285 * @throws SecurityException if a security manager exists and 2286 * the caller is not permitted to modify threads 2287 * because it does not hold {@link 2288 * java.lang.RuntimePermission}{@code ("modifyThread")} 2289 * @since 9 2290 */ 2291 public ForkJoinPool(int parallelism, 2292 ForkJoinWorkerThreadFactory factory, 2293 UncaughtExceptionHandler handler, 2294 boolean asyncMode, 2295 int corePoolSize, 2296 int maximumPoolSize, 2297 int minimumRunnable, 2298 Predicate<? 
super ForkJoinPool> saturate, 2299 long keepAliveTime, 2300 TimeUnit unit) { 2301 // check, encode, pack parameters 2302 if (parallelism <= 0 || parallelism > MAX_CAP || 2303 maximumPoolSize < parallelism || keepAliveTime <= 0L) 2304 throw new IllegalArgumentException(); 2305 if (factory == null) 2306 throw new NullPointerException(); 2307 long ms = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP); 2308 2309 int corep = Math.min(Math.max(corePoolSize, parallelism), MAX_CAP); 2310 long c = ((((long)(-corep) << TC_SHIFT) & TC_MASK) | 2311 (((long)(-parallelism) << RC_SHIFT) & RC_MASK)); 2312 int m = parallelism | (asyncMode ? FIFO : 0); 2313 int maxSpares = Math.min(maximumPoolSize, MAX_CAP) - parallelism; 2314 int minAvail = Math.min(Math.max(minimumRunnable, 0), MAX_CAP); 2315 int b = ((minAvail - parallelism) & SMASK) | (maxSpares << SWIDTH); 2316 int n = (parallelism > 1) ? parallelism - 1 : 1; // at least 2 slots 2317 n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; 2318 n = (n + 1) << 1; // power of two, including space for submission queues 2319 2320 this.workerNamePrefix = "ForkJoinPool-" + nextPoolId() + "-worker-"; 2321 this.workQueues = new WorkQueue[n]; 2322 this.factory = factory; 2323 this.ueh = handler; 2324 this.saturate = saturate; 2325 this.keepAlive = ms; 2326 this.bounds = b; 2327 this.mode = m; 2328 this.ctl = c; 2329 checkPermission(); 2330 } 2331 2332 private static Object newInstanceFromSystemProperty(String property) 2333 throws ReflectiveOperationException { 2334 String className = System.getProperty(property); 2335 return (className == null) 2336 ? 
null 2337 : ClassLoader.getSystemClassLoader().loadClass(className) 2338 .getConstructor().newInstance(); 2339 } 2340 2341 /** 2342 * Constructor for common pool using parameters possibly 2343 * overridden by system properties 2344 */ 2345 private ForkJoinPool(byte forCommonPoolOnly) { 2346 int parallelism = -1; 2347 ForkJoinWorkerThreadFactory fac = null; 2348 UncaughtExceptionHandler handler = null; 2349 try { // ignore exceptions in accessing/parsing properties 2350 String pp = System.getProperty 2351 ("java.util.concurrent.ForkJoinPool.common.parallelism"); 2352 if (pp != null) 2353 parallelism = Integer.parseInt(pp); 2354 fac = (ForkJoinWorkerThreadFactory) newInstanceFromSystemProperty( 2355 "java.util.concurrent.ForkJoinPool.common.threadFactory"); 2356 handler = (UncaughtExceptionHandler) newInstanceFromSystemProperty( 2357 "java.util.concurrent.ForkJoinPool.common.exceptionHandler"); 2358 } catch (Exception ignore) { 2359 } 2360 2361 if (fac == null) { 2362 if (System.getSecurityManager() == null) 2363 fac = defaultForkJoinWorkerThreadFactory; 2364 else // use security-managed default 2365 fac = new InnocuousForkJoinWorkerThreadFactory(); 2366 } 2367 if (parallelism < 0 && // default 1 less than #cores 2368 (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0) 2369 parallelism = 1; 2370 if (parallelism > MAX_CAP) 2371 parallelism = MAX_CAP; 2372 2373 long c = ((((long)(-parallelism) << TC_SHIFT) & TC_MASK) | 2374 (((long)(-parallelism) << RC_SHIFT) & RC_MASK)); 2375 int b = ((1 - parallelism) & SMASK) | (COMMON_MAX_SPARES << SWIDTH); 2376 int n = (parallelism > 1) ? 
parallelism - 1 : 1; 2377 n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; 2378 n = (n + 1) << 1; 2379 2380 this.workerNamePrefix = "ForkJoinPool.commonPool-worker-"; 2381 this.workQueues = new WorkQueue[n]; 2382 this.factory = fac; 2383 this.ueh = handler; 2384 this.saturate = null; 2385 this.keepAlive = DEFAULT_KEEPALIVE; 2386 this.bounds = b; 2387 this.mode = parallelism; 2388 this.ctl = c; 2389 } 2390 2391 /** 2392 * Returns the common pool instance. This pool is statically 2393 * constructed; its run state is unaffected by attempts to {@link 2394 * #shutdown} or {@link #shutdownNow}. However this pool and any 2395 * ongoing processing are automatically terminated upon program 2396 * {@link System#exit}. Any program that relies on asynchronous 2397 * task processing to complete before program termination should 2398 * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence}, 2399 * before exit. 2400 * 2401 * @return the common pool instance 2402 * @since 1.8 2403 */ 2404 public static ForkJoinPool commonPool() { 2405 // assert common != null : "static init error"; 2406 return common; 2407 } 2408 2409 // Execution methods 2410 2411 /** 2412 * Performs the given task, returning its result upon completion. 2413 * If the computation encounters an unchecked Exception or Error, 2414 * it is rethrown as the outcome of this invocation. Rethrown 2415 * exceptions behave in the same way as regular exceptions, but, 2416 * when possible, contain stack traces (as displayed for example 2417 * using {@code ex.printStackTrace()}) of both the current thread 2418 * as well as the thread actually encountering the exception; 2419 * minimally only the latter. 
2420 * 2421 * @param task the task 2422 * @param <T> the type of the task's result 2423 * @return the task's result 2424 * @throws NullPointerException if the task is null 2425 * @throws RejectedExecutionException if the task cannot be 2426 * scheduled for execution 2427 */ 2428 public <T> T invoke(ForkJoinTask<T> task) { 2429 if (task == null) 2430 throw new NullPointerException(); 2431 externalSubmit(task); 2432 return task.join(); 2433 } 2434 2435 /** 2436 * Arranges for (asynchronous) execution of the given task. 2437 * 2438 * @param task the task 2439 * @throws NullPointerException if the task is null 2440 * @throws RejectedExecutionException if the task cannot be 2441 * scheduled for execution 2442 */ 2443 public void execute(ForkJoinTask<?> task) { 2444 externalSubmit(task); 2445 } 2446 2447 // AbstractExecutorService methods 2448 2449 /** 2450 * @throws NullPointerException if the task is null 2451 * @throws RejectedExecutionException if the task cannot be 2452 * scheduled for execution 2453 */ 2454 public void execute(Runnable task) { 2455 if (task == null) 2456 throw new NullPointerException(); 2457 ForkJoinTask<?> job; 2458 if (task instanceof ForkJoinTask<?>) // avoid re-wrap 2459 job = (ForkJoinTask<?>) task; 2460 else 2461 job = new ForkJoinTask.RunnableExecuteAction(task); 2462 externalSubmit(job); 2463 } 2464 2465 /** 2466 * Submits a ForkJoinTask for execution. 
2467 * 2468 * @param task the task to submit 2469 * @param <T> the type of the task's result 2470 * @return the task 2471 * @throws NullPointerException if the task is null 2472 * @throws RejectedExecutionException if the task cannot be 2473 * scheduled for execution 2474 */ 2475 public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) { 2476 return externalSubmit(task); 2477 } 2478 2479 /** 2480 * @throws NullPointerException if the task is null 2481 * @throws RejectedExecutionException if the task cannot be 2482 * scheduled for execution 2483 */ 2484 public <T> ForkJoinTask<T> submit(Callable<T> task) { 2485 return externalSubmit(new ForkJoinTask.AdaptedCallable<T>(task)); 2486 } 2487 2488 /** 2489 * @throws NullPointerException if the task is null 2490 * @throws RejectedExecutionException if the task cannot be 2491 * scheduled for execution 2492 */ 2493 public <T> ForkJoinTask<T> submit(Runnable task, T result) { 2494 return externalSubmit(new ForkJoinTask.AdaptedRunnable<T>(task, result)); 2495 } 2496 2497 /** 2498 * @throws NullPointerException if the task is null 2499 * @throws RejectedExecutionException if the task cannot be 2500 * scheduled for execution 2501 */ 2502 @SuppressWarnings("unchecked") 2503 public ForkJoinTask<?> submit(Runnable task) { 2504 if (task == null) 2505 throw new NullPointerException(); 2506 return externalSubmit((task instanceof ForkJoinTask<?>) 2507 ? (ForkJoinTask<Void>) task // avoid re-wrap 2508 : new ForkJoinTask.AdaptedRunnableAction(task)); 2509 } 2510 2511 /** 2512 * @throws NullPointerException {@inheritDoc} 2513 * @throws RejectedExecutionException {@inheritDoc} 2514 */ 2515 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) { 2516 // In previous versions of this class, this method constructed 2517 // a task to run ForkJoinTask.invokeAll, but now external 2518 // invocation of multiple tasks is at least as efficient. 
2519 ArrayList<Future<T>> futures = new ArrayList<>(tasks.size()); 2520 2521 try { 2522 for (Callable<T> t : tasks) { 2523 ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t); 2524 futures.add(f); 2525 externalSubmit(f); 2526 } 2527 for (int i = 0, size = futures.size(); i < size; i++) 2528 ((ForkJoinTask<?>)futures.get(i)).quietlyJoin(); 2529 return futures; 2530 } catch (Throwable t) { 2531 for (int i = 0, size = futures.size(); i < size; i++) 2532 futures.get(i).cancel(false); 2533 throw t; 2534 } 2535 } 2536 2537 /** 2538 * Returns the factory used for constructing new workers. 2539 * 2540 * @return the factory used for constructing new workers 2541 */ 2542 public ForkJoinWorkerThreadFactory getFactory() { 2543 return factory; 2544 } 2545 2546 /** 2547 * Returns the handler for internal worker threads that terminate 2548 * due to unrecoverable errors encountered while executing tasks. 2549 * 2550 * @return the handler, or {@code null} if none 2551 */ 2552 public UncaughtExceptionHandler getUncaughtExceptionHandler() { 2553 return ueh; 2554 } 2555 2556 /** 2557 * Returns the targeted parallelism level of this pool. 2558 * 2559 * @return the targeted parallelism level of this pool 2560 */ 2561 public int getParallelism() { 2562 int par = mode & SMASK; 2563 return (par > 0) ? par : 1; 2564 } 2565 2566 /** 2567 * Returns the targeted parallelism level of the common pool. 2568 * 2569 * @return the targeted parallelism level of the common pool 2570 * @since 1.8 2571 */ 2572 public static int getCommonPoolParallelism() { 2573 return COMMON_PARALLELISM; 2574 } 2575 2576 /** 2577 * Returns the number of worker threads that have started but not 2578 * yet terminated. The result returned by this method may differ 2579 * from {@link #getParallelism} when threads are created to 2580 * maintain parallelism when others are cooperatively blocked. 
2581 * 2582 * @return the number of worker threads 2583 */ 2584 public int getPoolSize() { 2585 return ((mode & SMASK) + (short)(ctl >>> TC_SHIFT)); 2586 } 2587 2588 /** 2589 * Returns {@code true} if this pool uses local first-in-first-out 2590 * scheduling mode for forked tasks that are never joined. 2591 * 2592 * @return {@code true} if this pool uses async mode 2593 */ 2594 public boolean getAsyncMode() { 2595 return (mode & FIFO) != 0; 2596 } 2597 2598 /** 2599 * Returns an estimate of the number of worker threads that are 2600 * not blocked waiting to join tasks or for other managed 2601 * synchronization. This method may overestimate the 2602 * number of running threads. 2603 * 2604 * @return the number of worker threads 2605 */ 2606 public int getRunningThreadCount() { 2607 WorkQueue[] ws; WorkQueue w; 2608 VarHandle.acquireFence(); 2609 int rc = 0; 2610 if ((ws = workQueues) != null) { 2611 for (int i = 1; i < ws.length; i += 2) { 2612 if ((w = ws[i]) != null && w.isApparentlyUnblocked()) 2613 ++rc; 2614 } 2615 } 2616 return rc; 2617 } 2618 2619 /** 2620 * Returns an estimate of the number of threads that are currently 2621 * stealing or executing tasks. This method may overestimate the 2622 * number of active threads. 2623 * 2624 * @return the number of active threads 2625 */ 2626 public int getActiveThreadCount() { 2627 int r = (mode & SMASK) + (int)(ctl >> RC_SHIFT); 2628 return (r <= 0) ? 0 : r; // suppress momentarily negative values 2629 } 2630 2631 /** 2632 * Returns {@code true} if all worker threads are currently idle. 2633 * An idle worker is one that cannot obtain a task to execute 2634 * because none are available to steal from other threads, and 2635 * there are no pending submissions to the pool. This method is 2636 * conservative; it might not return {@code true} immediately upon 2637 * idleness of all threads, but will eventually become true if 2638 * threads remain inactive. 
2639 * 2640 * @return {@code true} if all threads are currently idle 2641 */ 2642 public boolean isQuiescent() { 2643 for (;;) { 2644 long c = ctl; 2645 int md = mode, pc = md & SMASK; 2646 int tc = pc + (short)(c >>> TC_SHIFT); 2647 int rc = pc + (int)(c >> RC_SHIFT); 2648 if ((md & (STOP | TERMINATED)) != 0) 2649 return true; 2650 else if (rc > 0) 2651 return false; 2652 else { 2653 WorkQueue[] ws; WorkQueue v; 2654 if ((ws = workQueues) != null) { 2655 for (int i = 1; i < ws.length; i += 2) { 2656 if ((v = ws[i]) != null) { 2657 if (v.source > 0) 2658 return false; 2659 --tc; 2660 } 2661 } 2662 } 2663 if (tc == 0 && ctl == c) 2664 return true; 2665 } 2666 } 2667 } 2668 2669 /** 2670 * Returns an estimate of the total number of completed tasks that 2671 * were executed by a thread other than their submitter. The 2672 * reported value underestimates the actual total number of steals 2673 * when the pool is not quiescent. This value may be useful for 2674 * monitoring and tuning fork/join programs: in general, steal 2675 * counts should be high enough to keep threads busy, but low 2676 * enough to avoid overhead and contention across threads. 2677 * 2678 * @return the number of steals 2679 */ 2680 public long getStealCount() { 2681 long count = stealCount; 2682 WorkQueue[] ws; WorkQueue w; 2683 if ((ws = workQueues) != null) { 2684 for (int i = 1; i < ws.length; i += 2) { 2685 if ((w = ws[i]) != null) 2686 count += (long)w.nsteals & 0xffffffffL; 2687 } 2688 } 2689 return count; 2690 } 2691 2692 /** 2693 * Returns an estimate of the total number of tasks currently held 2694 * in queues by worker threads (but not including tasks submitted 2695 * to the pool that have not begun executing). This value is only 2696 * an approximation, obtained by iterating across all threads in 2697 * the pool. This method may be useful for tuning task 2698 * granularities. 
2699 * 2700 * @return the number of queued tasks 2701 */ 2702 public long getQueuedTaskCount() { 2703 WorkQueue[] ws; WorkQueue w; 2704 VarHandle.acquireFence(); 2705 int count = 0; 2706 if ((ws = workQueues) != null) { 2707 for (int i = 1; i < ws.length; i += 2) { 2708 if ((w = ws[i]) != null) 2709 count += w.queueSize(); 2710 } 2711 } 2712 return count; 2713 } 2714 2715 /** 2716 * Returns an estimate of the number of tasks submitted to this 2717 * pool that have not yet begun executing. This method may take 2718 * time proportional to the number of submissions. 2719 * 2720 * @return the number of queued submissions 2721 */ 2722 public int getQueuedSubmissionCount() { 2723 WorkQueue[] ws; WorkQueue w; 2724 VarHandle.acquireFence(); 2725 int count = 0; 2726 if ((ws = workQueues) != null) { 2727 for (int i = 0; i < ws.length; i += 2) { 2728 if ((w = ws[i]) != null) 2729 count += w.queueSize(); 2730 } 2731 } 2732 return count; 2733 } 2734 2735 /** 2736 * Returns {@code true} if there are any tasks submitted to this 2737 * pool that have not yet begun executing. 2738 * 2739 * @return {@code true} if there are any queued submissions 2740 */ 2741 public boolean hasQueuedSubmissions() { 2742 WorkQueue[] ws; WorkQueue w; 2743 VarHandle.acquireFence(); 2744 if ((ws = workQueues) != null) { 2745 for (int i = 0; i < ws.length; i += 2) { 2746 if ((w = ws[i]) != null && !w.isEmpty()) 2747 return true; 2748 } 2749 } 2750 return false; 2751 } 2752 2753 /** 2754 * Removes and returns the next unexecuted submission if one is 2755 * available. This method may be useful in extensions to this 2756 * class that re-assign work in systems with multiple pools. 
2757 * 2758 * @return the next submission, or {@code null} if none 2759 */ 2760 protected ForkJoinTask<?> pollSubmission() { 2761 return pollScan(true); 2762 } 2763 2764 /** 2765 * Removes all available unexecuted submitted and forked tasks 2766 * from scheduling queues and adds them to the given collection, 2767 * without altering their execution status. These may include 2768 * artificially generated or wrapped tasks. This method is 2769 * designed to be invoked only when the pool is known to be 2770 * quiescent. Invocations at other times may not remove all 2771 * tasks. A failure encountered while attempting to add elements 2772 * to collection {@code c} may result in elements being in 2773 * neither, either or both collections when the associated 2774 * exception is thrown. The behavior of this operation is 2775 * undefined if the specified collection is modified while the 2776 * operation is in progress. 2777 * 2778 * @param c the collection to transfer elements into 2779 * @return the number of elements transferred 2780 */ 2781 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) { 2782 WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t; 2783 VarHandle.acquireFence(); 2784 int count = 0; 2785 if ((ws = workQueues) != null) { 2786 for (int i = 0; i < ws.length; ++i) { 2787 if ((w = ws[i]) != null) { 2788 while ((t = w.poll()) != null) { 2789 c.add(t); 2790 ++count; 2791 } 2792 } 2793 } 2794 } 2795 return count; 2796 } 2797 2798 /** 2799 * Returns a string identifying this pool, as well as its state, 2800 * including indications of run state, parallelism level, and 2801 * worker and task counts. 
2802 * 2803 * @return a string identifying this pool, as well as its state 2804 */ 2805 public String toString() { 2806 // Use a single pass through workQueues to collect counts 2807 int md = mode; // read volatile fields first 2808 long c = ctl; 2809 long st = stealCount; 2810 long qt = 0L, qs = 0L; int rc = 0; 2811 WorkQueue[] ws; WorkQueue w; 2812 if ((ws = workQueues) != null) { 2813 for (int i = 0; i < ws.length; ++i) { 2814 if ((w = ws[i]) != null) { 2815 int size = w.queueSize(); 2816 if ((i & 1) == 0) 2817 qs += size; 2818 else { 2819 qt += size; 2820 st += (long)w.nsteals & 0xffffffffL; 2821 if (w.isApparentlyUnblocked()) 2822 ++rc; 2823 } 2824 } 2825 } 2826 } 2827 2828 int pc = (md & SMASK); 2829 int tc = pc + (short)(c >>> TC_SHIFT); 2830 int ac = pc + (int)(c >> RC_SHIFT); 2831 if (ac < 0) // ignore transient negative 2832 ac = 0; 2833 String level = ((md & TERMINATED) != 0 ? "Terminated" : 2834 (md & STOP) != 0 ? "Terminating" : 2835 (md & SHUTDOWN) != 0 ? "Shutting down" : 2836 "Running"); 2837 return super.toString() + 2838 "[" + level + 2839 ", parallelism = " + pc + 2840 ", size = " + tc + 2841 ", active = " + ac + 2842 ", running = " + rc + 2843 ", steals = " + st + 2844 ", tasks = " + qt + 2845 ", submissions = " + qs + 2846 "]"; 2847 } 2848 2849 /** 2850 * Possibly initiates an orderly shutdown in which previously 2851 * submitted tasks are executed, but no new tasks will be 2852 * accepted. Invocation has no effect on execution state if this 2853 * is the {@link #commonPool()}, and no additional effect if 2854 * already shut down. Tasks that are in the process of being 2855 * submitted concurrently during the course of this method may or 2856 * may not be rejected. 
2857 * 2858 * @throws SecurityException if a security manager exists and 2859 * the caller is not permitted to modify threads 2860 * because it does not hold {@link 2861 * java.lang.RuntimePermission}{@code ("modifyThread")} 2862 */ 2863 public void shutdown() { 2864 checkPermission(); 2865 tryTerminate(false, true); 2866 } 2867 2868 /** 2869 * Possibly attempts to cancel and/or stop all tasks, and reject 2870 * all subsequently submitted tasks. Invocation has no effect on 2871 * execution state if this is the {@link #commonPool()}, and no 2872 * additional effect if already shut down. Otherwise, tasks that 2873 * are in the process of being submitted or executed concurrently 2874 * during the course of this method may or may not be 2875 * rejected. This method cancels both existing and unexecuted 2876 * tasks, in order to permit termination in the presence of task 2877 * dependencies. So the method always returns an empty list 2878 * (unlike the case for some other Executors). 2879 * 2880 * @return an empty list 2881 * @throws SecurityException if a security manager exists and 2882 * the caller is not permitted to modify threads 2883 * because it does not hold {@link 2884 * java.lang.RuntimePermission}{@code ("modifyThread")} 2885 */ 2886 public List<Runnable> shutdownNow() { 2887 checkPermission(); 2888 tryTerminate(true, true); 2889 return Collections.emptyList(); 2890 } 2891 2892 /** 2893 * Returns {@code true} if all tasks have completed following shut down. 2894 * 2895 * @return {@code true} if all tasks have completed following shut down 2896 */ 2897 public boolean isTerminated() { 2898 return (mode & TERMINATED) != 0; 2899 } 2900 2901 /** 2902 * Returns {@code true} if the process of termination has 2903 * commenced but not yet completed. This method may be useful for 2904 * debugging. 
A return of {@code true} reported a sufficient 2905 * period after shutdown may indicate that submitted tasks have 2906 * ignored or suppressed interruption, or are waiting for I/O, 2907 * causing this executor not to properly terminate. (See the 2908 * advisory notes for class {@link ForkJoinTask} stating that 2909 * tasks should not normally entail blocking operations. But if 2910 * they do, they must abort them on interrupt.) 2911 * 2912 * @return {@code true} if terminating but not yet terminated 2913 */ 2914 public boolean isTerminating() { 2915 int md = mode; 2916 return (md & STOP) != 0 && (md & TERMINATED) == 0; 2917 } 2918 2919 /** 2920 * Returns {@code true} if this pool has been shut down. 2921 * 2922 * @return {@code true} if this pool has been shut down 2923 */ 2924 public boolean isShutdown() { 2925 return (mode & SHUTDOWN) != 0; 2926 } 2927 2928 /** 2929 * Blocks until all tasks have completed execution after a 2930 * shutdown request, or the timeout occurs, or the current thread 2931 * is interrupted, whichever happens first. Because the {@link 2932 * #commonPool()} never terminates until program shutdown, when 2933 * applied to the common pool, this method is equivalent to {@link 2934 * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}. 
2935 * 2936 * @param timeout the maximum time to wait 2937 * @param unit the time unit of the timeout argument 2938 * @return {@code true} if this executor terminated and 2939 * {@code false} if the timeout elapsed before termination 2940 * @throws InterruptedException if interrupted while waiting 2941 */ 2942 public boolean awaitTermination(long timeout, TimeUnit unit) 2943 throws InterruptedException { 2944 if (Thread.interrupted()) 2945 throw new InterruptedException(); 2946 if (this == common) { 2947 awaitQuiescence(timeout, unit); 2948 return false; 2949 } 2950 long nanos = unit.toNanos(timeout); 2951 if (isTerminated()) 2952 return true; 2953 if (nanos <= 0L) 2954 return false; 2955 long deadline = System.nanoTime() + nanos; 2956 synchronized (this) { 2957 for (;;) { 2958 if (isTerminated()) 2959 return true; 2960 if (nanos <= 0L) 2961 return false; 2962 long millis = TimeUnit.NANOSECONDS.toMillis(nanos); 2963 wait(millis > 0L ? millis : 1L); 2964 nanos = deadline - System.nanoTime(); 2965 } 2966 } 2967 } 2968 2969 /** 2970 * If called by a ForkJoinTask operating in this pool, equivalent 2971 * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise, 2972 * waits and/or attempts to assist performing tasks until this 2973 * pool {@link #isQuiescent} or the indicated timeout elapses. 2974 * 2975 * @param timeout the maximum time to wait 2976 * @param unit the time unit of the timeout argument 2977 * @return {@code true} if quiescent; {@code false} if the 2978 * timeout elapsed. 
2979 */ 2980 public boolean awaitQuiescence(long timeout, TimeUnit unit) { 2981 long nanos = unit.toNanos(timeout); 2982 ForkJoinWorkerThread wt; 2983 Thread thread = Thread.currentThread(); 2984 if ((thread instanceof ForkJoinWorkerThread) && 2985 (wt = (ForkJoinWorkerThread)thread).pool == this) { 2986 helpQuiescePool(wt.workQueue); 2987 return true; 2988 } 2989 else { 2990 for (long startTime = System.nanoTime();;) { 2991 ForkJoinTask<?> t; 2992 if ((t = pollScan(false)) != null) 2993 t.doExec(); 2994 else if (isQuiescent()) 2995 return true; 2996 else if ((System.nanoTime() - startTime) > nanos) 2997 return false; 2998 else 2999 Thread.yield(); // cannot block 3000 } 3001 } 3002 } 3003 3004 /** 3005 * Waits and/or attempts to assist performing tasks indefinitely 3006 * until the {@link #commonPool()} {@link #isQuiescent}. 3007 */ 3008 static void quiesceCommonPool() { 3009 common.awaitQuiescence(Long.MAX_VALUE, TimeUnit.NANOSECONDS); 3010 } 3011 3012 /** 3013 * Interface for extending managed parallelism for tasks running 3014 * in {@link ForkJoinPool}s. 3015 * 3016 * <p>A {@code ManagedBlocker} provides two methods. Method 3017 * {@link #isReleasable} must return {@code true} if blocking is 3018 * not necessary. Method {@link #block} blocks the current thread 3019 * if necessary (perhaps internally invoking {@code isReleasable} 3020 * before actually blocking). These actions are performed by any 3021 * thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}. 3022 * The unusual methods in this API accommodate synchronizers that 3023 * may, but don't usually, block for long periods. Similarly, they 3024 * allow more efficient internal handling of cases in which 3025 * additional workers may be, but usually are not, needed to 3026 * ensure sufficient parallelism. Toward this end, 3027 * implementations of method {@code isReleasable} must be amenable 3028 * to repeated invocation. 
3029 * 3030 * <p>For example, here is a ManagedBlocker based on a 3031 * ReentrantLock: 3032 * <pre> {@code 3033 * class ManagedLocker implements ManagedBlocker { 3034 * final ReentrantLock lock; 3035 * boolean hasLock = false; 3036 * ManagedLocker(ReentrantLock lock) { this.lock = lock; } 3037 * public boolean block() { 3038 * if (!hasLock) 3039 * lock.lock(); 3040 * return true; 3041 * } 3042 * public boolean isReleasable() { 3043 * return hasLock || (hasLock = lock.tryLock()); 3044 * } 3045 * }}</pre> 3046 * 3047 * <p>Here is a class that possibly blocks waiting for an 3048 * item on a given queue: 3049 * <pre> {@code 3050 * class QueueTaker<E> implements ManagedBlocker { 3051 * final BlockingQueue<E> queue; 3052 * volatile E item = null; 3053 * QueueTaker(BlockingQueue<E> q) { this.queue = q; } 3054 * public boolean block() throws InterruptedException { 3055 * if (item == null) 3056 * item = queue.take(); 3057 * return true; 3058 * } 3059 * public boolean isReleasable() { 3060 * return item != null || (item = queue.poll()) != null; 3061 * } 3062 * public E getItem() { // call after pool.managedBlock completes 3063 * return item; 3064 * } 3065 * }}</pre> 3066 */ 3067 public static interface ManagedBlocker { 3068 /** 3069 * Possibly blocks the current thread, for example waiting for 3070 * a lock or condition. 3071 * 3072 * @return {@code true} if no additional blocking is necessary 3073 * (i.e., if isReleasable would return true) 3074 * @throws InterruptedException if interrupted while waiting 3075 * (the method is not required to do so, but is allowed to) 3076 */ 3077 boolean block() throws InterruptedException; 3078 3079 /** 3080 * Returns {@code true} if blocking is unnecessary. 3081 * @return {@code true} if blocking is unnecessary 3082 */ 3083 boolean isReleasable(); 3084 } 3085 3086 /** 3087 * Runs the given possibly blocking task. 
When {@linkplain 3088 * ForkJoinTask#inForkJoinPool() running in a ForkJoinPool}, this 3089 * method possibly arranges for a spare thread to be activated if 3090 * necessary to ensure sufficient parallelism while the current 3091 * thread is blocked in {@link ManagedBlocker#block blocker.block()}. 3092 * 3093 * <p>This method repeatedly calls {@code blocker.isReleasable()} and 3094 * {@code blocker.block()} until either method returns {@code true}. 3095 * Every call to {@code blocker.block()} is preceded by a call to 3096 * {@code blocker.isReleasable()} that returned {@code false}. 3097 * 3098 * <p>If not running in a ForkJoinPool, this method is 3099 * behaviorally equivalent to 3100 * <pre> {@code 3101 * while (!blocker.isReleasable()) 3102 * if (blocker.block()) 3103 * break;}</pre> 3104 * 3105 * If running in a ForkJoinPool, the pool may first be expanded to 3106 * ensure sufficient parallelism available during the call to 3107 * {@code blocker.block()}. 3108 * 3109 * @param blocker the blocker task 3110 * @throws InterruptedException if {@code blocker.block()} did so 3111 */ 3112 public static void managedBlock(ManagedBlocker blocker) 3113 throws InterruptedException { 3114 if (blocker == null) throw new NullPointerException(); 3115 ForkJoinPool p; 3116 ForkJoinWorkerThread wt; 3117 WorkQueue w; 3118 Thread t = Thread.currentThread(); 3119 if ((t instanceof ForkJoinWorkerThread) && 3120 (p = (wt = (ForkJoinWorkerThread)t).pool) != null && 3121 (w = wt.workQueue) != null) { 3122 int block; 3123 while (!blocker.isReleasable()) { 3124 if ((block = p.tryCompensate(w)) != 0) { 3125 try { 3126 do {} while (!blocker.isReleasable() && 3127 !blocker.block()); 3128 } finally { 3129 CTL.getAndAdd(p, (block > 0) ? 
RC_UNIT : 0L); 3130 } 3131 break; 3132 } 3133 } 3134 } 3135 else { 3136 do {} while (!blocker.isReleasable() && 3137 !blocker.block()); 3138 } 3139 } 3140 3141 /** 3142 * If the given executor is a ForkJoinPool, poll and execute 3143 * AsynchronousCompletionTasks from worker's queue until none are 3144 * available or blocker is released. 3145 */ 3146 static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) { 3147 if (e instanceof ForkJoinPool) { 3148 WorkQueue w; ForkJoinWorkerThread wt; WorkQueue[] ws; int r, n; 3149 ForkJoinPool p = (ForkJoinPool)e; 3150 Thread thread = Thread.currentThread(); 3151 if (thread instanceof ForkJoinWorkerThread && 3152 (wt = (ForkJoinWorkerThread)thread).pool == p) 3153 w = wt.workQueue; 3154 else if ((r = ThreadLocalRandom.getProbe()) != 0 && 3155 (ws = p.workQueues) != null && (n = ws.length) > 0) 3156 w = ws[(n - 1) & r & SQMASK]; 3157 else 3158 w = null; 3159 if (w != null) 3160 w.helpAsyncBlocker(blocker); 3161 } 3162 } 3163 3164 // AbstractExecutorService overrides. These rely on undocumented 3165 // fact that ForkJoinTask.adapt returns ForkJoinTasks that also 3166 // implement RunnableFuture. 
3167 3168 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { 3169 return new ForkJoinTask.AdaptedRunnable<T>(runnable, value); 3170 } 3171 3172 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 3173 return new ForkJoinTask.AdaptedCallable<T>(callable); 3174 } 3175 3176 // VarHandle mechanics 3177 private static final VarHandle CTL; 3178 private static final VarHandle MODE; 3179 static final VarHandle QA; 3180 3181 static { 3182 try { 3183 MethodHandles.Lookup l = MethodHandles.lookup(); 3184 CTL = l.findVarHandle(ForkJoinPool.class, "ctl", long.class); 3185 MODE = l.findVarHandle(ForkJoinPool.class, "mode", int.class); 3186 QA = MethodHandles.arrayElementVarHandle(ForkJoinTask[].class); 3187 } catch (ReflectiveOperationException e) { 3188 throw new ExceptionInInitializerError(e); 3189 } 3190 3191 // Reduce the risk of rare disastrous classloading in first call to 3192 // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773 3193 Class<?> ensureLoaded = LockSupport.class; 3194 3195 int commonMaxSpares = DEFAULT_COMMON_MAX_SPARES; 3196 try { 3197 String p = System.getProperty 3198 ("java.util.concurrent.ForkJoinPool.common.maximumSpares"); 3199 if (p != null) 3200 commonMaxSpares = Integer.parseInt(p); 3201 } catch (Exception ignore) {} 3202 COMMON_MAX_SPARES = commonMaxSpares; 3203 3204 defaultForkJoinWorkerThreadFactory = 3205 new DefaultForkJoinWorkerThreadFactory(); 3206 modifyThreadPermission = new RuntimePermission("modifyThread"); 3207 3208 common = AccessController.doPrivileged(new PrivilegedAction<>() { 3209 public ForkJoinPool run() { 3210 return new ForkJoinPool((byte)0); }}); 3211 3212 COMMON_PARALLELISM = Math.max(common.mode & SMASK, 1); 3213 } 3214 3215 /** 3216 * Factory for innocuous worker threads. 3217 */ 3218 private static final class InnocuousForkJoinWorkerThreadFactory 3219 implements ForkJoinWorkerThreadFactory { 3220 3221 /** 3222 * An ACC to restrict permissions for the factory itself. 
3223 * The constructed workers have no permissions set. 3224 */ 3225 private static final AccessControlContext ACC = contextWithPermissions( 3226 modifyThreadPermission, 3227 new RuntimePermission("enableContextClassLoaderOverride"), 3228 new RuntimePermission("modifyThreadGroup"), 3229 new RuntimePermission("getClassLoader"), 3230 new RuntimePermission("setContextClassLoader")); 3231 3232 public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { 3233 return AccessController.doPrivileged( 3234 new PrivilegedAction<>() { 3235 public ForkJoinWorkerThread run() { 3236 return new ForkJoinWorkerThread. 3237 InnocuousForkJoinWorkerThread(pool); }}, 3238 ACC); 3239 } 3240 } 3241 }