1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 
28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent; 37 38 import java.lang.Thread.UncaughtExceptionHandler; 39 import java.util.ArrayList; 40 import java.util.Arrays; 41 import java.util.Collection; 42 import java.util.Collections; 43 import java.util.List; 44 import java.util.concurrent.AbstractExecutorService; 45 import java.util.concurrent.Callable; 46 import java.util.concurrent.ExecutorService; 47 import java.util.concurrent.Future; 48 import java.util.concurrent.RejectedExecutionException; 49 import java.util.concurrent.RunnableFuture; 50 import java.util.concurrent.ThreadLocalRandom; 51 import java.util.concurrent.TimeUnit; 52 53 /** 54 * An {@link ExecutorService} for running {@link ForkJoinTask}s. 55 * A {@code ForkJoinPool} provides the entry point for submissions 56 * from non-{@code ForkJoinTask} clients, as well as management and 57 * monitoring operations. 58 * 59 * <p>A {@code ForkJoinPool} differs from other kinds of {@link 60 * ExecutorService} mainly by virtue of employing 61 * <em>work-stealing</em>: all threads in the pool attempt to find and 62 * execute tasks submitted to the pool and/or created by other active 63 * tasks (eventually blocking waiting for work if none exist). This 64 * enables efficient processing when most tasks spawn other subtasks 65 * (as do most {@code ForkJoinTask}s), as well as when many small 66 * tasks are submitted to the pool from external clients. Especially 67 * when setting <em>asyncMode</em> to true in constructors, {@code 68 * ForkJoinPool}s may also be appropriate for use with event-style 69 * tasks that are never joined. 70 * 71 * <p>A static {@link #commonPool()} is available and appropriate for 72 * most applications. 
The common pool is used by any ForkJoinTask that 73 * is not explicitly submitted to a specified pool. Using the common 74 * pool normally reduces resource usage (its threads are slowly 75 * reclaimed during periods of non-use, and reinstated upon subsequent 76 * use). 77 * 78 * <p>For applications that require separate or custom pools, a {@code 79 * ForkJoinPool} may be constructed with a given target parallelism 80 * level; by default, equal to the number of available processors. The 81 * pool attempts to maintain enough active (or available) threads by 82 * dynamically adding, suspending, or resuming internal worker 83 * threads, even if some tasks are stalled waiting to join others. 84 * However, no such adjustments are guaranteed in the face of blocked 85 * I/O or other unmanaged synchronization. The nested {@link 86 * ManagedBlocker} interface enables extension of the kinds of 87 * synchronization accommodated. 88 * 89 * <p>In addition to execution and lifecycle control methods, this 90 * class provides status check methods (for example 91 * {@link #getStealCount}) that are intended to aid in developing, 92 * tuning, and monitoring fork/join applications. Also, method 93 * {@link #toString} returns indications of pool state in a 94 * convenient form for informal monitoring. 95 * 96 * <p>As is the case with other ExecutorServices, there are three 97 * main task execution methods summarized in the following table. 98 * These are designed to be used primarily by clients not already 99 * engaged in fork/join computations in the current pool. The main 100 * forms of these methods accept instances of {@code ForkJoinTask}, 101 * but overloaded forms also allow mixed execution of plain {@code 102 * Runnable}- or {@code Callable}- based activities as well. 
However, 103 * tasks that are already executing in a pool should normally instead 104 * use the within-computation forms listed in the table unless using 105 * async event-style tasks that are not usually joined, in which case 106 * there is little difference among choice of methods. 107 * 108 * <table BORDER CELLPADDING=3 CELLSPACING=1> 109 * <caption>Summary of task execution methods</caption> 110 * <tr> 111 * <td></td> 112 * <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td> 113 * <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td> 114 * </tr> 115 * <tr> 116 * <td> <b>Arrange async execution</b></td> 117 * <td> {@link #execute(ForkJoinTask)}</td> 118 * <td> {@link ForkJoinTask#fork}</td> 119 * </tr> 120 * <tr> 121 * <td> <b>Await and obtain result</b></td> 122 * <td> {@link #invoke(ForkJoinTask)}</td> 123 * <td> {@link ForkJoinTask#invoke}</td> 124 * </tr> 125 * <tr> 126 * <td> <b>Arrange exec and obtain Future</b></td> 127 * <td> {@link #submit(ForkJoinTask)}</td> 128 * <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td> 129 * </tr> 130 * </table> 131 * 132 * <p>The common pool is by default constructed with default 133 * parameters, but these may be controlled by setting three 134 * {@linkplain System#getProperty system properties}: 135 * <ul> 136 * <li>{@code java.util.concurrent.ForkJoinPool.common.parallelism} 137 * - the parallelism level, a non-negative integer 138 * <li>{@code java.util.concurrent.ForkJoinPool.common.threadFactory} 139 * - the class name of a {@link ForkJoinWorkerThreadFactory} 140 * <li>{@code java.util.concurrent.ForkJoinPool.common.exceptionHandler} 141 * - the class name of a {@link UncaughtExceptionHandler} 142 * </ul> 143 * The system class loader is used to load these classes. 144 * Upon any error in establishing these settings, default parameters 145 * are used. 
It is possible to disable or limit the use of threads in 146 * the common pool by setting the parallelism property to zero, and/or 147 * using a factory that may return {@code null}. However doing so may 148 * cause unjoined tasks to never be executed. 149 * 150 * <p><b>Implementation notes</b>: This implementation restricts the 151 * maximum number of running threads to 32767. Attempts to create 152 * pools with greater than the maximum number result in 153 * {@code IllegalArgumentException}. 154 * 155 * <p>This implementation rejects submitted tasks (that is, by throwing 156 * {@link RejectedExecutionException}) only when the pool is shut down 157 * or internal resources have been exhausted. 158 * 159 * @since 1.7 160 * @author Doug Lea 161 */ 162 @sun.misc.Contended 163 public class ForkJoinPool extends AbstractExecutorService { 164 165 /* 166 * Implementation Overview 167 * 168 * This class and its nested classes provide the main 169 * functionality and control for a set of worker threads: 170 * Submissions from non-FJ threads enter into submission queues. 171 * Workers take these tasks and typically split them into subtasks 172 * that may be stolen by other workers. Preference rules give 173 * first priority to processing tasks from their own queues (LIFO 174 * or FIFO, depending on mode), then to randomized FIFO steals of 175 * tasks in other queues. 176 * 177 * WorkQueues 178 * ========== 179 * 180 * Most operations occur within work-stealing queues (in nested 181 * class WorkQueue). These are special forms of Deques that 182 * support only three of the four possible end-operations -- push, 183 * pop, and poll (aka steal), under the further constraints that 184 * push and pop are called only from the owning thread (or, as 185 * extended here, under a lock), while poll may be called from 186 * other threads. 
(If you are unfamiliar with them, you probably 187 * want to read Herlihy and Shavit's book "The Art of 188 * Multiprocessor programming", chapter 16 describing these in 189 * more detail before proceeding.) The main work-stealing queue 190 * design is roughly similar to those in the papers "Dynamic 191 * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005 192 * (http://labs.oracle.com/pls/apex/f?p=labs:10:0) and 193 * "Idempotent work stealing" by Michael, Saraswat, and Vechev, 194 * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186). 195 * See also "Correct and Efficient Work-Stealing for Weak Memory 196 * Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013 197 * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an 198 * analysis of memory ordering (atomic, volatile etc) issues. The 199 * main differences ultimately stem from GC requirements that we 200 * null out taken slots as soon as we can, to maintain as small a 201 * footprint as possible even in programs generating huge numbers 202 * of tasks. To accomplish this, we shift the CAS arbitrating pop 203 * vs poll (steal) from being on the indices ("base" and "top") to 204 * the slots themselves. So, both a successful pop and poll 205 * mainly entail a CAS of a slot from non-null to null. Because 206 * we rely on CASes of references, we do not need tag bits on base 207 * or top. They are simple ints as used in any circular 208 * array-based queue (see for example ArrayDeque). Updates to the 209 * indices must still be ordered in a way that guarantees that top 210 * == base means the queue is empty, but otherwise may err on the 211 * side of possibly making the queue appear nonempty when a push, 212 * pop, or poll have not fully committed. Note that this means 213 * that the poll operation, considered individually, is not 214 * wait-free. One thief cannot successfully continue until another 215 * in-progress one (or, if previously empty, a push) completes. 
216 * However, in the aggregate, we ensure at least probabilistic 217 * non-blockingness. If an attempted steal fails, a thief always 218 * chooses a different random victim target to try next. So, in 219 * order for one thief to progress, it suffices for any 220 * in-progress poll or new push on any empty queue to 221 * complete. (This is why we normally use method pollAt and its 222 * variants that try once at the apparent base index, else 223 * consider alternative actions, rather than method poll.) 224 * 225 * This approach also enables support of a user mode in which local 226 * task processing is in FIFO, not LIFO order, simply by using 227 * poll rather than pop. This can be useful in message-passing 228 * frameworks in which tasks are never joined. However neither 229 * mode considers affinities, loads, cache localities, etc, so 230 * rarely provide the best possible performance on a given 231 * machine, but portably provide good throughput by averaging over 232 * these factors. (Further, even if we did try to use such 233 * information, we do not usually have a basis for exploiting it. 234 * For example, some sets of tasks profit from cache affinities, 235 * but others are harmed by cache pollution effects.) 236 * 237 * WorkQueues are also used in a similar way for tasks submitted 238 * to the pool. We cannot mix these tasks in the same queues used 239 * for work-stealing (this would contaminate lifo/fifo 240 * processing). Instead, we randomly associate submission queues 241 * with submitting threads, using a form of hashing. The 242 * ThreadLocalRandom probe value serves as a hash code for 243 * choosing existing queues, and may be randomly repositioned upon 244 * contention with other submitters. In essence, submitters act 245 * like workers except that they are restricted to executing local 246 * tasks that they submitted (or in the case of CountedCompleters, 247 * others with the same root task). 
However, because most 248 * shared/external queue operations are more expensive than 249 * internal, and because, at steady state, external submitters 250 * will compete for CPU with workers, ForkJoinTask.join and 251 * related methods disable them from repeatedly helping to process 252 * tasks if all workers are active. Insertion of tasks in shared 253 * mode requires a lock (mainly to protect in the case of 254 * resizing) but we use only a simple spinlock (using bits in 255 * field qlock), because submitters encountering a busy queue move 256 * on to try or create other queues -- they block only when 257 * creating and registering new queues. 258 * 259 * Management 260 * ========== 261 * 262 * The main throughput advantages of work-stealing stem from 263 * decentralized control -- workers mostly take tasks from 264 * themselves or each other. We cannot negate this in the 265 * implementation of other management responsibilities. The main 266 * tactic for avoiding bottlenecks is packing nearly all 267 * essentially atomic control state into two volatile variables 268 * that are by far most often read (not written) as status and 269 * consistency checks. 270 * 271 * Field "ctl" contains 64 bits holding all the information needed 272 * to atomically decide to add, inactivate, enqueue (on an event 273 * queue), dequeue, and/or re-activate workers. To enable this 274 * packing, we restrict maximum parallelism to (1<<15)-1 (which is 275 * far in excess of normal operating range) to allow ids, counts, 276 * and their negations (used for thresholding) to fit into 16bit 277 * fields. 278 * 279 * Field "plock" is a form of sequence lock with a saturating 280 * shutdown bit (similarly for per-queue "qlocks"), mainly 281 * protecting updates to the workQueues array, as well as to 282 * enable shutdown. 
When used as a lock, it is normally only very 283 * briefly held, so is nearly always available after at most a 284 * brief spin, but we use a monitor-based backup strategy to 285 * block when needed. 286 * 287 * Recording WorkQueues. WorkQueues are recorded in the 288 * "workQueues" array that is created upon first use and expanded 289 * if necessary. Updates to the array while recording new workers 290 * and unrecording terminated ones are protected from each other 291 * by a lock but the array is otherwise concurrently readable, and 292 * accessed directly. To simplify index-based operations, the 293 * array size is always a power of two, and all readers must 294 * tolerate null slots. Worker queues are at odd indices. Shared 295 * (submission) queues are at even indices, up to a maximum of 64 296 * slots, to limit growth even if array needs to expand to add 297 * more workers. Grouping them together in this way simplifies and 298 * speeds up task scanning. 299 * 300 * All worker thread creation is on-demand, triggered by task 301 * submissions, replacement of terminated workers, and/or 302 * compensation for blocked workers. However, all other support 303 * code is set up to work with other policies. To ensure that we 304 * do not hold on to worker references that would prevent GC, ALL 305 * accesses to workQueues are via indices into the workQueues 306 * array (which is one source of some of the messy code 307 * constructions here). In essence, the workQueues array serves as 308 * a weak reference mechanism. Thus for example the wait queue 309 * field of ctl stores indices, not references. Access to the 310 * workQueues in associated methods (for example signalWork) must 311 * both index-check and null-check the IDs. All such accesses 312 * ignore bad IDs by returning out early from what they are doing, 313 * since this can only be associated with termination, in which 314 * case it is OK to give up. 
All uses of the workQueues array 315 * also check that it is non-null (even if previously 316 * non-null). This allows nulling during termination, which is 317 * currently not necessary, but remains an option for 318 * resource-revocation-based shutdown schemes. It also helps 319 * reduce JIT issuance of uncommon-trap code, which tends to 320 * unnecessarily complicate control flow in some methods. 321 * 322 * Event Queuing. Unlike HPC work-stealing frameworks, we cannot 323 * let workers spin indefinitely scanning for tasks when none can 324 * be found immediately, and we cannot start/resume workers unless 325 * there appear to be tasks available. On the other hand, we must 326 * quickly prod them into action when new tasks are submitted or 327 * generated. In many usages, ramp-up time to activate workers is 328 * the main limiting factor in overall performance (this is 329 * compounded at program start-up by JIT compilation and 330 * allocation). So we try to streamline this as much as possible. 331 * We park/unpark workers after placing in an event wait queue 332 * when they cannot find work. This "queue" is actually a simple 333 * Treiber stack, headed by the "id" field of ctl, plus a 15bit 334 * counter value (that reflects the number of times a worker has 335 * been inactivated) to avoid ABA effects (we need only as many 336 * version numbers as worker threads). Successors are held in 337 * field WorkQueue.nextWait. Queuing deals with several intrinsic 338 * races, mainly that a task-producing thread can miss seeing (and 339 * signalling) another thread that gave up looking for work but 340 * has not yet entered the wait queue. We solve this by requiring 341 * a full sweep of all workers (via repeated calls to method 342 * scan()) both before and after a newly waiting worker is added 343 * to the wait queue. 
Because enqueued workers may actually be 344 * rescanning rather than waiting, we set and clear the "parker" 345 * field of WorkQueues to reduce unnecessary calls to unpark. 346 * (This requires a secondary recheck to avoid missed signals.) 347 * Note the unusual conventions about Thread.interrupts 348 * surrounding parking and other blocking: Because interrupts are 349 * used solely to alert threads to check termination, which is 350 * checked anyway upon blocking, we clear status (using 351 * Thread.interrupted) before any call to park, so that park does 352 * not immediately return due to status being set via some other 353 * unrelated call to interrupt in user code. 354 * 355 * Signalling. We create or wake up workers only when there 356 * appears to be at least one task they might be able to find and 357 * execute. When a submission is added or another worker adds a 358 * task to a queue that has fewer than two tasks, they signal 359 * waiting workers (or trigger creation of new ones if fewer than 360 * the given parallelism level -- signalWork). These primary 361 * signals are buttressed by others whenever other threads remove 362 * a task from a queue and notice that there are other tasks there 363 * as well. So in general, pools will be over-signalled. On most 364 * platforms, signalling (unpark) overhead time is noticeably 365 * long, and the time between signalling a thread and it actually 366 * making progress can be very noticeably long, so it is worth 367 * offloading these delays from critical paths as much as 368 * possible. Additionally, workers spin-down gradually, by staying 369 * alive so long as they see the ctl state changing. Similar 370 * stability-sensing techniques are also used before blocking in 371 * awaitJoin and helpComplete. 372 * 373 * Trimming workers. 
To release resources after periods of lack of 374 * use, a worker starting to wait when the pool is quiescent will 375 * time out and terminate if the pool has remained quiescent for a 376 * given period -- a short period if there are more threads than 377 * parallelism, longer as the number of threads decreases. This 378 * will slowly propagate, eventually terminating all workers after 379 * periods of non-use. 380 * 381 * Shutdown and Termination. A call to shutdownNow atomically sets 382 * a plock bit and then (non-atomically) sets each worker's 383 * qlock status, cancels all unprocessed tasks, and wakes up 384 * all waiting workers. Detecting whether termination should 385 * commence after a non-abrupt shutdown() call requires more work 386 * and bookkeeping. We need consensus about quiescence (i.e., that 387 * there is no more work). The active count provides a primary 388 * indication but non-abrupt shutdown still requires a rechecking 389 * scan for any workers that are inactive but not queued. 390 * 391 * Joining Tasks 392 * ============= 393 * 394 * Any of several actions may be taken when one worker is waiting 395 * to join a task stolen (or always held) by another. Because we 396 * are multiplexing many tasks on to a pool of workers, we can't 397 * just let them block (as in Thread.join). We also cannot just 398 * reassign the joiner's run-time stack with another and replace 399 * it later, which would be a form of "continuation", that even if 400 * possible is not necessarily a good idea since we sometimes need 401 * both an unblocked task and its continuation to progress. 402 * Instead we combine two tactics: 403 * 404 * Helping: Arranging for the joiner to execute some task that it 405 * would be running if the steal had not occurred. 406 * 407 * Compensating: Unless there are already enough live threads, 408 * method tryCompensate() may create or re-activate a spare 409 * thread to compensate for blocked joiners until they unblock. 
410 * 411 * A third form (implemented in tryRemoveAndExec) amounts to 412 * helping a hypothetical compensator: If we can readily tell that 413 * a possible action of a compensator is to steal and execute the 414 * task being joined, the joining thread can do so directly, 415 * without the need for a compensation thread (although at the 416 * expense of larger run-time stacks, but the tradeoff is 417 * typically worthwhile). 418 * 419 * The ManagedBlocker extension API can't use helping so relies 420 * only on compensation in method awaitBlocker. 421 * 422 * The algorithm in tryHelpStealer entails a form of "linear" 423 * helping: Each worker records (in field currentSteal) the most 424 * recent task it stole from some other worker. Plus, it records 425 * (in field currentJoin) the task it is currently actively 426 * joining. Method tryHelpStealer uses these markers to try to 427 * find a worker to help (i.e., steal back a task from and execute 428 * it) that could hasten completion of the actively joined task. 429 * In essence, the joiner executes a task that would be on its own 430 * local deque had the to-be-joined task not been stolen. This may 431 * be seen as a conservative variant of the approach in Wagner & 432 * Calder "Leapfrogging: a portable technique for implementing 433 * efficient futures" SIGPLAN Notices, 1993 434 * (http://portal.acm.org/citation.cfm?id=155354). It differs in 435 * that: (1) We only maintain dependency links across workers upon 436 * steals, rather than use per-task bookkeeping. This sometimes 437 * requires a linear scan of workQueues array to locate stealers, 438 * but often doesn't because stealers leave hints (that may become 439 * stale/wrong) of where to locate them. It is only a hint 440 * because a worker might have had multiple steals and the hint 441 * records only one of them (usually the most current). Hinting 442 * isolates cost to when it is needed, rather than adding to 443 * per-task overhead. 
(2) It is "shallow", ignoring nesting and 444 * potentially cyclic mutual steals. (3) It is intentionally 445 * racy: field currentJoin is updated only while actively joining, 446 * which means that we miss links in the chain during long-lived 447 * tasks, GC stalls etc (which is OK since blocking in such cases 448 * is usually a good idea). (4) We bound the number of attempts 449 * to find work (see MAX_HELP) and fall back to suspending the 450 * worker and if necessary replacing it with another. 451 * 452 * Helping actions for CountedCompleters are much simpler: Method 453 * helpComplete can take and execute any task with the same root 454 * as the task being waited on. However, this still entails some 455 * traversal of completer chains, so is less efficient than using 456 * CountedCompleters without explicit joins. 457 * 458 * It is impossible to keep exactly the target parallelism number 459 * of threads running at any given time. Determining the 460 * existence of conservatively safe helping targets, the 461 * availability of already-created spares, and the apparent need 462 * to create new spares are all racy, so we rely on multiple 463 * retries of each. Compensation in the apparent absence of 464 * helping opportunities is challenging to control on JVMs, where 465 * GC and other activities can stall progress of tasks that in 466 * turn stall out many other dependent tasks, without us being 467 * able to determine whether they will ever require compensation. 468 * Even though work-stealing otherwise encounters little 469 * degradation in the presence of more threads than cores, 470 * aggressively adding new threads in such cases entails risk of 471 * unwanted positive feedback control loops in which more threads 472 * cause more dependent stalls (as well as delayed progress of 473 * unblocked threads to the point that we know they are available) 474 * leading to more situations requiring more threads, and so 475 * on. 
This aspect of control can be seen as an (analytically 476 * intractable) game with an opponent that may choose the worst 477 * (for us) active thread to stall at any time. We take several 478 * precautions to bound losses (and thus bound gains), mainly in 479 * methods tryCompensate and awaitJoin. 480 * 481 * Common Pool 482 * =========== 483 * 484 * The static common pool always exists after static 485 * initialization. Since it (or any other created pool) need 486 * never be used, we minimize initial construction overhead and 487 * footprint to the setup of about a dozen fields, with no nested 488 * allocation. Most bootstrapping occurs within method 489 * fullExternalPush during the first submission to the pool. 490 * 491 * When external threads submit to the common pool, they can 492 * perform subtask processing (see externalHelpJoin and related 493 * methods). This caller-helps policy makes it sensible to set 494 * common pool parallelism level to one (or more) less than the 495 * total number of available cores, or even zero for pure 496 * caller-runs. We do not need to record whether external 497 * submissions are to the common pool -- if not, externalHelpJoin 498 * returns quickly (at the most helping to signal some common pool 499 * workers). These submitters would otherwise be blocked waiting 500 * for completion, so the extra effort (with liberally sprinkled 501 * task status checks) in inapplicable cases amounts to an odd 502 * form of limited spin-wait before blocking in ForkJoinTask.join. 503 * 504 * Style notes 505 * =========== 506 * 507 * There is a lot of representation-level coupling among classes 508 * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The 509 * fields of WorkQueue maintain data structures managed by 510 * ForkJoinPool, so are directly accessed. There is little point 511 * trying to reduce this, since any associated future changes in 512 * representations will need to be accompanied by algorithmic 513 * changes anyway. 
Several methods intrinsically sprawl because 514 * they must accumulate sets of consistent reads of volatiles held 515 * in local variables. Methods signalWork() and scan() are the 516 * main bottlenecks, so are especially heavily 517 * micro-optimized/mangled. There are lots of inline assignments 518 * (of form "while ((local = field) != 0)") which are usually the 519 * simplest way to ensure the required read orderings (which are 520 * sometimes critical). This leads to a "C"-like style of listing 521 * declarations of these locals at the heads of methods or blocks. 522 * There are several occurrences of the unusual "do {} while 523 * (!cas...)" which is the simplest way to force an update of a 524 * CAS'ed variable. There are also other coding oddities (including 525 * several unnecessary-looking hoisted null checks) that help 526 * some methods perform reasonably even when interpreted (not 527 * compiled). 528 * 529 * The order of declarations in this file is: 530 * (1) Static utility functions 531 * (2) Nested (static) classes 532 * (3) Static fields 533 * (4) Fields, along with constants used when unpacking some of them 534 * (5) Internal control methods 535 * (6) Callbacks and other support for ForkJoinTask methods 536 * (7) Exported methods 537 * (8) Static block initializing statics in minimally dependent order 538 */ 539 540 // Static utilities 541 542 /** 543 * If there is a security manager, makes sure caller has 544 * permission to modify threads. 545 */ 546 private static void checkPermission() { 547 SecurityManager security = System.getSecurityManager(); 548 if (security != null) 549 security.checkPermission(modifyThreadPermission); 550 } 551 552 // Nested classes 553 554 /** 555 * Factory for creating new {@link ForkJoinWorkerThread}s. 556 * A {@code ForkJoinWorkerThreadFactory} must be defined and used 557 * for {@code ForkJoinWorkerThread} subclasses that extend base 558 * functionality or initialize threads with different contexts. 
*/
    public static interface ForkJoinWorkerThreadFactory {
        /**
         * Returns a new worker thread operating in the given pool.
         *
         * @param pool the pool this thread works in
         * @return the new worker thread
         * @throws NullPointerException if the pool is null
         */
        public ForkJoinWorkerThread newThread(ForkJoinPool pool);
    }

    /**
     * Default ForkJoinWorkerThreadFactory implementation; creates a
     * new ForkJoinWorkerThread for the given pool.
     */
    static final class DefaultForkJoinWorkerThreadFactory
        implements ForkJoinWorkerThreadFactory {
        public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            return new ForkJoinWorkerThread(pool);
        }
    }

    /**
     * Class for artificial tasks that are used to replace the target
     * of local joins if they are removed from an interior queue slot
     * in WorkQueue.tryRemoveAndExec. We don't need the proxy to
     * actually do anything beyond having a unique identity.
     */
    static final class EmptyTask extends ForkJoinTask<Void> {
        private static final long serialVersionUID = -7721805057305804111L;
        // Constructed already in the completed (NORMAL) state so joins
        // on the proxy return immediately.
        EmptyTask() { status = ForkJoinTask.NORMAL; } // force done
        public final Void getRawResult() { return null; }
        public final void setRawResult(Void x) {}
        // Reports completion; never actually invoked with work to do.
        public final boolean exec() { return true; }
    }

    /**
     * Queues supporting work-stealing as well as external task
     * submission. See above for main rationale and algorithms.
     * Implementation relies heavily on "Unsafe" intrinsics
     * and selective use of "volatile":
     *
     * Field "base" is the index (mod array.length) of the least valid
     * queue slot, which is always the next position to steal (poll)
     * from if nonempty. Reads and writes require volatile orderings
     * but not CAS, because updates are only performed after slot
     * CASes.
     *
     * Field "top" is the index (mod array.length) of the next queue
     * slot to push to or pop from.
It is written only by owner thread 610 * for push, or under lock for external/shared push, and accessed 611 * by other threads only after reading (volatile) base. Both top 612 * and base are allowed to wrap around on overflow, but (top - 613 * base) (or more commonly -(base - top) to force volatile read of 614 * base before top) still estimates size. The lock ("qlock") is 615 * forced to -1 on termination, causing all further lock attempts 616 * to fail. (Note: we don't need CAS for termination state because 617 * upon pool shutdown, all shared-queues will stop being used 618 * anyway.) Nearly all lock bodies are set up so that exceptions 619 * within lock bodies are "impossible" (modulo JVM errors that 620 * would cause failure anyway.) 621 * 622 * The array slots are read and written using the emulation of 623 * volatiles/atomics provided by Unsafe. Insertions must in 624 * general use putOrderedObject as a form of releasing store to 625 * ensure that all writes to the task object are ordered before 626 * its publication in the queue. All removals entail a CAS to 627 * null. The array is always a power of two. To ensure safety of 628 * Unsafe array operations, all accesses perform explicit null 629 * checks and implicit bounds checks via power-of-two masking. 630 * 631 * In addition to basic queuing support, this class contains 632 * fields described elsewhere to control execution. It turns out 633 * to work better memory-layout-wise to include them in this class 634 * rather than a separate class. 635 * 636 * Performance on most platforms is very sensitive to placement of 637 * instances of both WorkQueues and their arrays -- we absolutely 638 * do not want multiple WorkQueue instances or multiple queue 639 * arrays sharing cache lines. (It would be best for queue objects 640 * and their arrays to share, but there is nothing available to 641 * help arrange that). The @Contended annotation alerts JVMs to 642 * try to keep instances apart. 
643 */ 644 @sun.misc.Contended 645 static final class WorkQueue { 646 /** 647 * Capacity of work-stealing queue array upon initialization. 648 * Must be a power of two; at least 4, but should be larger to 649 * reduce or eliminate cacheline sharing among queues. 650 * Currently, it is much larger, as a partial workaround for 651 * the fact that JVMs often place arrays in locations that 652 * share GC bookkeeping (especially cardmarks) such that 653 * per-write accesses encounter serious memory contention. 654 */ 655 static final int INITIAL_QUEUE_CAPACITY = 1 << 13; 656 657 /** 658 * Maximum size for queue arrays. Must be a power of two less 659 * than or equal to 1 << (31 - width of array entry) to ensure 660 * lack of wraparound of index calculations, but defined to a 661 * value a bit less than this to help users trap runaway 662 * programs before saturating systems. 663 */ 664 static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M 665 666 volatile int eventCount; // encoded inactivation count; < 0 if inactive 667 int nextWait; // encoded record of next event waiter 668 int nsteals; // number of steals 669 int hint; // steal index hint 670 short poolIndex; // index of this queue in pool 671 final short mode; // 0: lifo, > 0: fifo, < 0: shared 672 volatile int qlock; // 1: locked, -1: terminate; else 0 673 volatile int base; // index of next slot for poll 674 int top; // index of next slot for push 675 ForkJoinTask<?>[] array; // the elements (initially unallocated) 676 final ForkJoinPool pool; // the containing pool (may be null) 677 final ForkJoinWorkerThread owner; // owning thread or null if shared 678 volatile Thread parker; // == owner during call to park; else null 679 volatile ForkJoinTask<?> currentJoin; // task being joined in awaitJoin 680 ForkJoinTask<?> currentSteal; // current non-local task being executed 681 682 WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner, int mode, 683 int seed) { 684 this.pool = pool; 685 this.owner = owner; 686 
this.mode = (short)mode; 687 this.hint = seed; // store initial seed for runWorker 688 // Place indices in the center of array (that is not yet allocated) 689 base = top = INITIAL_QUEUE_CAPACITY >>> 1; 690 } 691 692 /** 693 * Returns the approximate number of tasks in the queue. 694 */ 695 final int queueSize() { 696 int n = base - top; // non-owner callers must read base first 697 return (n >= 0) ? 0 : -n; // ignore transient negative 698 } 699 700 /** 701 * Provides a more accurate estimate of whether this queue has 702 * any tasks than does queueSize, by checking whether a 703 * near-empty queue has at least one unclaimed task. 704 */ 705 final boolean isEmpty() { 706 ForkJoinTask<?>[] a; int m, s; 707 int n = base - (s = top); 708 return (n >= 0 || 709 (n == -1 && 710 ((a = array) == null || 711 (m = a.length - 1) < 0 || 712 U.getObject 713 (a, (long)((m & (s - 1)) << ASHIFT) + ABASE) == null))); 714 } 715 716 /** 717 * Pushes a task. Call only by owner in unshared queues. (The 718 * shared-queue version is embedded in method externalPush.) 719 * 720 * @param task the task. Caller must ensure non-null. 721 * @throws RejectedExecutionException if array cannot be resized 722 */ 723 final void push(ForkJoinTask<?> task) { 724 ForkJoinTask<?>[] a; ForkJoinPool p; 725 int s = top, n; 726 if ((a = array) != null) { // ignore if queue removed 727 int m = a.length - 1; 728 U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task); 729 if ((n = (top = s + 1) - base) <= 2) 730 (p = pool).signalWork(p.workQueues, this); 731 else if (n >= m) 732 growArray(); 733 } 734 } 735 736 /** 737 * Initializes or doubles the capacity of array. Call either 738 * by owner or with lock held -- it is OK for base, but not 739 * top, to move while resizings are in progress. 740 */ 741 final ForkJoinTask<?>[] growArray() { 742 ForkJoinTask<?>[] oldA = array; 743 int size = oldA != null ? 
oldA.length << 1 : INITIAL_QUEUE_CAPACITY; 744 if (size > MAXIMUM_QUEUE_CAPACITY) 745 throw new RejectedExecutionException("Queue capacity exceeded"); 746 int oldMask, t, b; 747 ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size]; 748 if (oldA != null && (oldMask = oldA.length - 1) >= 0 && 749 (t = top) - (b = base) > 0) { 750 int mask = size - 1; 751 do { 752 ForkJoinTask<?> x; 753 int oldj = ((b & oldMask) << ASHIFT) + ABASE; 754 int j = ((b & mask) << ASHIFT) + ABASE; 755 x = (ForkJoinTask<?>)U.getObjectVolatile(oldA, oldj); 756 if (x != null && 757 U.compareAndSwapObject(oldA, oldj, x, null)) 758 U.putObjectVolatile(a, j, x); 759 } while (++b != t); 760 } 761 return a; 762 } 763 764 /** 765 * Takes next task, if one exists, in LIFO order. Call only 766 * by owner in unshared queues. 767 */ 768 final ForkJoinTask<?> pop() { 769 ForkJoinTask<?>[] a; ForkJoinTask<?> t; int m; 770 if ((a = array) != null && (m = a.length - 1) >= 0) { 771 for (int s; (s = top - 1) - base >= 0;) { 772 long j = ((m & s) << ASHIFT) + ABASE; 773 if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null) 774 break; 775 if (U.compareAndSwapObject(a, j, t, null)) { 776 top = s; 777 return t; 778 } 779 } 780 } 781 return null; 782 } 783 784 /** 785 * Takes a task in FIFO order if b is base of queue and a task 786 * can be claimed without contention. Specialized versions 787 * appear in ForkJoinPool methods scan and tryHelpStealer. 788 */ 789 final ForkJoinTask<?> pollAt(int b) { 790 ForkJoinTask<?> t; ForkJoinTask<?>[] a; 791 if ((a = array) != null) { 792 int j = (((a.length - 1) & b) << ASHIFT) + ABASE; 793 if ((t = (ForkJoinTask<?>)U.getObjectVolatile(a, j)) != null && 794 base == b && U.compareAndSwapObject(a, j, t, null)) { 795 U.putOrderedInt(this, QBASE, b + 1); 796 return t; 797 } 798 } 799 return null; 800 } 801 802 /** 803 * Takes next task, if one exists, in FIFO order. 
804 */ 805 final ForkJoinTask<?> poll() { 806 ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t; 807 while ((b = base) - top < 0 && (a = array) != null) { 808 int j = (((a.length - 1) & b) << ASHIFT) + ABASE; 809 t = (ForkJoinTask<?>)U.getObjectVolatile(a, j); 810 if (t != null) { 811 if (U.compareAndSwapObject(a, j, t, null)) { 812 U.putOrderedInt(this, QBASE, b + 1); 813 return t; 814 } 815 } 816 else if (base == b) { 817 if (b + 1 == top) 818 break; 819 Thread.yield(); // wait for lagging update (very rare) 820 } 821 } 822 return null; 823 } 824 825 /** 826 * Takes next task, if one exists, in order specified by mode. 827 */ 828 final ForkJoinTask<?> nextLocalTask() { 829 return mode == 0 ? pop() : poll(); 830 } 831 832 /** 833 * Returns next task, if one exists, in order specified by mode. 834 */ 835 final ForkJoinTask<?> peek() { 836 ForkJoinTask<?>[] a = array; int m; 837 if (a == null || (m = a.length - 1) < 0) 838 return null; 839 int i = mode == 0 ? top - 1 : base; 840 int j = ((i & m) << ASHIFT) + ABASE; 841 return (ForkJoinTask<?>)U.getObjectVolatile(a, j); 842 } 843 844 /** 845 * Pops the given task only if it is at the current top. 846 * (A shared version is available only via FJP.tryExternalUnpush) 847 */ 848 final boolean tryUnpush(ForkJoinTask<?> t) { 849 ForkJoinTask<?>[] a; int s; 850 if ((a = array) != null && (s = top) != base && 851 U.compareAndSwapObject 852 (a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) { 853 top = s; 854 return true; 855 } 856 return false; 857 } 858 859 /** 860 * Removes and cancels all known tasks, ignoring any exceptions. 861 */ 862 final void cancelAll() { 863 ForkJoinTask.cancelIgnoringExceptions(currentJoin); 864 ForkJoinTask.cancelIgnoringExceptions(currentSteal); 865 for (ForkJoinTask<?> t; (t = poll()) != null; ) 866 ForkJoinTask.cancelIgnoringExceptions(t); 867 } 868 869 // Specialized execution methods 870 871 /** 872 * Polls and runs tasks until empty. 
873 */ 874 final void pollAndExecAll() { 875 for (ForkJoinTask<?> t; (t = poll()) != null;) 876 t.doExec(); 877 } 878 879 /** 880 * Executes a top-level task and any local tasks remaining 881 * after execution. 882 */ 883 final void runTask(ForkJoinTask<?> task) { 884 if ((currentSteal = task) != null) { 885 task.doExec(); 886 ForkJoinTask<?>[] a = array; 887 int md = mode; 888 ++nsteals; 889 currentSteal = null; 890 if (md != 0) 891 pollAndExecAll(); 892 else if (a != null) { 893 int s, m = a.length - 1; 894 ForkJoinTask<?> t; 895 while ((s = top - 1) - base >= 0 && 896 (t = (ForkJoinTask<?>)U.getAndSetObject 897 (a, ((m & s) << ASHIFT) + ABASE, null)) != null) { 898 top = s; 899 t.doExec(); 900 } 901 } 902 } 903 } 904 905 /** 906 * If present, removes from queue and executes the given task, 907 * or any other cancelled task. Returns (true) on any CAS 908 * or consistency check failure so caller can retry. 909 * 910 * @return false if no progress can be made, else true 911 */ 912 final boolean tryRemoveAndExec(ForkJoinTask<?> task) { 913 boolean stat; 914 ForkJoinTask<?>[] a; int m, s, b, n; 915 if (task != null && (a = array) != null && (m = a.length - 1) >= 0 && 916 (n = (s = top) - (b = base)) > 0) { 917 boolean removed = false, empty = true; 918 stat = true; 919 for (ForkJoinTask<?> t;;) { // traverse from s to b 920 long j = ((--s & m) << ASHIFT) + ABASE; 921 t = (ForkJoinTask<?>)U.getObject(a, j); 922 if (t == null) // inconsistent length 923 break; 924 else if (t == task) { 925 if (s + 1 == top) { // pop 926 if (!U.compareAndSwapObject(a, j, task, null)) 927 break; 928 top = s; 929 removed = true; 930 } 931 else if (base == b) // replace with proxy 932 removed = U.compareAndSwapObject(a, j, task, 933 new EmptyTask()); 934 break; 935 } 936 else if (t.status >= 0) 937 empty = false; 938 else if (s + 1 == top) { // pop and throw away 939 if (U.compareAndSwapObject(a, j, t, null)) 940 top = s; 941 break; 942 } 943 if (--n == 0) { 944 if (!empty && base == b) 
945 stat = false; 946 break; 947 } 948 } 949 if (removed) 950 task.doExec(); 951 } 952 else 953 stat = false; 954 return stat; 955 } 956 957 /** 958 * Tries to poll for and execute the given task or any other 959 * task in its CountedCompleter computation. 960 */ 961 final boolean pollAndExecCC(CountedCompleter<?> root) { 962 ForkJoinTask<?>[] a; int b; Object o; CountedCompleter<?> t, r; 963 if ((b = base) - top < 0 && (a = array) != null) { 964 long j = (((a.length - 1) & b) << ASHIFT) + ABASE; 965 if ((o = U.getObjectVolatile(a, j)) == null) 966 return true; // retry 967 if (o instanceof CountedCompleter) { 968 for (t = (CountedCompleter<?>)o, r = t;;) { 969 if (r == root) { 970 if (base == b && 971 U.compareAndSwapObject(a, j, t, null)) { 972 U.putOrderedInt(this, QBASE, b + 1); 973 t.doExec(); 974 } 975 return true; 976 } 977 else if ((r = r.completer) == null) 978 break; // not part of root computation 979 } 980 } 981 } 982 return false; 983 } 984 985 /** 986 * Tries to pop and execute the given task or any other task 987 * in its CountedCompleter computation. 
988 */ 989 final boolean externalPopAndExecCC(CountedCompleter<?> root) { 990 ForkJoinTask<?>[] a; int s; Object o; CountedCompleter<?> t, r; 991 if (base - (s = top) < 0 && (a = array) != null) { 992 long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE; 993 if ((o = U.getObject(a, j)) instanceof CountedCompleter) { 994 for (t = (CountedCompleter<?>)o, r = t;;) { 995 if (r == root) { 996 if (U.compareAndSwapInt(this, QLOCK, 0, 1)) { 997 if (top == s && array == a && 998 U.compareAndSwapObject(a, j, t, null)) { 999 top = s - 1; 1000 qlock = 0; 1001 t.doExec(); 1002 } 1003 else 1004 qlock = 0; 1005 } 1006 return true; 1007 } 1008 else if ((r = r.completer) == null) 1009 break; 1010 } 1011 } 1012 } 1013 return false; 1014 } 1015 1016 /** 1017 * Internal version 1018 */ 1019 final boolean internalPopAndExecCC(CountedCompleter<?> root) { 1020 ForkJoinTask<?>[] a; int s; Object o; CountedCompleter<?> t, r; 1021 if (base - (s = top) < 0 && (a = array) != null) { 1022 long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE; 1023 if ((o = U.getObject(a, j)) instanceof CountedCompleter) { 1024 for (t = (CountedCompleter<?>)o, r = t;;) { 1025 if (r == root) { 1026 if (U.compareAndSwapObject(a, j, t, null)) { 1027 top = s - 1; 1028 t.doExec(); 1029 } 1030 return true; 1031 } 1032 else if ((r = r.completer) == null) 1033 break; 1034 } 1035 } 1036 } 1037 return false; 1038 } 1039 1040 /** 1041 * Returns true if owned and not known to be blocked. 
1042 */ 1043 final boolean isApparentlyUnblocked() { 1044 Thread wt; Thread.State s; 1045 return (eventCount >= 0 && 1046 (wt = owner) != null && 1047 (s = wt.getState()) != Thread.State.BLOCKED && 1048 s != Thread.State.WAITING && 1049 s != Thread.State.TIMED_WAITING); 1050 } 1051 1052 // Unsafe mechanics 1053 private static final sun.misc.Unsafe U; 1054 private static final long QBASE; 1055 private static final long QLOCK; 1056 private static final int ABASE; 1057 private static final int ASHIFT; 1058 static { 1059 try { 1060 U = sun.misc.Unsafe.getUnsafe(); 1061 Class<?> k = WorkQueue.class; 1062 Class<?> ak = ForkJoinTask[].class; 1063 QBASE = U.objectFieldOffset 1064 (k.getDeclaredField("base")); 1065 QLOCK = U.objectFieldOffset 1066 (k.getDeclaredField("qlock")); 1067 ABASE = U.arrayBaseOffset(ak); 1068 int scale = U.arrayIndexScale(ak); 1069 if ((scale & (scale - 1)) != 0) 1070 throw new Error("data type scale not a power of two"); 1071 ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); 1072 } catch (Exception e) { 1073 throw new Error(e); 1074 } 1075 } 1076 } 1077 1078 // static fields (initialized in static initializer below) 1079 1080 /** 1081 * Creates a new ForkJoinWorkerThread. This factory is used unless 1082 * overridden in ForkJoinPool constructors. 1083 */ 1084 public static final ForkJoinWorkerThreadFactory 1085 defaultForkJoinWorkerThreadFactory; 1086 1087 /** 1088 * Permission required for callers of methods that may start or 1089 * kill threads. 1090 */ 1091 private static final RuntimePermission modifyThreadPermission; 1092 1093 /** 1094 * Common (static) pool. Non-null for public use unless a static 1095 * construction exception, but internal usages null-check on use 1096 * to paranoically avoid potential initialization circularities 1097 * as well as to simplify generated code. 1098 */ 1099 static final ForkJoinPool common; 1100 1101 /** 1102 * Common pool parallelism. 
     * To allow simpler use and management
     * when common pool threads are disabled, we allow the underlying
     * common.parallelism field to be zero, but in that case still report
     * parallelism as 1 to reflect resulting caller-runs mechanics.
     */
    static final int commonParallelism;

    /**
     * Sequence number for creating workerNamePrefix.
     */
    private static int poolNumberSequence;

    /**
     * Returns the next sequence number. We don't expect this to
     * ever contend, so use simple builtin sync.
     */
    private static final synchronized int nextPoolId() {
        return ++poolNumberSequence;
    }

    // static constants

    /**
     * Initial timeout value (in nanoseconds) for the thread
     * triggering quiescence to park waiting for new work. On timeout,
     * the thread will instead try to shrink the number of
     * workers. The value should be large enough to avoid overly
     * aggressive shrinkage during most transient stalls (long GCs
     * etc).
     */
    private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec

    /**
     * Timeout value (in nanoseconds) when there are more threads
     * than parallelism level
     */
    private static final long FAST_IDLE_TIMEOUT = 200L * 1000L * 1000L;

    /**
     * Tolerance for idle timeouts, to cope with timer undershoots
     * (2 milliseconds, in nanoseconds)
     */
    private static final long TIMEOUT_SLOP = 2000000L;

    /**
     * The maximum stolen->joining link depth allowed in method
     * tryHelpStealer. Must be a power of two. Depths for legitimate
     * chains are unbounded, but we use a fixed constant to avoid
     * (otherwise unchecked) cycles and to bound staleness of
     * traversal parameters at the expense of sometimes blocking when
     * we could be helping.
     */
    private static final int MAX_HELP = 64;

    /**
     * Increment for seed generators. See class ThreadLocal for
     * explanation.
     */
    private static final int SEED_INCREMENT = 0x61c88647;

    /*
     * Bits and masks for control variables
     *
     * Field ctl is a long packed with:
     * AC: Number of active running workers minus target parallelism (16 bits)
     * TC: Number of total workers minus target parallelism (16 bits)
     * ST: true if pool is terminating (1 bit)
     * EC: the wait count of top waiting thread (15 bits)
     * ID: poolIndex of top of Treiber stack of waiters (16 bits)
     *
     * When convenient, we can extract the upper 32 bits of counts and
     * the lower 32 bits of queue state, u = (int)(ctl >>> 32) and e =
     * (int)ctl. The ec field is never accessed alone, but always
     * together with id and st. The offsets of counts by the target
     * parallelism and the positionings of fields makes it possible to
     * perform the most common checks via sign tests of fields: When
     * ac is negative, there are not enough active workers, when tc is
     * negative, there are not enough total workers, and when e is
     * negative, the pool is terminating. To deal with these possibly
     * negative fields, we use casts in and out of "short" and/or
     * signed shifts to maintain signedness.
     *
     * When a thread is queued (inactivated), its eventCount field is
     * set negative, which is the only way to tell if a worker is
     * prevented from executing tasks, even though it must continue to
     * scan for them to avoid queuing races. Note however that
     * eventCount updates lag releases so usage requires care.
     *
     * Field plock is an int packed with:
     * SHUTDOWN: true if shutdown is enabled (1 bit)
     * SEQ: a sequence lock, with PL_LOCK bit set if locked (30 bits)
     * SIGNAL: set when threads may be waiting on the lock (1 bit)
     *
     * The sequence number enables simple consistency checks:
     * Staleness of read-only operations on the workQueues array can
     * be checked by comparing plock before vs after the reads.
     */

    // bit positions/shifts for fields
    private static final int AC_SHIFT = 48;
    private static final int TC_SHIFT = 32;
    private static final int ST_SHIFT = 31;
    private static final int EC_SHIFT = 16;

    // bounds
    private static final int SMASK      = 0xffff;  // short bits
    private static final int MAX_CAP    = 0x7fff;  // max #workers - 1
    private static final int EVENMASK   = 0xfffe;  // even short bits
    private static final int SQMASK     = 0x007e;  // max 64 (even) slots
    private static final int SHORT_SIGN = 1 << 15;
    private static final int INT_SIGN   = 1 << 31;

    // masks
    private static final long STOP_BIT  = 0x0001L << ST_SHIFT;
    private static final long AC_MASK   = ((long)SMASK) << AC_SHIFT;
    private static final long TC_MASK   = ((long)SMASK) << TC_SHIFT;

    // units for incrementing and decrementing
    private static final long TC_UNIT   = 1L << TC_SHIFT;
    private static final long AC_UNIT   = 1L << AC_SHIFT;

    // masks and units for dealing with u = (int)(ctl >>> 32)
    private static final int UAC_SHIFT  = AC_SHIFT - 32;
    private static final int UTC_SHIFT  = TC_SHIFT - 32;
    private static final int UAC_MASK   = SMASK << UAC_SHIFT;
    private static final int UTC_MASK   = SMASK << UTC_SHIFT;
    private static final int UAC_UNIT   = 1 << UAC_SHIFT;
    private static final int UTC_UNIT   = 1 << UTC_SHIFT;

    // masks and units for dealing with e = (int)ctl
    private static final int E_MASK     = 0x7fffffff; // no STOP_BIT
    private static final int E_SEQ      = 1 << EC_SHIFT;

    // plock bits
    private static final int SHUTDOWN   = 1 << 31;
    private static final int PL_LOCK    = 2;
    private static final int PL_SIGNAL  = 1;
    private static final int PL_SPINS   = 1 << 8;

    // access mode for WorkQueue
    static final int LIFO_QUEUE   =  0;
    static final int FIFO_QUEUE   =  1;
    static final int SHARED_QUEUE = -1;

    // Instance fields
    volatile long stealCount;       // collects worker counts
    volatile long ctl;              // main pool control
    volatile int plock;             // shutdown status and seqLock
    volatile int indexSeed;         // worker/submitter index seed
    final short parallelism;        // parallelism level
    final short mode;               // LIFO/FIFO
    WorkQueue[] workQueues;         // main registry
    final ForkJoinWorkerThreadFactory factory;
    final UncaughtExceptionHandler ueh; // per-worker UEH
    final String workerNamePrefix;  // to create worker name string

    /**
     * Acquires the plock lock to protect worker array and related
     * updates. This method is called only if an initial CAS on plock
     * fails. This acts as a spinlock for normal cases, but falls back
     * to builtin monitor to block when (rarely) needed. This would be
     * a terrible idea for a highly contended lock, but works fine as
     * a more conservative alternative to a pure spinlock.
     *
     * @return the new (locked) plock value
     */
    private int acquirePlock() {
        int spins = PL_SPINS, ps, nps;
        for (;;) {
            if (((ps = plock) & PL_LOCK) == 0 &&
                U.compareAndSwapInt(this, PLOCK, ps, nps = ps + PL_LOCK))
                return nps;
            else if (spins >= 0) {
                // randomized spin backoff: decrement about half the time
                if (ThreadLocalRandom.nextSecondarySeed() >= 0)
                    --spins;
            }
            else if (U.compareAndSwapInt(this, PLOCK, ps, ps | PL_SIGNAL)) {
                synchronized (this) {
                    if ((plock & PL_SIGNAL) != 0) {
                        try {
                            wait();
                        } catch (InterruptedException ie) {
                            // re-assert interrupt status rather than throwing
                            try {
                                Thread.currentThread().interrupt();
                            } catch (SecurityException ignore) {
                            }
                        }
                    }
                    else
                        notifyAll();
                }
            }
        }
    }

    /**
     * Unlocks and signals any thread waiting for plock. Called only
     * when CAS of seq value for unlock fails.
     */
    private void releasePlock(int ps) {
        plock = ps;
        synchronized (this) { notifyAll(); }
    }

    /**
     * Tries to create and start one worker if fewer than target
     * parallelism level exist. Adjusts counts etc on failure.
     */
    private void tryAddWorker() {
        long c; int u, e;
        while ((u = (int)((c = ctl) >>> 32)) < 0 &&
               (u & SHORT_SIGN) != 0 && (e = (int)c) >= 0) {
            long nc = ((long)(((u + UTC_UNIT) & UTC_MASK) |
                              ((u + UAC_UNIT) & UAC_MASK)) << 32) | (long)e;
            if (U.compareAndSwapLong(this, CTL, c, nc)) {
                ForkJoinWorkerThreadFactory fac;
                Throwable ex = null;
                ForkJoinWorkerThread wt = null;
                try {
                    if ((fac = factory) != null &&
                        (wt = fac.newThread(this)) != null) {
                        wt.start();
                        break;
                    }
                } catch (Throwable rex) {
                    ex = rex;
                }
                // undo the count increment (and possibly rethrow) on failure
                deregisterWorker(wt, ex);
                break;
            }
        }
    }

    // Registering and deregistering workers

    /**
     * Callback from ForkJoinWorkerThread to establish and record its
     * WorkQueue.
     * To avoid scanning bias due to packing entries in
     * front of the workQueues array, we treat the array as a simple
     * power-of-two hash table using per-thread seed as hash,
     * expanding as needed. (Worker queues live at odd indices;
     * shared submission queues at even indices.)
     *
     * @param wt the worker thread
     * @return the worker's queue
     */
    final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
        UncaughtExceptionHandler handler; WorkQueue[] ws; int s, ps;
        wt.setDaemon(true);
        if ((handler = ueh) != null)
            wt.setUncaughtExceptionHandler(handler);
        do {} while (!U.compareAndSwapInt(this, INDEXSEED, s = indexSeed,
                                          s += SEED_INCREMENT) ||
                     s == 0); // skip 0
        WorkQueue w = new WorkQueue(this, wt, mode, s);
        if (((ps = plock) & PL_LOCK) != 0 ||
            !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
            ps = acquirePlock();
        int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
        try {
            if ((ws = workQueues) != null) {    // skip if shutting down
                int n = ws.length, m = n - 1;
                int r = (s << 1) | 1;           // use odd-numbered indices
                if (ws[r &= m] != null) {       // collision
                    int probes = 0;             // step by approx half size
                    int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
                    while (ws[r = (r + step) & m] != null) {
                        if (++probes >= n) {
                            workQueues = ws = Arrays.copyOf(ws, n <<= 1);
                            m = n - 1;
                            probes = 0;
                        }
                    }
                }
                w.poolIndex = (short)r;
                w.eventCount = r; // volatile write orders
                ws[r] = w;
            }
        } finally {
            if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
                releasePlock(nps);
        }
        wt.setName(workerNamePrefix.concat(Integer.toString(w.poolIndex >>> 1)));
        return w;
    }

    /**
     * Final callback from terminating worker, as well as upon failure
     * to construct or start a worker. Removes record of worker from
     * array, and adjusts counts. If pool is shutting down, tries to
     * complete termination.
     *
     * @param wt the worker thread, or null if construction failed
     * @param ex the exception causing failure, or null if none
     */
    final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
        WorkQueue w = null;
        if (wt != null && (w = wt.workQueue) != null) {
            int ps;
            w.qlock = -1;                // ensure set
            U.getAndAddLong(this, STEALCOUNT, w.nsteals); // collect steals
            if (((ps = plock) & PL_LOCK) != 0 ||
                !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
                ps = acquirePlock();
            int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
            try {
                int idx = w.poolIndex;
                WorkQueue[] ws = workQueues;
                if (ws != null && idx >= 0 && idx < ws.length && ws[idx] == w)
                    ws[idx] = null;
            } finally {
                if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
                    releasePlock(nps);
            }
        }

        long c;                          // adjust ctl counts
        do {} while (!U.compareAndSwapLong
                     (this, CTL, c = ctl, (((c - AC_UNIT) & AC_MASK) |
                                           ((c - TC_UNIT) & TC_MASK) |
                                           (c & ~(AC_MASK|TC_MASK)))));

        if (!tryTerminate(false, false) && w != null && w.array != null) {
            w.cancelAll();               // cancel remaining tasks
            WorkQueue[] ws; WorkQueue v; Thread p; int u, i, e;
            while ((u = (int)((c = ctl) >>> 32)) < 0 && (e = (int)c) >= 0) {
                if (e > 0) {             // activate or create replacement
                    if ((ws = workQueues) == null ||
                        (i = e & SMASK) >= ws.length ||
                        (v = ws[i]) == null)
                        break;
                    long nc = (((long)(v.nextWait & E_MASK)) |
                               ((long)(u + UAC_UNIT) << 32));
                    if (v.eventCount != (e | INT_SIGN))
                        break;
                    if (U.compareAndSwapLong(this, CTL, c, nc)) {
                        v.eventCount = (e + E_SEQ) & E_MASK;
                        if ((p = v.parker) != null)
                            U.unpark(p);
                        break;
                    }
                }
                else {
                    if ((short)u < 0)
                        tryAddWorker();
                    break;
                }
            }
        }
        if (ex == null)                  // help clean refs on way out
            ForkJoinTask.helpExpungeStaleExceptions();
        else                             // rethrow
            ForkJoinTask.rethrow(ex);
    }

    // Submissions

    /**
     * Unless shutting down, adds the given task to a submission queue
     * at submitter's current queue index (modulo submission
     * range). Only the most common path is directly handled in this
     * method. All others are relayed to fullExternalPush.
     *
     * @param task the task. Caller must ensure non-null.
     */
    final void externalPush(ForkJoinTask<?> task) {
        WorkQueue q; int m, s, n, am; ForkJoinTask<?>[] a;
        int r = ThreadLocalRandom.getProbe();
        int ps = plock;
        WorkQueue[] ws = workQueues;
        if (ps > 0 && ws != null && (m = (ws.length - 1)) >= 0 &&
            (q = ws[m & r & SQMASK]) != null && r != 0 &&
            U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock
            if ((a = q.array) != null &&
                (am = a.length - 1) > (n = (s = q.top) - q.base)) {
                int j = ((am & s) << ASHIFT) + ABASE;
                U.putOrderedObject(a, j, task);
                q.top = s + 1;                     // push on to deque
                q.qlock = 0;
                if (n <= 1)
                    signalWork(ws, q);
                return;
            }
            q.qlock = 0;
        }
        fullExternalPush(task);
    }

    /**
     * Full version of externalPush. This method is called, among
     * other times, upon the first submission of the first task to the
     * pool, so must perform secondary initialization. It also
     * detects first submission by an external thread by looking up
     * its ThreadLocal, and creates a new shared queue if the one at
     * index if empty or contended. The plock lock body must be
     * exception-free (so no try/finally) so we optimistically
     * allocate new queues outside the lock and throw them away if
     * (very rarely) not needed.
     *
     * Secondary initialization occurs when plock is zero, to create
     * workQueue array and set plock to a valid value. This lock body
     * must also be exception-free.
     * Because the plock seq value can
     * eventually wrap around zero, this method harmlessly fails to
     * reinitialize if workQueues exists, while still advancing plock.
     */
    private void fullExternalPush(ForkJoinTask<?> task) {
        int r;
        if ((r = ThreadLocalRandom.getProbe()) == 0) {
            ThreadLocalRandom.localInit();
            r = ThreadLocalRandom.getProbe();
        }
        for (;;) {
            WorkQueue[] ws; WorkQueue q; int ps, m, k;
            boolean move = false;
            if ((ps = plock) < 0)
                throw new RejectedExecutionException();
            else if (ps == 0 || (ws = workQueues) == null ||
                     (m = ws.length - 1) < 0) { // initialize workQueues
                int p = parallelism;            // find power of two table size
                int n = (p > 1) ? p - 1 : 1;    // ensure at least 2 slots
                n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
                n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
                // optimistically allocate outside the lock; discarded if raced
                WorkQueue[] nws = ((ws = workQueues) == null || ws.length == 0 ?
                                   new WorkQueue[n] : null);
                if (((ps = plock) & PL_LOCK) != 0 ||
                    !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
                    ps = acquirePlock();
                if (((ws = workQueues) == null || ws.length == 0) && nws != null)
                    workQueues = nws;
                int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
                if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
                    releasePlock(nps);
            }
            else if ((q = ws[k = r & m & SQMASK]) != null) {
                if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                    ForkJoinTask<?>[] a = q.array;
                    int s = q.top;
                    boolean submitted = false;
                    try {                      // locked version of push
                        if ((a != null && a.length > s + 1 - q.base) ||
                            (a = q.growArray()) != null) {   // must presize
                            int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
                            U.putOrderedObject(a, j, task);
                            q.top = s + 1;
                            submitted = true;
                        }
                    } finally {
                        q.qlock = 0;  // unlock
                    }
                    if (submitted) {
                        signalWork(ws, q);
                        return;
                    }
                }
                move = true; // move on failure
            }
            else if (((ps = plock) & PL_LOCK) == 0) { // create new queue
                q = new WorkQueue(this, null, SHARED_QUEUE, r);
                q.poolIndex = (short)k;
                if (((ps = plock) & PL_LOCK) != 0 ||
                    !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
                    ps = acquirePlock();
                if ((ws = workQueues) != null && k < ws.length && ws[k] == null)
                    ws[k] = q;
                int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
                if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
                    releasePlock(nps);
            }
            else
                move = true; // move if busy
            if (move) // rehash to a different submission slot and retry
                r = ThreadLocalRandom.advanceProbe(r);
        }
    }

    // Maintaining ctl counts

    /**
     * Increments active count; mainly called upon return from blocking.
     */
    final void incrementActiveCount() {
        long c;
        do {} while (!U.compareAndSwapLong
                     (this, CTL, c = ctl, ((c & ~AC_MASK) |
                                           ((c & AC_MASK) + AC_UNIT))));
    }

    /**
     * Tries to create or activate a worker if too few are active.
     *
     * @param ws the worker array to use to find signallees
     * @param q if non-null, the queue holding tasks to be processed
     */
    final void signalWork(WorkQueue[] ws, WorkQueue q) {
        for (;;) {
            long c; int e, u, i; WorkQueue w; Thread p;
            if ((u = (int)((c = ctl) >>> 32)) >= 0) // enough active workers
                break;
            if ((e = (int)c) <= 0) {                // no idle workers queued
                if ((short)u < 0)                   // total count below target
                    tryAddWorker();
                break;
            }
            if (ws == null || ws.length <= (i = e & SMASK) ||
                (w = ws[i]) == null)                // terminating or stale view
                break;
            // try to dequeue and release the most recently inactivated worker
            long nc = (((long)(w.nextWait & E_MASK)) |
                       ((long)(u + UAC_UNIT)) << 32);
            int ne = (e + E_SEQ) & E_MASK;
            if (w.eventCount == (e | INT_SIGN) &&
                U.compareAndSwapLong(this, CTL, c, nc)) {
                w.eventCount = ne;
                if ((p = w.parker) != null)
                    U.unpark(p);
                break;
            }
            if (q != null && q.base >= q.top) // task appears taken; give up
                break;
        }
    }

    // Scanning for tasks

    /**
     * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
     */
    final void runWorker(WorkQueue w) {
        w.growArray(); // allocate queue
        for (int r = w.hint; scan(w, r) == 0; ) {
            r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
        }
    }

    /**
     * Scans for and, if found, runs one task, else possibly
     * inactivates the worker. This method operates on single reads of
     * volatile state and is designed to be re-invoked continuously,
     * in part because it returns upon detecting inconsistencies,
     * contention, or state changes that indicate possible success on
     * re-invocation.
     *
     * The scan searches for tasks across queues starting at a random
     * index, checking each at least twice. The scan terminates upon
     * either finding a non-empty queue, or completing the sweep. If
     * the worker is not inactivated, it takes and runs a task from
     * this queue. Otherwise, if not activated, it tries to activate
     * itself or some other worker by signalling. On failure to find a
     * task, returns (for retry) if pool state may have changed during
     * an empty scan, or tries to inactivate if active, else possibly
     * blocks or terminates via method awaitWork.
     *
     * @param w the worker (via its WorkQueue)
     * @param r a random seed
     * @return worker qlock status if would have waited, else 0
     */
    private final int scan(WorkQueue w, int r) {
        WorkQueue[] ws; int m;
        long c = ctl;                            // for consistency check
        if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 && w != null) {
            // sweep bound m+m+1 checks each queue at least twice
            for (int j = m + m + 1, ec = w.eventCount;;) {
                WorkQueue q; int b, e; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
                if ((q = ws[(r - j) & m]) != null &&
                    (b = q.base) - q.top < 0 && (a = q.array) != null) {
                    long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
                    if ((t = ((ForkJoinTask<?>)
                              U.getObjectVolatile(a, i))) != null) {
                        if (ec < 0)              // inactive: reactivate someone
                            helpRelease(c, ws, w, q, b);
                        else if (q.base == b &&
                                 U.compareAndSwapObject(a, i, t, null)) {
                            U.putOrderedInt(q, QBASE, b + 1);
                            if ((b + 1) - q.top < 0) // queue still non-empty
                                signalWork(ws, q);
                            w.runTask(t);
                        }
                    }
                    break;
                }
                else if (--j < 0) {              // sweep completed empty
                    if ((ec | (e = (int)c)) < 0) // inactive or terminating
                        return awaitWork(w, c, ec);
                    else if (ctl == c) {         // try to inactivate and enqueue
                        long nc = (long)ec | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
                        w.nextWait = e;          // link into Treiber wait stack
                        w.eventCount = ec | INT_SIGN;
                        if (!U.compareAndSwapLong(this, CTL, c, nc))
                            w.eventCount = ec;   // back out
                    }
                    break;
                }
            }
        }
        return 0;
    }

    /**
     * A continuation of scan(), possibly blocking or terminating
     * worker w. Returns without blocking if pool state has apparently
     * changed since last invocation.
 Also, if inactivating w has
     * caused the pool to become quiescent, checks for pool
     * termination, and, so long as this is not the only worker, waits
     * for event for up to a given duration. On timeout, if ctl has
     * not changed, terminates the worker, which will in turn wake up
     * another worker to possibly repeat this process.
     *
     * @param w the calling worker
     * @param c the ctl value on entry to scan
     * @param ec the worker's eventCount on entry to scan
     * @return the worker's qlock status; -1 if the worker should terminate
     */
    private final int awaitWork(WorkQueue w, long c, int ec) {
        int stat, ns; long parkTime, deadline;
        // proceed only if state is unchanged since scan() read it
        if ((stat = w.qlock) >= 0 && w.eventCount == ec && ctl == c &&
            !Thread.interrupted()) {
            int e = (int)c;
            int u = (int)(c >>> 32);
            int d = (u >> UAC_SHIFT) + parallelism; // active count

            if (e < 0 || (d <= 0 && tryTerminate(false, false)))
                stat = w.qlock = -1;          // pool is terminating
            else if ((ns = w.nsteals) != 0) { // collect steals and retry
                w.nsteals = 0;
                U.getAndAddLong(this, STEALCOUNT, (long)ns);
            }
            else {
                // pc is the ctl value to restore if this worker times out
                // and removes itself; nonzero only if it is the last waiter
                long pc = ((d > 0 || ec != (e | INT_SIGN)) ? 0L :
                           ((long)(w.nextWait & E_MASK)) | // ctl to restore
                           ((long)(u + UAC_UNIT)) << 32);
                if (pc != 0L) {               // timed wait if last waiter
                    int dc = -(short)(c >>> TC_SHIFT);
                    parkTime = (dc < 0 ? FAST_IDLE_TIMEOUT :
                                (dc + 1) * IDLE_TIMEOUT);
                    deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
                }
                else
                    parkTime = deadline = 0L; // untimed park
                if (w.eventCount == ec && ctl == c) {
                    Thread wt = Thread.currentThread();
                    U.putObject(wt, PARKBLOCKER, this);
                    w.parker = wt;            // emulate LockSupport.park
                    if (w.eventCount == ec && ctl == c)
                        U.park(false, parkTime); // must recheck before park
                    w.parker = null;
                    U.putObject(wt, PARKBLOCKER, null);
                    if (parkTime != 0L && ctl == c &&
                        deadline - System.nanoTime() <= 0L &&
                        U.compareAndSwapLong(this, CTL, c, pc))
                        stat = w.qlock = -1;  // shrink pool
                }
            }
        }
        return stat;
    }

    /**
     * Possibly releases (signals) a worker. Called only from scan()
     * when a worker with apparently inactive status finds a non-empty
     * queue. This requires revalidating all of the associated state
     * from caller.
     */
    private final void helpRelease(long c, WorkQueue[] ws, WorkQueue w,
                                   WorkQueue q, int b) {
        WorkQueue v; int e, i; Thread p;
        if (w != null && w.eventCount < 0 && (e = (int)c) > 0 &&
            ws != null && ws.length > (i = e & SMASK) &&
            (v = ws[i]) != null && ctl == c) {
            long nc = (((long)(v.nextWait & E_MASK)) |
                       ((long)((int)(c >>> 32) + UAC_UNIT)) << 32);
            int ne = (e + E_SEQ) & E_MASK;
            // revalidate queue, self and target before committing the release
            if (q != null && q.base == b && w.eventCount < 0 &&
                v.eventCount == (e | INT_SIGN) &&
                U.compareAndSwapLong(this, CTL, c, nc)) {
                v.eventCount = ne;
                if ((p = v.parker) != null)
                    U.unpark(p);
            }
        }
    }

    /**
     * Tries to locate and execute tasks for a stealer of the given
     * task, or in turn one of its stealers. Traces currentSteal ->
     * currentJoin links looking for a thread working on a descendant
     * of the given task and with a non-empty queue to steal back and
     * execute tasks from.
 The first call to this method upon a
     * waiting join will often entail scanning/search, (which is OK
     * because the joiner has nothing better to do), but this method
     * leaves hints in workers to speed up subsequent calls. The
     * implementation is very branchy to cope with potential
     * inconsistencies or loops encountering chains that are stale,
     * unknown, or so long that they are likely cyclic.
     *
     * @param joiner the joining worker
     * @param task the task to join
     * @return 0 if no progress can be made, negative if task
     * known complete, else positive
     */
    private int tryHelpStealer(WorkQueue joiner, ForkJoinTask<?> task) {
        int stat = 0, steps = 0;                // bound to avoid cycles
        if (task != null && joiner != null &&
            joiner.base - joiner.top >= 0) {    // hoist checks
            restart: for (;;) {
                ForkJoinTask<?> subtask = task; // current target
                for (WorkQueue j = joiner, v;;) { // v is stealer of subtask
                    WorkQueue[] ws; int m, s, h;
                    if ((s = task.status) < 0) {
                        stat = s;
                        break restart;
                    }
                    if ((ws = workQueues) == null || (m = ws.length - 1) <= 0)
                        break restart;          // shutting down
                    if ((v = ws[h = (j.hint | 1) & m]) == null ||
                        v.currentSteal != subtask) {
                        for (int origin = h;;) { // find stealer
                            if (((h = (h + 2) & m) & 15) == 1 &&
                                (subtask.status < 0 || j.currentJoin != subtask))
                                continue restart; // occasional staleness check
                            if ((v = ws[h]) != null &&
                                v.currentSteal == subtask) {
                                j.hint = h;      // save hint
                                break;
                            }
                            if (h == origin)
                                break restart;   // cannot find stealer
                        }
                    }
                    for (;;) { // help stealer or descend to its stealer
                        ForkJoinTask[] a; int b;
                        if (subtask.status < 0)  // surround probes with
                            continue restart;    //   consistency checks
                        if ((b = v.base) - v.top < 0 && (a = v.array) != null) {
                            int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
                            ForkJoinTask<?> t =
                                (ForkJoinTask<?>)U.getObjectVolatile(a, i);
                            if (subtask.status < 0 || j.currentJoin != subtask ||
                                v.currentSteal != subtask)
                                continue restart; // stale
                            stat = 1;             // apparent progress
                            if (v.base == b) {
                                if (t == null)
                                    break restart;
                                if (U.compareAndSwapObject(a, i, t, null)) {
                                    U.putOrderedInt(v, QBASE, b + 1);
                                    ForkJoinTask<?> ps = joiner.currentSteal;
                                    int jt = joiner.top;
                                    do {
                                        joiner.currentSteal = t;
                                        t.doExec(); // clear local tasks too
                                    } while (task.status >= 0 &&
                                             joiner.top != jt &&
                                             (t = joiner.pop()) != null);
                                    joiner.currentSteal = ps;
                                    break restart;
                                }
                            }
                        }
                        else {                    // empty -- try to descend
                            ForkJoinTask<?> next = v.currentJoin;
                            if (subtask.status < 0 || j.currentJoin != subtask ||
                                v.currentSteal != subtask)
                                continue restart; // stale
                            else if (next == null || ++steps == MAX_HELP)
                                break restart;    // dead-end or maybe cyclic
                            else {
                                subtask = next;   // follow chain one level down
                                j = v;
                                break;
                            }
                        }
                    }
                }
            }
        }
        return stat;
    }

    /**
     * Analog of tryHelpStealer for CountedCompleters. Tries to steal
     * and run tasks within the target's computation.
     *
     * @param joiner the joining worker
     * @param task the task to join
     * @param maxTasks the maximum number of other tasks to run
     * @return task status on exit
     */
    final int helpComplete(WorkQueue joiner, CountedCompleter<?> task,
                           int maxTasks) {
        WorkQueue[] ws; int m;
        int s = 0;
        if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 &&
            joiner != null && task != null) {
            int j = joiner.poolIndex;
            int scans = m + m + 1;    // check each queue at least twice
            long c = 0L;              // for stability check
            for (int k = scans; ; j += 2) {
                WorkQueue q;
                if ((s = task.status) < 0)           // already done
                    break;
                else if (joiner.internalPopAndExecCC(task)) {
                    if (--maxTasks <= 0) {
                        s = task.status;
                        break;
                    }
                    k = scans;       // progress: restart sweep count
                }
                else if ((s = task.status) < 0)
                    break;
                else if ((q = ws[j & m]) != null && q.pollAndExecCC(task)) {
                    if (--maxTasks <= 0) {
                        s = task.status;
                        break;
                    }
                    k = scans;
                }
                else if (--k < 0) {  // empty sweep
                    if (c == (c = ctl))
                        break;       // pool state stable: give up
                    k = scans;
                }
            }
        }
        return s;
    }

    /**
     * Tries to decrement active count (sometimes implicitly) and
     * possibly release or create a compensating worker in preparation
     * for blocking. Fails on contention or termination. Otherwise,
     * adds a new thread if no idle workers are available and pool
     * may become starved.
     *
     * @param c the assumed ctl value
     * @return true if the caller may proceed to block
     */
    final boolean tryCompensate(long c) {
        WorkQueue[] ws = workQueues;
        int pc = parallelism, e = (int)c, m, tc;
        if (ws != null && (m = ws.length - 1) >= 0 && e >= 0 && ctl == c) {
            WorkQueue w = ws[e & m];
            if (e != 0 && w != null) {
                // a worker is waiting on the event queue: release it
                // in place of the (about to block) caller
                Thread p;
                long nc = ((long)(w.nextWait & E_MASK) |
                           (c & (AC_MASK|TC_MASK)));
                int ne = (e + E_SEQ) & E_MASK;
                if (w.eventCount == (e | INT_SIGN) &&
                    U.compareAndSwapLong(this, CTL, c, nc)) {
                    w.eventCount = ne;
                    if ((p = w.parker) != null)
                        U.unpark(p);
                    return true;   // replace with idle worker
                }
            }
            else if ((tc = (short)(c >>> TC_SHIFT)) >= 0 &&
                     (int)(c >> AC_SHIFT) + pc > 1) {
                // enough active workers remain: just decrement active count
                long nc = ((c - AC_UNIT) & AC_MASK) | (c & ~AC_MASK);
                if (U.compareAndSwapLong(this, CTL, c, nc))
                    return true;   // no compensation
            }
            else if (tc + pc < MAX_CAP) {
                // otherwise try to add a replacement worker thread
                long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
                if (U.compareAndSwapLong(this, CTL, c, nc)) {
                    ForkJoinWorkerThreadFactory fac;
                    Throwable ex = null;
                    ForkJoinWorkerThread wt = null;
                    try {
                        if ((fac = factory) != null &&
                            (wt = fac.newThread(this)) != null) {
                            wt.start();
                            return true;
                        }
                    } catch (Throwable rex) {
                        ex = rex;  // factory failure: recorded, not rethrown here
                    }
                    deregisterWorker(wt, ex); // clean up and return false
                }
            }
        }
        return false;
    }

    /**
     * Helps and/or blocks until the given task is done.
     *
     * @param joiner the joining worker
     * @param task the task
     * @return task status on exit
     */
    final int awaitJoin(WorkQueue joiner, ForkJoinTask<?> task) {
        int s = 0;
        if (task != null && (s = task.status) >= 0 && joiner != null) {
            ForkJoinTask<?> prevJoin = joiner.currentJoin;
            joiner.currentJoin = task; // advertise join for tryHelpStealer
            do {} while (joiner.tryRemoveAndExec(task) && // process local tasks
                         (s = task.status) >= 0);
            if (s >= 0 && (task instanceof CountedCompleter))
                s = helpComplete(joiner, (CountedCompleter<?>)task, Integer.MAX_VALUE);
            long cc = 0;        // for stability checks
            while (s >= 0 && (s = task.status) >= 0) {
                if ((s = tryHelpStealer(joiner, task)) == 0 &&
                    (s = task.status) >= 0) {
                    if (!tryCompensate(cc))
                        cc = ctl; // failed: refresh ctl snapshot and retry
                    else {
                        // compensated: safe to block on the task monitor
                        if (task.trySetSignal() && (s = task.status) >= 0) {
                            synchronized (task) {
                                if (task.status >= 0) {
                                    try {                // see ForkJoinTask
                                        task.wait();     //  for explanation
                                    } catch (InterruptedException ie) {
                                    }
                                }
                                else
                                    task.notifyAll();
                            }
                        }
                        long c;                          // reactivate
                        do {} while (!U.compareAndSwapLong
                                     (this, CTL, c = ctl,
                                      ((c & ~AC_MASK) |
                                       ((c & AC_MASK) + AC_UNIT))));
                    }
                }
            }
            joiner.currentJoin = prevJoin;
        }
        return s;
    }

    /**
     * Stripped-down variant of awaitJoin used by timed joins. Tries
     * to help join only while there is continuous progress. (Caller
     * will then enter a timed wait.)
     *
     * @param joiner the joining worker
     * @param task the task
     */
    final void helpJoinOnce(WorkQueue joiner, ForkJoinTask<?> task) {
        int s;
        if (joiner != null && task != null && (s = task.status) >= 0) {
            ForkJoinTask<?> prevJoin = joiner.currentJoin;
            joiner.currentJoin = task;
            do {} while (joiner.tryRemoveAndExec(task) && // process local tasks
                         (s = task.status) >= 0);
            if (s >= 0) {
                if (task instanceof CountedCompleter)
                    helpComplete(joiner, (CountedCompleter<?>)task, Integer.MAX_VALUE);
                // keep helping only while each round makes progress
                do {} while (task.status >= 0 &&
                             tryHelpStealer(joiner, task) > 0);
            }
            joiner.currentJoin = prevJoin;
        }
    }

    /**
     * Returns a (probably) non-empty steal queue, if one is found
     * during a scan, else null. This method must be retried by
     * caller if, by the time it tries to use the queue, it is empty.
     */
    private WorkQueue findNonEmptyStealQueue() {
        int r = ThreadLocalRandom.nextSecondarySeed(); // random start index
        for (;;) {
            int ps = plock, m; WorkQueue[] ws; WorkQueue q;
            if ((ws = workQueues) != null && (m = ws.length - 1) >= 0) {
                // (((r - j) << 1) | 1) visits odd indices only
                for (int j = (m + 1) << 2; j >= 0; --j) {
                    if ((q = ws[(((r - j) << 1) | 1) & m]) != null &&
                        q.base - q.top < 0)
                        return q;
                }
            }
            if (plock == ps) // rescan only if pool state changed
                return null;
        }
    }

    /**
     * Runs tasks until {@code isQuiescent()}. We piggyback on
     * active count ctl maintenance, but rather than blocking
     * when tasks cannot be found, we rescan until all others cannot
     * find tasks either.
     */
    final void helpQuiescePool(WorkQueue w) {
        ForkJoinTask<?> ps = w.currentSteal; // save/restore around helping
        for (boolean active = true;;) {
            long c; WorkQueue q; ForkJoinTask<?> t; int b;
            while ((t = w.nextLocalTask()) != null) // drain own queue first
                t.doExec();
            if ((q = findNonEmptyStealQueue()) != null) {
                if (!active) {      // re-establish active count
                    active = true;
                    do {} while (!U.compareAndSwapLong
                                 (this, CTL, c = ctl,
                                  ((c & ~AC_MASK) |
                                   ((c & AC_MASK) + AC_UNIT))));
                }
                if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) {
                    (w.currentSteal = t).doExec();
                    w.currentSteal = ps;
                }
            }
            else if (active) {      // decrement active count without queuing
                long nc = ((c = ctl) & ~AC_MASK) | ((c & AC_MASK) - AC_UNIT);
                if ((int)(nc >> AC_SHIFT) + parallelism == 0)
                    break;          // bypass decrement-then-increment
                if (U.compareAndSwapLong(this, CTL, c, nc))
                    active = false;
            }
            else if ((int)((c = ctl) >> AC_SHIFT) + parallelism <= 0 &&
                     U.compareAndSwapLong
                     (this, CTL, c, ((c & ~AC_MASK) |
                                     ((c & AC_MASK) + AC_UNIT))))
                break;              // all quiescent: restore count and exit
        }
    }

    /**
     * Gets and removes a local or stolen task for the given worker.
     *
     * @return a task, if available
     */
    final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
        for (ForkJoinTask<?> t;;) {
            WorkQueue q; int b;
            if ((t = w.nextLocalTask()) != null)
                return t;
            if ((q = findNonEmptyStealQueue()) == null)
                return null;
            if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null)
                return t; // else raced with another taker: retry
        }
    }

    /**
     * Returns a cheap heuristic guide for task partitioning when
     * programmers, frameworks, tools, or languages have little or no
     * idea about task granularity. In essence by offering this
     * method, we ask users only about tradeoffs in overhead vs
     * expected throughput and its variance, rather than how finely to
     * partition tasks.
     *
     * In a steady state strict (tree-structured) computation, each
     * thread makes available for stealing enough tasks for other
     * threads to remain active. Inductively, if all threads play by
     * the same rules, each thread should make available only a
     * constant number of tasks.
     *
     * The minimum useful constant is just 1. But using a value of 1
     * would require immediate replenishment upon each steal to
     * maintain enough tasks, which is infeasible. Further,
     * partitionings/granularities of offered tasks should minimize
     * steal rates, which in general means that threads nearer the top
     * of computation tree should generate more than those nearer the
     * bottom. In perfect steady state, each thread is at
     * approximately the same level of computation tree. However,
     * producing extra tasks amortizes the uncertainty of progress and
     * diffusion assumptions.
     *
     * So, users will want to use values larger (but not much larger)
     * than 1 to both smooth over transient shortages and hedge
     * against uneven progress; as traded off against the cost of
     * extra task overhead. We leave the user to pick a threshold
     * value to compare with the results of this call to guide
     * decisions, but recommend values such as 3.
     *
     * When all threads are active, it is on average OK to estimate
     * surplus strictly locally. In steady-state, if one thread is
     * maintaining say 2 surplus tasks, then so are others. So we can
     * just use estimated queue length. However, this strategy alone
     * leads to serious mis-estimates in some non-steady-state
     * conditions (ramp-up, ramp-down, other stalls). We can detect
     * many of these by further considering the number of "idle"
     * threads, that are known to have zero queued tasks, so
     * compensate by a factor of (#idle/#active) threads.
     *
     * Note: The approximation of #busy workers as #active workers is
     * not very good under current signalling scheme, and should be
     * improved.
     */
    static int getSurplusQueuedTaskCount() {
        Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q;
        if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)) {
            int p = (pool = (wt = (ForkJoinWorkerThread)t).pool).parallelism;
            int n = (q = wt.workQueue).top - q.base; // local queue length
            int a = (int)(pool.ctl >> AC_SHIFT) + p; // active worker estimate
            // threshold halves (p >>>= 1) as the active fraction drops
            return n - (a > (p >>>= 1) ? 0 :
                        a > (p >>>= 1) ? 1 :
                        a > (p >>>= 1) ? 2 :
                        a > (p >>>= 1) ? 4 :
                        8);
        }
        return 0;
    }

    // Termination

    /**
     * Possibly initiates and/or completes termination. The caller
     * triggering termination runs three passes through workQueues:
     * (0) Setting termination status, followed by wakeups of queued
     * workers; (1) cancelling all tasks; (2) interrupting lagging
     * threads (likely in external tasks, but possibly also blocked in
     * joins). Each pass repeats previous steps because of potential
     * lagging thread creation.
     *
     * @param now if true, unconditionally terminate, else only
     * if no work and no active workers
     * @param enable if true, enable shutdown when next possible
     * @return true if now terminating or terminated
     */
    private boolean tryTerminate(boolean now, boolean enable) {
        int ps;
        if (this == common)                    // cannot shut down
            return false;
        if ((ps = plock) >= 0) {               // enable by setting plock
            if (!enable)
                return false;
            if ((ps & PL_LOCK) != 0 ||
                !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
                ps = acquirePlock();
            int nps = ((ps + PL_LOCK) & ~SHUTDOWN) | SHUTDOWN;
            if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
                releasePlock(nps);
        }
        for (long c;;) {
            if (((c = ctl) & STOP_BIT) != 0) { // already terminating
                if ((short)(c >>> TC_SHIFT) + parallelism <= 0) {
                    synchronized (this) {
                        notifyAll();           // signal when 0 workers
                    }
                }
                return true;
            }
            if (!now) {                        // check if idle & no tasks
                WorkQueue[] ws; WorkQueue w;
                if ((int)(c >> AC_SHIFT) + parallelism > 0)
                    return false;              // still active workers
                if ((ws = workQueues) != null) {
                    for (int i = 0; i < ws.length; ++i) {
                        if ((w = ws[i]) != null &&
                            (!w.isEmpty() ||
                             ((i & 1) != 0 && w.eventCount >= 0))) {
                            signalWork(ws, w); // residual work: abort
                            return false;
                        }
                    }
                }
            }
            if (U.compareAndSwapLong(this, CTL, c, c | STOP_BIT)) {
                for (int pass = 0; pass < 3; ++pass) {
                    WorkQueue[] ws; WorkQueue w; Thread wt;
                    if ((ws = workQueues) != null) {
                        int n = ws.length;
                        for (int i = 0; i < n; ++i) {
                            if ((w = ws[i]) != null) {
                                w.qlock = -1;  // mark queue terminated
                                if (pass > 0) {
                                    w.cancelAll();
                                    if (pass > 1 && (wt = w.owner) != null) {
                                        if (!wt.isInterrupted()) {
                                            try {
                                                wt.interrupt();
                                            } catch (Throwable ignore) {
                                            }
                                        }
                                        U.unpark(wt);
                                    }
                                }
                            }
                        }
                        // Wake up workers parked on event queue
                        int i, e; long cc; Thread p;
                        while ((e = (int)(cc = ctl) & E_MASK) != 0 &&
                               (i = e & SMASK) < n && i >= 0 &&
                               (w = ws[i]) != null) {
                            long nc = ((long)(w.nextWait & E_MASK) |
                                       ((cc + AC_UNIT) & AC_MASK) |
                                       (cc & (TC_MASK|STOP_BIT)));
                            if (w.eventCount == (e | INT_SIGN) &&
                                U.compareAndSwapLong(this, CTL, cc, nc)) {
                                w.eventCount = (e + E_SEQ) & E_MASK;
                                w.qlock = -1;
                                if ((p = w.parker) != null)
                                    U.unpark(p);
                            }
                        }
                    }
                }
            }
        }
    }

    // external operations on common pool

    /**
     * Returns common pool queue for a thread that has submitted at
     * least one task.
     */
    static WorkQueue commonSubmitterQueue() {
        ForkJoinPool p; WorkQueue[] ws; int m, z;
        // probe z is zero until the thread's first submission
        return ((z = ThreadLocalRandom.getProbe()) != 0 &&
                (p = common) != null &&
                (ws = p.workQueues) != null &&
                (m = ws.length - 1) >= 0) ?
            ws[m & z & SQMASK] : null;
    }

    /**
     * Tries to pop the given task from submitter's queue in common pool.
     */
    final boolean tryExternalUnpush(ForkJoinTask<?> task) {
        WorkQueue joiner; ForkJoinTask<?>[] a; int m, s;
        WorkQueue[] ws = workQueues;
        int z = ThreadLocalRandom.getProbe();
        boolean popped = false;
        if (ws != null && (m = ws.length - 1) >= 0 &&
            (joiner = ws[z & m & SQMASK]) != null &&
            joiner.base != (s = joiner.top) &&
            (a = joiner.array) != null) {
            long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
            // cheap pre-check before taking the queue lock
            if (U.getObject(a, j) == task &&
                U.compareAndSwapInt(joiner, QLOCK, 0, 1)) {
                // revalidate under lock before removing top element
                if (joiner.top == s && joiner.array == a &&
                    U.compareAndSwapObject(a, j, task, null)) {
                    joiner.top = s - 1;
                    popped = true;
                }
                joiner.qlock = 0; // unlock
            }
        }
        return popped;
    }

    /**
     * Analog of helpComplete for calls from a submitter's queue
     * (located via the caller's probe): runs subtasks of the given
     * CountedCompleter from that queue and from odd-indexed queues.
     *
     * @param task the task whose computation to help
     * @param maxTasks the maximum number of other tasks to run
     * @return task status on exit
     */
    final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) {
        WorkQueue joiner; int m;
        WorkQueue[] ws = workQueues;
        int j = ThreadLocalRandom.getProbe();
        int s = 0;
        if (ws != null && (m = ws.length - 1) >= 0 &&
            (joiner = ws[j & m & SQMASK]) != null && task != null) {
            int scans = m + m + 1;  // check each queue at least twice
            long c = 0L;            // for stability check
            j |= 1;                 // poll odd queues
            for (int k = scans; ; j += 2) {
                WorkQueue q;
                if ((s = task.status) < 0)
                    break;
                else if (joiner.externalPopAndExecCC(task)) {
                    if (--maxTasks <= 0) {
                        s = task.status;
                        break;
                    }
                    k = scans;      // progress: restart sweep count
                }
                else if ((s = task.status) < 0)
                    break;
                else if ((q = ws[j & m]) != null && q.pollAndExecCC(task)) {
                    if (--maxTasks <= 0) {
                        s = task.status;
                        break;
                    }
                    k = scans;
                }
                else if (--k < 0) { // empty sweep
                    if (c == (c = ctl))
                        break;      // pool state stable: give up
                    k = scans;
                }
            }
        }
        return s;
    }

    // Exported methods

    // Constructors

    /**
     * Creates a {@code ForkJoinPool} with parallelism equal to {@link
     * java.lang.Runtime#availableProcessors}, using the {@linkplain
     *
#defaultForkJoinWorkerThreadFactory default thread factory}, 2372 * no UncaughtExceptionHandler, and non-async LIFO processing mode. 2373 * 2374 * @throws SecurityException if a security manager exists and 2375 * the caller is not permitted to modify threads 2376 * because it does not hold {@link 2377 * java.lang.RuntimePermission}{@code ("modifyThread")} 2378 */ 2379 public ForkJoinPool() { 2380 this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), 2381 defaultForkJoinWorkerThreadFactory, null, false); 2382 } 2383 2384 /** 2385 * Creates a {@code ForkJoinPool} with the indicated parallelism 2386 * level, the {@linkplain 2387 * #defaultForkJoinWorkerThreadFactory default thread factory}, 2388 * no UncaughtExceptionHandler, and non-async LIFO processing mode. 2389 * 2390 * @param parallelism the parallelism level 2391 * @throws IllegalArgumentException if parallelism less than or 2392 * equal to zero, or greater than implementation limit 2393 * @throws SecurityException if a security manager exists and 2394 * the caller is not permitted to modify threads 2395 * because it does not hold {@link 2396 * java.lang.RuntimePermission}{@code ("modifyThread")} 2397 */ 2398 public ForkJoinPool(int parallelism) { 2399 this(parallelism, defaultForkJoinWorkerThreadFactory, null, false); 2400 } 2401 2402 /** 2403 * Creates a {@code ForkJoinPool} with the given parameters. 2404 * 2405 * @param parallelism the parallelism level. For default value, 2406 * use {@link java.lang.Runtime#availableProcessors}. 2407 * @param factory the factory for creating new threads. For default value, 2408 * use {@link #defaultForkJoinWorkerThreadFactory}. 2409 * @param handler the handler for internal worker threads that 2410 * terminate due to unrecoverable errors encountered while executing 2411 * tasks. For default value, use {@code null}. 2412 * @param asyncMode if true, 2413 * establishes local first-in-first-out scheduling mode for forked 2414 * tasks that are never joined. 
This mode may be more appropriate 2415 * than default locally stack-based mode in applications in which 2416 * worker threads only process event-style asynchronous tasks. 2417 * For default value, use {@code false}. 2418 * @throws IllegalArgumentException if parallelism less than or 2419 * equal to zero, or greater than implementation limit 2420 * @throws NullPointerException if the factory is null 2421 * @throws SecurityException if a security manager exists and 2422 * the caller is not permitted to modify threads 2423 * because it does not hold {@link 2424 * java.lang.RuntimePermission}{@code ("modifyThread")} 2425 */ 2426 public ForkJoinPool(int parallelism, 2427 ForkJoinWorkerThreadFactory factory, 2428 UncaughtExceptionHandler handler, 2429 boolean asyncMode) { 2430 this(checkParallelism(parallelism), 2431 checkFactory(factory), 2432 handler, 2433 (asyncMode ? FIFO_QUEUE : LIFO_QUEUE), 2434 "ForkJoinPool-" + nextPoolId() + "-worker-"); 2435 checkPermission(); 2436 } 2437 2438 private static int checkParallelism(int parallelism) { 2439 if (parallelism <= 0 || parallelism > MAX_CAP) 2440 throw new IllegalArgumentException(); 2441 return parallelism; 2442 } 2443 2444 private static ForkJoinWorkerThreadFactory checkFactory 2445 (ForkJoinWorkerThreadFactory factory) { 2446 if (factory == null) 2447 throw new NullPointerException(); 2448 return factory; 2449 } 2450 2451 /** 2452 * Creates a {@code ForkJoinPool} with the given parameters, without 2453 * any security checks or parameter validation. Invoked directly by 2454 * makeCommonPool. 
2455 */ 2456 private ForkJoinPool(int parallelism, 2457 ForkJoinWorkerThreadFactory factory, 2458 UncaughtExceptionHandler handler, 2459 int mode, 2460 String workerNamePrefix) { 2461 this.workerNamePrefix = workerNamePrefix; 2462 this.factory = factory; 2463 this.ueh = handler; 2464 this.mode = (short)mode; 2465 this.parallelism = (short)parallelism; 2466 long np = (long)(-parallelism); // offset ctl counts 2467 this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); 2468 } 2469 2470 /** 2471 * Returns the common pool instance. This pool is statically 2472 * constructed; its run state is unaffected by attempts to {@link 2473 * #shutdown} or {@link #shutdownNow}. However this pool and any 2474 * ongoing processing are automatically terminated upon program 2475 * {@link System#exit}. Any program that relies on asynchronous 2476 * task processing to complete before program termination should 2477 * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence}, 2478 * before exit. 2479 * 2480 * @return the common pool instance 2481 * @since 1.8 2482 */ 2483 public static ForkJoinPool commonPool() { 2484 // assert common != null : "static init error"; 2485 return common; 2486 } 2487 2488 // Execution methods 2489 2490 /** 2491 * Performs the given task, returning its result upon completion. 2492 * If the computation encounters an unchecked Exception or Error, 2493 * it is rethrown as the outcome of this invocation. Rethrown 2494 * exceptions behave in the same way as regular exceptions, but, 2495 * when possible, contain stack traces (as displayed for example 2496 * using {@code ex.printStackTrace()}) of both the current thread 2497 * as well as the thread actually encountering the exception; 2498 * minimally only the latter. 
2499 * 2500 * @param task the task 2501 * @param <T> the type of the task's result 2502 * @return the task's result 2503 * @throws NullPointerException if the task is null 2504 * @throws RejectedExecutionException if the task cannot be 2505 * scheduled for execution 2506 */ 2507 public <T> T invoke(ForkJoinTask<T> task) { 2508 if (task == null) 2509 throw new NullPointerException(); 2510 externalPush(task); 2511 return task.join(); 2512 } 2513 2514 /** 2515 * Arranges for (asynchronous) execution of the given task. 2516 * 2517 * @param task the task 2518 * @throws NullPointerException if the task is null 2519 * @throws RejectedExecutionException if the task cannot be 2520 * scheduled for execution 2521 */ 2522 public void execute(ForkJoinTask<?> task) { 2523 if (task == null) 2524 throw new NullPointerException(); 2525 externalPush(task); 2526 } 2527 2528 // AbstractExecutorService methods 2529 2530 /** 2531 * @throws NullPointerException if the task is null 2532 * @throws RejectedExecutionException if the task cannot be 2533 * scheduled for execution 2534 */ 2535 public void execute(Runnable task) { 2536 if (task == null) 2537 throw new NullPointerException(); 2538 ForkJoinTask<?> job; 2539 if (task instanceof ForkJoinTask<?>) // avoid re-wrap 2540 job = (ForkJoinTask<?>) task; 2541 else 2542 job = new ForkJoinTask.RunnableExecuteAction(task); 2543 externalPush(job); 2544 } 2545 2546 /** 2547 * Submits a ForkJoinTask for execution. 
2548 * 2549 * @param task the task to submit 2550 * @param <T> the type of the task's result 2551 * @return the task 2552 * @throws NullPointerException if the task is null 2553 * @throws RejectedExecutionException if the task cannot be 2554 * scheduled for execution 2555 */ 2556 public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) { 2557 if (task == null) 2558 throw new NullPointerException(); 2559 externalPush(task); 2560 return task; 2561 } 2562 2563 /** 2564 * @throws NullPointerException if the task is null 2565 * @throws RejectedExecutionException if the task cannot be 2566 * scheduled for execution 2567 */ 2568 public <T> ForkJoinTask<T> submit(Callable<T> task) { 2569 ForkJoinTask<T> job = new ForkJoinTask.AdaptedCallable<T>(task); 2570 externalPush(job); 2571 return job; 2572 } 2573 2574 /** 2575 * @throws NullPointerException if the task is null 2576 * @throws RejectedExecutionException if the task cannot be 2577 * scheduled for execution 2578 */ 2579 public <T> ForkJoinTask<T> submit(Runnable task, T result) { 2580 ForkJoinTask<T> job = new ForkJoinTask.AdaptedRunnable<T>(task, result); 2581 externalPush(job); 2582 return job; 2583 } 2584 2585 /** 2586 * @throws NullPointerException if the task is null 2587 * @throws RejectedExecutionException if the task cannot be 2588 * scheduled for execution 2589 */ 2590 public ForkJoinTask<?> submit(Runnable task) { 2591 if (task == null) 2592 throw new NullPointerException(); 2593 ForkJoinTask<?> job; 2594 if (task instanceof ForkJoinTask<?>) // avoid re-wrap 2595 job = (ForkJoinTask<?>) task; 2596 else 2597 job = new ForkJoinTask.AdaptedRunnableAction(task); 2598 externalPush(job); 2599 return job; 2600 } 2601 2602 /** 2603 * @throws NullPointerException {@inheritDoc} 2604 * @throws RejectedExecutionException {@inheritDoc} 2605 */ 2606 public <T> List<Future<T>> invokeAll(Collection<? 
extends Callable<T>> tasks) { 2607 // In previous versions of this class, this method constructed 2608 // a task to run ForkJoinTask.invokeAll, but now external 2609 // invocation of multiple tasks is at least as efficient. 2610 ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); 2611 2612 boolean done = false; 2613 try { 2614 for (Callable<T> t : tasks) { 2615 ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t); 2616 futures.add(f); 2617 externalPush(f); 2618 } 2619 for (int i = 0, size = futures.size(); i < size; i++) 2620 ((ForkJoinTask<?>)futures.get(i)).quietlyJoin(); 2621 done = true; 2622 return futures; 2623 } finally { 2624 if (!done) 2625 for (int i = 0, size = futures.size(); i < size; i++) 2626 futures.get(i).cancel(false); 2627 } 2628 } 2629 2630 /** 2631 * Returns the factory used for constructing new workers. 2632 * 2633 * @return the factory used for constructing new workers 2634 */ 2635 public ForkJoinWorkerThreadFactory getFactory() { 2636 return factory; 2637 } 2638 2639 /** 2640 * Returns the handler for internal worker threads that terminate 2641 * due to unrecoverable errors encountered while executing tasks. 2642 * 2643 * @return the handler, or {@code null} if none 2644 */ 2645 public UncaughtExceptionHandler getUncaughtExceptionHandler() { 2646 return ueh; 2647 } 2648 2649 /** 2650 * Returns the targeted parallelism level of this pool. 2651 * 2652 * @return the targeted parallelism level of this pool 2653 */ 2654 public int getParallelism() { 2655 int par; 2656 return ((par = parallelism) > 0) ? par : 1; 2657 } 2658 2659 /** 2660 * Returns the targeted parallelism level of the common pool. 2661 * 2662 * @return the targeted parallelism level of the common pool 2663 * @since 1.8 2664 */ 2665 public static int getCommonPoolParallelism() { 2666 return commonParallelism; 2667 } 2668 2669 /** 2670 * Returns the number of worker threads that have started but not 2671 * yet terminated. 
The result returned by this method may differ 2672 * from {@link #getParallelism} when threads are created to 2673 * maintain parallelism when others are cooperatively blocked. 2674 * 2675 * @return the number of worker threads 2676 */ 2677 public int getPoolSize() { 2678 return parallelism + (short)(ctl >>> TC_SHIFT); 2679 } 2680 2681 /** 2682 * Returns {@code true} if this pool uses local first-in-first-out 2683 * scheduling mode for forked tasks that are never joined. 2684 * 2685 * @return {@code true} if this pool uses async mode 2686 */ 2687 public boolean getAsyncMode() { 2688 return mode == FIFO_QUEUE; 2689 } 2690 2691 /** 2692 * Returns an estimate of the number of worker threads that are 2693 * not blocked waiting to join tasks or for other managed 2694 * synchronization. This method may overestimate the 2695 * number of running threads. 2696 * 2697 * @return the number of worker threads 2698 */ 2699 public int getRunningThreadCount() { 2700 int rc = 0; 2701 WorkQueue[] ws; WorkQueue w; 2702 if ((ws = workQueues) != null) { 2703 for (int i = 1; i < ws.length; i += 2) { 2704 if ((w = ws[i]) != null && w.isApparentlyUnblocked()) 2705 ++rc; 2706 } 2707 } 2708 return rc; 2709 } 2710 2711 /** 2712 * Returns an estimate of the number of threads that are currently 2713 * stealing or executing tasks. This method may overestimate the 2714 * number of active threads. 2715 * 2716 * @return the number of active threads 2717 */ 2718 public int getActiveThreadCount() { 2719 int r = parallelism + (int)(ctl >> AC_SHIFT); 2720 return (r <= 0) ? 0 : r; // suppress momentarily negative values 2721 } 2722 2723 /** 2724 * Returns {@code true} if all worker threads are currently idle. 2725 * An idle worker is one that cannot obtain a task to execute 2726 * because none are available to steal from other threads, and 2727 * there are no pending submissions to the pool. 
This method is 2728 * conservative; it might not return {@code true} immediately upon 2729 * idleness of all threads, but will eventually become true if 2730 * threads remain inactive. 2731 * 2732 * @return {@code true} if all threads are currently idle 2733 */ 2734 public boolean isQuiescent() { 2735 return parallelism + (int)(ctl >> AC_SHIFT) <= 0; 2736 } 2737 2738 /** 2739 * Returns an estimate of the total number of tasks stolen from 2740 * one thread's work queue by another. The reported value 2741 * underestimates the actual total number of steals when the pool 2742 * is not quiescent. This value may be useful for monitoring and 2743 * tuning fork/join programs: in general, steal counts should be 2744 * high enough to keep threads busy, but low enough to avoid 2745 * overhead and contention across threads. 2746 * 2747 * @return the number of steals 2748 */ 2749 public long getStealCount() { 2750 long count = stealCount; 2751 WorkQueue[] ws; WorkQueue w; 2752 if ((ws = workQueues) != null) { 2753 for (int i = 1; i < ws.length; i += 2) { 2754 if ((w = ws[i]) != null) 2755 count += w.nsteals; 2756 } 2757 } 2758 return count; 2759 } 2760 2761 /** 2762 * Returns an estimate of the total number of tasks currently held 2763 * in queues by worker threads (but not including tasks submitted 2764 * to the pool that have not begun executing). This value is only 2765 * an approximation, obtained by iterating across all threads in 2766 * the pool. This method may be useful for tuning task 2767 * granularities. 2768 * 2769 * @return the number of queued tasks 2770 */ 2771 public long getQueuedTaskCount() { 2772 long count = 0; 2773 WorkQueue[] ws; WorkQueue w; 2774 if ((ws = workQueues) != null) { 2775 for (int i = 1; i < ws.length; i += 2) { 2776 if ((w = ws[i]) != null) 2777 count += w.queueSize(); 2778 } 2779 } 2780 return count; 2781 } 2782 2783 /** 2784 * Returns an estimate of the number of tasks submitted to this 2785 * pool that have not yet begun executing. 
This method may take 2786 * time proportional to the number of submissions. 2787 * 2788 * @return the number of queued submissions 2789 */ 2790 public int getQueuedSubmissionCount() { 2791 int count = 0; 2792 WorkQueue[] ws; WorkQueue w; 2793 if ((ws = workQueues) != null) { 2794 for (int i = 0; i < ws.length; i += 2) { 2795 if ((w = ws[i]) != null) 2796 count += w.queueSize(); 2797 } 2798 } 2799 return count; 2800 } 2801 2802 /** 2803 * Returns {@code true} if there are any tasks submitted to this 2804 * pool that have not yet begun executing. 2805 * 2806 * @return {@code true} if there are any queued submissions 2807 */ 2808 public boolean hasQueuedSubmissions() { 2809 WorkQueue[] ws; WorkQueue w; 2810 if ((ws = workQueues) != null) { 2811 for (int i = 0; i < ws.length; i += 2) { 2812 if ((w = ws[i]) != null && !w.isEmpty()) 2813 return true; 2814 } 2815 } 2816 return false; 2817 } 2818 2819 /** 2820 * Removes and returns the next unexecuted submission if one is 2821 * available. This method may be useful in extensions to this 2822 * class that re-assign work in systems with multiple pools. 2823 * 2824 * @return the next submission, or {@code null} if none 2825 */ 2826 protected ForkJoinTask<?> pollSubmission() { 2827 WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t; 2828 if ((ws = workQueues) != null) { 2829 for (int i = 0; i < ws.length; i += 2) { 2830 if ((w = ws[i]) != null && (t = w.poll()) != null) 2831 return t; 2832 } 2833 } 2834 return null; 2835 } 2836 2837 /** 2838 * Removes all available unexecuted submitted and forked tasks 2839 * from scheduling queues and adds them to the given collection, 2840 * without altering their execution status. These may include 2841 * artificially generated or wrapped tasks. This method is 2842 * designed to be invoked only when the pool is known to be 2843 * quiescent. Invocations at other times may not remove all 2844 * tasks. 
A failure encountered while attempting to add elements 2845 * to collection {@code c} may result in elements being in 2846 * neither, either or both collections when the associated 2847 * exception is thrown. The behavior of this operation is 2848 * undefined if the specified collection is modified while the 2849 * operation is in progress. 2850 * 2851 * @param c the collection to transfer elements into 2852 * @return the number of elements transferred 2853 */ 2854 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) { 2855 int count = 0; 2856 WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t; 2857 if ((ws = workQueues) != null) { 2858 for (int i = 0; i < ws.length; ++i) { 2859 if ((w = ws[i]) != null) { 2860 while ((t = w.poll()) != null) { 2861 c.add(t); 2862 ++count; 2863 } 2864 } 2865 } 2866 } 2867 return count; 2868 } 2869 2870 /** 2871 * Returns a string identifying this pool, as well as its state, 2872 * including indications of run state, parallelism level, and 2873 * worker and task counts. 2874 * 2875 * @return a string identifying this pool, as well as its state 2876 */ 2877 public String toString() { 2878 // Use a single pass through workQueues to collect counts 2879 long qt = 0L, qs = 0L; int rc = 0; 2880 long st = stealCount; 2881 long c = ctl; 2882 WorkQueue[] ws; WorkQueue w; 2883 if ((ws = workQueues) != null) { 2884 for (int i = 0; i < ws.length; ++i) { 2885 if ((w = ws[i]) != null) { 2886 int size = w.queueSize(); 2887 if ((i & 1) == 0) 2888 qs += size; 2889 else { 2890 qt += size; 2891 st += w.nsteals; 2892 if (w.isApparentlyUnblocked()) 2893 ++rc; 2894 } 2895 } 2896 } 2897 } 2898 int pc = parallelism; 2899 int tc = pc + (short)(c >>> TC_SHIFT); 2900 int ac = pc + (int)(c >> AC_SHIFT); 2901 if (ac < 0) // ignore transient negative 2902 ac = 0; 2903 String level; 2904 if ((c & STOP_BIT) != 0) 2905 level = (tc == 0) ? "Terminated" : "Terminating"; 2906 else 2907 level = plock < 0 ? 
"Shutting down" : "Running"; 2908 return super.toString() + 2909 "[" + level + 2910 ", parallelism = " + pc + 2911 ", size = " + tc + 2912 ", active = " + ac + 2913 ", running = " + rc + 2914 ", steals = " + st + 2915 ", tasks = " + qt + 2916 ", submissions = " + qs + 2917 "]"; 2918 } 2919 2920 /** 2921 * Possibly initiates an orderly shutdown in which previously 2922 * submitted tasks are executed, but no new tasks will be 2923 * accepted. Invocation has no effect on execution state if this 2924 * is the {@link #commonPool()}, and no additional effect if 2925 * already shut down. Tasks that are in the process of being 2926 * submitted concurrently during the course of this method may or 2927 * may not be rejected. 2928 * 2929 * @throws SecurityException if a security manager exists and 2930 * the caller is not permitted to modify threads 2931 * because it does not hold {@link 2932 * java.lang.RuntimePermission}{@code ("modifyThread")} 2933 */ 2934 public void shutdown() { 2935 checkPermission(); 2936 tryTerminate(false, true); 2937 } 2938 2939 /** 2940 * Possibly attempts to cancel and/or stop all tasks, and reject 2941 * all subsequently submitted tasks. Invocation has no effect on 2942 * execution state if this is the {@link #commonPool()}, and no 2943 * additional effect if already shut down. Otherwise, tasks that 2944 * are in the process of being submitted or executed concurrently 2945 * during the course of this method may or may not be 2946 * rejected. This method cancels both existing and unexecuted 2947 * tasks, in order to permit termination in the presence of task 2948 * dependencies. So the method always returns an empty list 2949 * (unlike the case for some other Executors). 
2950 * 2951 * @return an empty list 2952 * @throws SecurityException if a security manager exists and 2953 * the caller is not permitted to modify threads 2954 * because it does not hold {@link 2955 * java.lang.RuntimePermission}{@code ("modifyThread")} 2956 */ 2957 public List<Runnable> shutdownNow() { 2958 checkPermission(); 2959 tryTerminate(true, true); 2960 return Collections.emptyList(); 2961 } 2962 2963 /** 2964 * Returns {@code true} if all tasks have completed following shut down. 2965 * 2966 * @return {@code true} if all tasks have completed following shut down 2967 */ 2968 public boolean isTerminated() { 2969 long c = ctl; 2970 return ((c & STOP_BIT) != 0L && 2971 (short)(c >>> TC_SHIFT) + parallelism <= 0); 2972 } 2973 2974 /** 2975 * Returns {@code true} if the process of termination has 2976 * commenced but not yet completed. This method may be useful for 2977 * debugging. A return of {@code true} reported a sufficient 2978 * period after shutdown may indicate that submitted tasks have 2979 * ignored or suppressed interruption, or are waiting for I/O, 2980 * causing this executor not to properly terminate. (See the 2981 * advisory notes for class {@link ForkJoinTask} stating that 2982 * tasks should not normally entail blocking operations. But if 2983 * they do, they must abort them on interrupt.) 2984 * 2985 * @return {@code true} if terminating but not yet terminated 2986 */ 2987 public boolean isTerminating() { 2988 long c = ctl; 2989 return ((c & STOP_BIT) != 0L && 2990 (short)(c >>> TC_SHIFT) + parallelism > 0); 2991 } 2992 2993 /** 2994 * Returns {@code true} if this pool has been shut down. 2995 * 2996 * @return {@code true} if this pool has been shut down 2997 */ 2998 public boolean isShutdown() { 2999 return plock < 0; 3000 } 3001 3002 /** 3003 * Blocks until all tasks have completed execution after a 3004 * shutdown request, or the timeout occurs, or the current thread 3005 * is interrupted, whichever happens first. 
Because the {@link 3006 * #commonPool()} never terminates until program shutdown, when 3007 * applied to the common pool, this method is equivalent to {@link 3008 * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}. 3009 * 3010 * @param timeout the maximum time to wait 3011 * @param unit the time unit of the timeout argument 3012 * @return {@code true} if this executor terminated and 3013 * {@code false} if the timeout elapsed before termination 3014 * @throws InterruptedException if interrupted while waiting 3015 */ 3016 public boolean awaitTermination(long timeout, TimeUnit unit) 3017 throws InterruptedException { 3018 if (Thread.interrupted()) 3019 throw new InterruptedException(); 3020 if (this == common) { 3021 awaitQuiescence(timeout, unit); 3022 return false; 3023 } 3024 long nanos = unit.toNanos(timeout); 3025 if (isTerminated()) 3026 return true; 3027 if (nanos <= 0L) 3028 return false; 3029 long deadline = System.nanoTime() + nanos; 3030 synchronized (this) { 3031 for (;;) { 3032 if (isTerminated()) 3033 return true; 3034 if (nanos <= 0L) 3035 return false; 3036 long millis = TimeUnit.NANOSECONDS.toMillis(nanos); 3037 wait(millis > 0L ? millis : 1L); 3038 nanos = deadline - System.nanoTime(); 3039 } 3040 } 3041 } 3042 3043 /** 3044 * If called by a ForkJoinTask operating in this pool, equivalent 3045 * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise, 3046 * waits and/or attempts to assist performing tasks until this 3047 * pool {@link #isQuiescent} or the indicated timeout elapses. 3048 * 3049 * @param timeout the maximum time to wait 3050 * @param unit the time unit of the timeout argument 3051 * @return {@code true} if quiescent; {@code false} if the 3052 * timeout elapsed. 
3053 */ 3054 public boolean awaitQuiescence(long timeout, TimeUnit unit) { 3055 long nanos = unit.toNanos(timeout); 3056 ForkJoinWorkerThread wt; 3057 Thread thread = Thread.currentThread(); 3058 if ((thread instanceof ForkJoinWorkerThread) && 3059 (wt = (ForkJoinWorkerThread)thread).pool == this) { 3060 helpQuiescePool(wt.workQueue); 3061 return true; 3062 } 3063 long startTime = System.nanoTime(); 3064 WorkQueue[] ws; 3065 int r = 0, m; 3066 boolean found = true; 3067 while (!isQuiescent() && (ws = workQueues) != null && 3068 (m = ws.length - 1) >= 0) { 3069 if (!found) { 3070 if ((System.nanoTime() - startTime) > nanos) 3071 return false; 3072 Thread.yield(); // cannot block 3073 } 3074 found = false; 3075 for (int j = (m + 1) << 2; j >= 0; --j) { 3076 ForkJoinTask<?> t; WorkQueue q; int b; 3077 if ((q = ws[r++ & m]) != null && (b = q.base) - q.top < 0) { 3078 found = true; 3079 if ((t = q.pollAt(b)) != null) 3080 t.doExec(); 3081 break; 3082 } 3083 } 3084 } 3085 return true; 3086 } 3087 3088 /** 3089 * Waits and/or attempts to assist performing tasks indefinitely 3090 * until the {@link #commonPool()} {@link #isQuiescent}. 3091 */ 3092 static void quiesceCommonPool() { 3093 common.awaitQuiescence(Long.MAX_VALUE, TimeUnit.NANOSECONDS); 3094 } 3095 3096 /** 3097 * Interface for extending managed parallelism for tasks running 3098 * in {@link ForkJoinPool}s. 3099 * 3100 * <p>A {@code ManagedBlocker} provides two methods. Method 3101 * {@code isReleasable} must return {@code true} if blocking is 3102 * not necessary. Method {@code block} blocks the current thread 3103 * if necessary (perhaps internally invoking {@code isReleasable} 3104 * before actually blocking). These actions are performed by any 3105 * thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}. 3106 * The unusual methods in this API accommodate synchronizers that 3107 * may, but don't usually, block for long periods. 
Similarly, they 3108 * allow more efficient internal handling of cases in which 3109 * additional workers may be, but usually are not, needed to 3110 * ensure sufficient parallelism. Toward this end, 3111 * implementations of method {@code isReleasable} must be amenable 3112 * to repeated invocation. 3113 * 3114 * <p>For example, here is a ManagedBlocker based on a 3115 * ReentrantLock: 3116 * <pre> {@code 3117 * class ManagedLocker implements ManagedBlocker { 3118 * final ReentrantLock lock; 3119 * boolean hasLock = false; 3120 * ManagedLocker(ReentrantLock lock) { this.lock = lock; } 3121 * public boolean block() { 3122 * if (!hasLock) 3123 * lock.lock(); 3124 * return true; 3125 * } 3126 * public boolean isReleasable() { 3127 * return hasLock || (hasLock = lock.tryLock()); 3128 * } 3129 * }}</pre> 3130 * 3131 * <p>Here is a class that possibly blocks waiting for an 3132 * item on a given queue: 3133 * <pre> {@code 3134 * class QueueTaker<E> implements ManagedBlocker { 3135 * final BlockingQueue<E> queue; 3136 * volatile E item = null; 3137 * QueueTaker(BlockingQueue<E> q) { this.queue = q; } 3138 * public boolean block() throws InterruptedException { 3139 * if (item == null) 3140 * item = queue.take(); 3141 * return true; 3142 * } 3143 * public boolean isReleasable() { 3144 * return item != null || (item = queue.poll()) != null; 3145 * } 3146 * public E getItem() { // call after pool.managedBlock completes 3147 * return item; 3148 * } 3149 * }}</pre> 3150 */ 3151 public static interface ManagedBlocker { 3152 /** 3153 * Possibly blocks the current thread, for example waiting for 3154 * a lock or condition. 
3155 * 3156 * @return {@code true} if no additional blocking is necessary 3157 * (i.e., if isReleasable would return true) 3158 * @throws InterruptedException if interrupted while waiting 3159 * (the method is not required to do so, but is allowed to) 3160 */ 3161 boolean block() throws InterruptedException; 3162 3163 /** 3164 * Returns {@code true} if blocking is unnecessary. 3165 * @return {@code true} if blocking is unnecessary 3166 */ 3167 boolean isReleasable(); 3168 } 3169 3170 /** 3171 * Blocks in accord with the given blocker. If the current thread 3172 * is a {@link ForkJoinWorkerThread}, this method possibly 3173 * arranges for a spare thread to be activated if necessary to 3174 * ensure sufficient parallelism while the current thread is blocked. 3175 * 3176 * <p>If the caller is not a {@link ForkJoinTask}, this method is 3177 * behaviorally equivalent to 3178 * <pre> {@code 3179 * while (!blocker.isReleasable()) 3180 * if (blocker.block()) 3181 * return; 3182 * }</pre> 3183 * 3184 * If the caller is a {@code ForkJoinTask}, then the pool may 3185 * first be expanded to ensure parallelism, and later adjusted. 3186 * 3187 * @param blocker the blocker 3188 * @throws InterruptedException if blocker.block did so 3189 */ 3190 public static void managedBlock(ManagedBlocker blocker) 3191 throws InterruptedException { 3192 Thread t = Thread.currentThread(); 3193 if (t instanceof ForkJoinWorkerThread) { 3194 ForkJoinPool p = ((ForkJoinWorkerThread)t).pool; 3195 while (!blocker.isReleasable()) { 3196 if (p.tryCompensate(p.ctl)) { 3197 try { 3198 do {} while (!blocker.isReleasable() && 3199 !blocker.block()); 3200 } finally { 3201 p.incrementActiveCount(); 3202 } 3203 break; 3204 } 3205 } 3206 } 3207 else { 3208 do {} while (!blocker.isReleasable() && 3209 !blocker.block()); 3210 } 3211 } 3212 3213 // AbstractExecutorService overrides. These rely on undocumented 3214 // fact that ForkJoinTask.adapt returns ForkJoinTasks that also 3215 // implement RunnableFuture. 
3216 3217 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { 3218 return new ForkJoinTask.AdaptedRunnable<T>(runnable, value); 3219 } 3220 3221 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 3222 return new ForkJoinTask.AdaptedCallable<T>(callable); 3223 } 3224 3225 // Unsafe mechanics 3226 private static final sun.misc.Unsafe U; 3227 private static final long CTL; 3228 private static final long PARKBLOCKER; 3229 private static final int ABASE; 3230 private static final int ASHIFT; 3231 private static final long STEALCOUNT; 3232 private static final long PLOCK; 3233 private static final long INDEXSEED; 3234 private static final long QBASE; 3235 private static final long QLOCK; 3236 3237 static { 3238 // initialize field offsets for CAS etc 3239 try { 3240 U = sun.misc.Unsafe.getUnsafe(); 3241 Class<?> k = ForkJoinPool.class; 3242 CTL = U.objectFieldOffset 3243 (k.getDeclaredField("ctl")); 3244 STEALCOUNT = U.objectFieldOffset 3245 (k.getDeclaredField("stealCount")); 3246 PLOCK = U.objectFieldOffset 3247 (k.getDeclaredField("plock")); 3248 INDEXSEED = U.objectFieldOffset 3249 (k.getDeclaredField("indexSeed")); 3250 Class<?> tk = Thread.class; 3251 PARKBLOCKER = U.objectFieldOffset 3252 (tk.getDeclaredField("parkBlocker")); 3253 Class<?> wk = WorkQueue.class; 3254 QBASE = U.objectFieldOffset 3255 (wk.getDeclaredField("base")); 3256 QLOCK = U.objectFieldOffset 3257 (wk.getDeclaredField("qlock")); 3258 Class<?> ak = ForkJoinTask[].class; 3259 ABASE = U.arrayBaseOffset(ak); 3260 int scale = U.arrayIndexScale(ak); 3261 if ((scale & (scale - 1)) != 0) 3262 throw new Error("data type scale not a power of two"); 3263 ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); 3264 } catch (Exception e) { 3265 throw new Error(e); 3266 } 3267 3268 defaultForkJoinWorkerThreadFactory = 3269 new DefaultForkJoinWorkerThreadFactory(); 3270 modifyThreadPermission = new RuntimePermission("modifyThread"); 3271 3272 common = 
java.security.AccessController.doPrivileged 3273 (new java.security.PrivilegedAction<ForkJoinPool>() { 3274 public ForkJoinPool run() { return makeCommonPool(); }}); 3275 int par = common.parallelism; // report 1 even if threads disabled 3276 commonParallelism = par > 0 ? par : 1; 3277 } 3278 3279 /** 3280 * Creates and returns the common pool, respecting user settings 3281 * specified via system properties. 3282 */ 3283 private static ForkJoinPool makeCommonPool() { 3284 int parallelism = -1; 3285 ForkJoinWorkerThreadFactory factory 3286 = defaultForkJoinWorkerThreadFactory; 3287 UncaughtExceptionHandler handler = null; 3288 try { // ignore exceptions in accessing/parsing properties 3289 String pp = System.getProperty 3290 ("java.util.concurrent.ForkJoinPool.common.parallelism"); 3291 String fp = System.getProperty 3292 ("java.util.concurrent.ForkJoinPool.common.threadFactory"); 3293 String hp = System.getProperty 3294 ("java.util.concurrent.ForkJoinPool.common.exceptionHandler"); 3295 if (pp != null) 3296 parallelism = Integer.parseInt(pp); 3297 if (fp != null) 3298 factory = ((ForkJoinWorkerThreadFactory)ClassLoader. 3299 getSystemClassLoader().loadClass(fp).newInstance()); 3300 if (hp != null) 3301 handler = ((UncaughtExceptionHandler)ClassLoader. 3302 getSystemClassLoader().loadClass(hp).newInstance()); 3303 } catch (Exception ignore) { 3304 } 3305 3306 if (parallelism < 0 && // default 1 less than #cores 3307 (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0) 3308 parallelism = 1; 3309 if (parallelism > MAX_CAP) 3310 parallelism = MAX_CAP; 3311 return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE, 3312 "ForkJoinPool.commonPool-worker-"); 3313 } 3314 3315 }