/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;

/**
 * An {@link ExecutorService} for running {@link ForkJoinTask}s.
 * A {@code ForkJoinPool} provides the entry point for submissions
 * from non-{@code ForkJoinTask} clients, as well as management and
 * monitoring operations.
 *
 * <p>A {@code ForkJoinPool} differs from other kinds of {@link
 * ExecutorService} mainly by virtue of employing
 * <em>work-stealing</em>: all threads in the pool attempt to find and
 * execute tasks submitted to the pool and/or created by other active
 * tasks (eventually blocking waiting for work if none exist). This
 * enables efficient processing when most tasks spawn other subtasks
 * (as do most {@code ForkJoinTask}s), as well as when many small
 * tasks are submitted to the pool from external clients. Especially
 * when setting <em>asyncMode</em> to true in constructors, {@code
 * ForkJoinPool}s may also be appropriate for use with event-style
 * tasks that are never joined.
 *
 * <p>A static {@link #commonPool()} is available and appropriate for
 * most applications. The common pool is used by any ForkJoinTask that
 * is not explicitly submitted to a specified pool. Using the common
 * pool normally reduces resource usage (its threads are slowly
 * reclaimed during periods of non-use, and reinstated upon subsequent
 * use).
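 *
 * <p>As a minimal usage sketch (where {@code SortTask} stands for a
 * hypothetical {@link RecursiveAction} subclass, not part of this
 * class):
 *
 * <pre> {@code
 * // Run a task synchronously in the common pool:
 * ForkJoinPool.commonPool().invoke(new SortTask(array, 0, array.length));
 *
 * // Or arrange async execution; ForkJoinTasks are Futures:
 * ForkJoinTask<Void> job =
 *     ForkJoinPool.commonPool().submit(new SortTask(array, 0, array.length));
 * job.join();}</pre>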
 *
 * <p>For applications that require separate or custom pools, a {@code
 * ForkJoinPool} may be constructed with a given target parallelism
 * level; by default, equal to the number of available processors. The
 * pool attempts to maintain enough active (or available) threads by
 * dynamically adding, suspending, or resuming internal worker
 * threads, even if some tasks are stalled waiting to join
 * others. However, no such adjustments are guaranteed in the face of
 * blocked I/O or other unmanaged synchronization. The nested {@link
 * ManagedBlocker} interface enables extension of the kinds of
 * synchronization accommodated.
 *
 * <p>In addition to execution and lifecycle control methods, this
 * class provides status check methods (for example
 * {@link #getStealCount}) that are intended to aid in developing,
 * tuning, and monitoring fork/join applications. Also, method
 * {@link #toString} returns indications of pool state in a
 * convenient form for informal monitoring.
 *
 * <p>As is the case with other ExecutorServices, there are three
 * main task execution methods summarized in the following table.
 * These are designed to be used primarily by clients not already
 * engaged in fork/join computations in the current pool. The main
 * forms of these methods accept instances of {@code ForkJoinTask},
 * but overloaded forms also allow mixed execution of plain {@code
 * Runnable}- or {@code Callable}- based activities as well. However,
 * tasks that are already executing in a pool should normally instead
 * use the within-computation forms listed in the table unless using
 * async event-style tasks that are not usually joined, in which case
 * there is little difference among choice of methods.
 *
 * <table BORDER CELLPADDING=3 CELLSPACING=1>
 *  <tr>
 *    <td></td>
 *    <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
 *    <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
 *  </tr>
 *  <tr>
 *    <td> <b>Arrange async execution</b></td>
 *    <td> {@link #execute(ForkJoinTask)}</td>
 *    <td> {@link ForkJoinTask#fork}</td>
 *  </tr>
 *  <tr>
 *    <td> <b>Await and obtain result</b></td>
 *    <td> {@link #invoke(ForkJoinTask)}</td>
 *    <td> {@link ForkJoinTask#invoke}</td>
 *  </tr>
 *  <tr>
 *    <td> <b>Arrange exec and obtain Future</b></td>
 *    <td> {@link #submit(ForkJoinTask)}</td>
 *    <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
 *  </tr>
 * </table>
 *
 * <p>The common pool is by default constructed with default
 * parameters, but these may be controlled by setting three {@link
 * System#getProperty system properties} with prefix {@code
 * java.util.concurrent.ForkJoinPool.common}: {@code parallelism} --
 * an integer greater than zero, {@code threadFactory} -- the class
 * name of a {@link ForkJoinWorkerThreadFactory}, and {@code
 * exceptionHandler} -- the class name of a {@link
 * java.lang.Thread.UncaughtExceptionHandler
 * Thread.UncaughtExceptionHandler}. Upon any error in establishing
 * these settings, default parameters are used.
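 *
 * <p>For example, a sketch of overriding the common pool's default
 * parallelism from the command line (the value 4 and the class name
 * {@code MyApp} are illustrative only):
 *
 * <pre> {@code
 * java -Djava.util.concurrent.ForkJoinPool.common.parallelism=4 MyApp}</pre>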
 *
 * <p><b>Implementation notes</b>: This implementation restricts the
 * maximum number of running threads to 32767. Attempts to create
 * pools with greater than the maximum number result in
 * {@code IllegalArgumentException}.
 *
 * <p>This implementation rejects submitted tasks (that is, by throwing
 * {@link RejectedExecutionException}) only when the pool is shut down
 * or internal resources have been exhausted.
 *
 * @since 1.7
 * @author Doug Lea
 */
public class ForkJoinPool extends AbstractExecutorService {

    /*
     * Implementation Overview
     *
     * This class and its nested classes provide the main
     * functionality and control for a set of worker threads:
     * Submissions from non-FJ threads enter into submission queues.
     * Workers take these tasks and typically split them into subtasks
     * that may be stolen by other workers.  Preference rules give
     * first priority to processing tasks from their own queues (LIFO
     * or FIFO, depending on mode), then to randomized FIFO steals of
     * tasks in other queues.
     *
     * WorkQueues
     * ==========
     *
     * Most operations occur within work-stealing queues (in nested
     * class WorkQueue).  These are special forms of Deques that
     * support only three of the four possible end-operations -- push,
     * pop, and poll (aka steal), under the further constraints that
     * push and pop are called only from the owning thread (or, as
     * extended here, under a lock), while poll may be called from
     * other threads.  (If you are unfamiliar with them, you probably
     * want to read Herlihy and Shavit's book "The Art of
     * Multiprocessor programming", chapter 16 describing these in
     * more detail before proceeding.)  The main work-stealing queue
     * design is roughly similar to those in the papers "Dynamic
     * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
     * (http://research.sun.com/scalable/pubs/index.html) and
     * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
     * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
     * The main differences ultimately stem from GC requirements that
     * we null out taken slots as soon as we can, to maintain as small
     * a footprint as possible even in programs generating huge
     * numbers of tasks. To accomplish this, we shift the CAS
     * arbitrating pop vs poll (steal) from being on the indices
     * ("base" and "top") to the slots themselves.  So, both a
     * successful pop and poll mainly entail a CAS of a slot from
     * non-null to null.  Because we rely on CASes of references, we
     * do not need tag bits on base or top.  They are simple ints as
     * used in any circular array-based queue (see for example
     * ArrayDeque).  Updates to the indices must still be ordered in a
     * way that guarantees that top == base means the queue is empty,
     * but otherwise may err on the side of possibly making the queue
     * appear nonempty when a push, pop, or poll have not fully
     * committed. Note that this means that the poll operation,
     * considered individually, is not wait-free. One thief cannot
     * successfully continue until another in-progress one (or, if
     * previously empty, a push) completes.  However, in the
     * aggregate, we ensure at least probabilistic non-blockingness.
     * If an attempted steal fails, a thief always chooses a different
     * random victim target to try next. So, in order for one thief to
     * progress, it suffices for any in-progress poll or new push on
     * any empty queue to complete. (This is why we normally use
     * method pollAt and its variants that try once at the apparent
     * base index, else consider alternative actions, rather than
     * method poll.)
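     *
     * As an illustrative sketch (not the actual code; see methods pop
     * and poll in class WorkQueue below), both ends arbitrate via a
     * CAS of the slot from non-null to null:
     *
     *   pop (owner):  s = top - 1;  t = a[s & mask];
     *                 if (CAS(slot, t, null)) top = s;
     *   poll (thief): b = base;     t = a[b & mask];
     *                 if (CAS(slot, t, null)) base = b + 1;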
     *
     * This approach also enables support of a user mode in which local
     * task processing is in FIFO, not LIFO order, simply by using
     * poll rather than pop.  This can be useful in message-passing
     * frameworks in which tasks are never joined.  However neither
     * mode considers affinities, loads, cache localities, etc, so
     * rarely provide the best possible performance on a given
     * machine, but portably provide good throughput by averaging over
     * these factors.  (Further, even if we did try to use such
     * information, we do not usually have a basis for exploiting it.
     * For example, some sets of tasks profit from cache affinities,
     * but others are harmed by cache pollution effects.)
     *
     * WorkQueues are also used in a similar way for tasks submitted
     * to the pool. We cannot mix these tasks in the same queues used
     * for work-stealing (this would contaminate lifo/fifo
     * processing). Instead, we randomly associate submission queues
     * with submitting threads, using a form of hashing.  The
     * ThreadLocal Submitter class contains a value initially used as
     * a hash code for choosing existing queues, but may be randomly
     * repositioned upon contention with other submitters.  In
     * essence, submitters act like workers except that they are
     * restricted to executing local tasks that they submitted (or in
     * the case of CountedCompleters, others with the same root task).
     * However, because most shared/external queue operations are more
     * expensive than internal, and because, at steady state, external
     * submitters will compete for CPU with workers, ForkJoinTask.join
     * and related methods disable them from repeatedly helping to
     * process tasks if all workers are active.  Insertion of tasks in
     * shared mode requires a lock (mainly to protect in the case of
     * resizing) but we use only a simple spinlock (using bits in
     * field qlock), because submitters encountering a busy queue move
     * on to try or create other queues -- they block only when
     * creating and registering new queues.
     *
     * Management
     * ==========
     *
     * The main throughput advantages of work-stealing stem from
     * decentralized control -- workers mostly take tasks from
     * themselves or each other. We cannot negate this in the
     * implementation of other management responsibilities. The main
     * tactic for avoiding bottlenecks is packing nearly all
     * essentially atomic control state into two volatile variables
     * that are by far most often read (not written) as status and
     * consistency checks.
     *
     * Field "ctl" contains 64 bits holding all the information needed
     * to atomically decide to add, inactivate, enqueue (on an event
     * queue), dequeue, and/or re-activate workers.  To enable this
     * packing, we restrict maximum parallelism to (1<<15)-1 (which is
     * far in excess of normal operating range) to allow ids, counts,
     * and their negations (used for thresholding) to fit into 16bit
     * fields.
     *
     * Field "plock" is a form of sequence lock with a saturating
     * shutdown bit (similarly for per-queue "qlocks"), mainly
     * protecting updates to the workQueues array, as well as to
     * enable shutdown.  When used as a lock, it is normally only very
     * briefly held, so is nearly always available after at most a
     * brief spin, but we use a monitor-based backup strategy to
     * block when needed.
     *
     * Recording WorkQueues.  WorkQueues are recorded in the
     * "workQueues" array that is created upon first use and expanded
     * if necessary.  Updates to the array while recording new workers
     * and unrecording terminated ones are protected from each other
     * by a lock but the array is otherwise concurrently readable, and
     * accessed directly.  To simplify index-based operations, the
     * array size is always a power of two, and all readers must
     * tolerate null slots. Worker queues are at odd indices. Shared
     * (submission) queues are at even indices, up to a maximum of 64
     * slots, to limit growth even if array needs to expand to add
     * more workers. Grouping them together in this way simplifies and
     * speeds up task scanning.
     *
     * All worker thread creation is on-demand, triggered by task
     * submissions, replacement of terminated workers, and/or
     * compensation for blocked workers. However, all other support
     * code is set up to work with other policies.  To ensure that we
     * do not hold on to worker references that would prevent GC, ALL
     * accesses to workQueues are via indices into the workQueues
     * array (which is one source of some of the messy code
     * constructions here). In essence, the workQueues array serves as
     * a weak reference mechanism. Thus for example the wait queue
     * field of ctl stores indices, not references.  Access to the
     * workQueues in associated methods (for example signalWork) must
     * both index-check and null-check the IDs. All such accesses
     * ignore bad IDs by returning out early from what they are doing,
     * since this can only be associated with termination, in which
     * case it is OK to give up.  All uses of the workQueues array
     * also check that it is non-null (even if previously
     * non-null). This allows nulling during termination, which is
     * currently not necessary, but remains an option for
     * resource-revocation-based shutdown schemes. It also helps
     * reduce JIT issuance of uncommon-trap code, which tends to
     * unnecessarily complicate control flow in some methods.
     *
     * Event Queuing. Unlike HPC work-stealing frameworks, we cannot
     * let workers spin indefinitely scanning for tasks when none can
     * be found immediately, and we cannot start/resume workers unless
     * there appear to be tasks available.  On the other hand, we must
     * quickly prod them into action when new tasks are submitted or
     * generated. In many usages, ramp-up time to activate workers is
     * the main limiting factor in overall performance (this is
     * compounded at program start-up by JIT compilation and
     * allocation). So we try to streamline this as much as possible.
     * We park/unpark workers after placing in an event wait queue
     * when they cannot find work. This "queue" is actually a simple
     * Treiber stack, headed by the "id" field of ctl, plus a 15bit
     * counter value (that reflects the number of times a worker has
     * been inactivated) to avoid ABA effects (we need only as many
     * version numbers as worker threads). Successors are held in
     * field WorkQueue.nextWait.  Queuing deals with several intrinsic
     * races, mainly that a task-producing thread can miss seeing (and
     * signalling) another thread that gave up looking for work but
     * has not yet entered the wait queue.  We solve this by requiring
     * a full sweep of all workers (via repeated calls to method
     * scan()) both before and after a newly waiting worker is added
     * to the wait queue.  During a rescan, the worker might release
     * some other queued worker rather than itself, which has the same
     * net effect. Because enqueued workers may actually be rescanning
     * rather than waiting, we set and clear the "parker" field of
     * WorkQueues to reduce unnecessary calls to unpark.  (This
     * requires a secondary recheck to avoid missed signals.)  Note
     * the unusual conventions about Thread.interrupts surrounding
     * parking and other blocking: Because interrupts are used solely
     * to alert threads to check termination, which is checked anyway
     * upon blocking, we clear status (using Thread.interrupted)
     * before any call to park, so that park does not immediately
     * return due to status being set via some other unrelated call to
     * interrupt in user code.
     *
     * Signalling.  We create or wake up workers only when there
     * appears to be at least one task they might be able to find and
     * execute. However, many other threads may notice the same task
     * and each signal to wake up a thread that might take it. So in
     * general, pools will be over-signalled.  When a submission is
     * added or another worker adds a task to a queue that has fewer
     * than two tasks, they signal waiting workers (or trigger
     * creation of new ones if fewer than the given parallelism level
     * -- signalWork), and may leave a hint to the unparked worker to
     * help signal others upon wakeup.  These primary signals are
     * buttressed by others (see method helpSignal) whenever other
     * threads scan for work or do not have a task to process.  On
     * most platforms, signalling (unpark) overhead time is noticeably
     * long, and the time between signalling a thread and it actually
     * making progress can be very noticeably long, so it is worth
     * offloading these delays from critical paths as much as
     * possible.
     *
     * Trimming workers. To release resources after periods of lack of
     * use, a worker starting to wait when the pool is quiescent will
     * time out and terminate if the pool has remained quiescent for a
     * given period -- a short period if there are more threads than
     * parallelism, longer as the number of threads decreases. This
     * will slowly propagate, eventually terminating all workers after
     * periods of non-use.
     *
     * Shutdown and Termination. A call to shutdownNow atomically sets
     * a plock bit and then (non-atomically) sets each worker's
     * qlock status, cancels all unprocessed tasks, and wakes up
     * all waiting workers.  Detecting whether termination should
     * commence after a non-abrupt shutdown() call requires more work
     * and bookkeeping. We need consensus about quiescence (i.e., that
     * there is no more work). The active count provides a primary
     * indication but non-abrupt shutdown still requires a rechecking
     * scan for any workers that are inactive but not queued.
     *
     * Joining Tasks
     * =============
     *
     * Any of several actions may be taken when one worker is waiting
     * to join a task stolen (or always held) by another.  Because we
     * are multiplexing many tasks on to a pool of workers, we can't
     * just let them block (as in Thread.join).  We also cannot just
     * reassign the joiner's run-time stack with another and replace
     * it later, which would be a form of "continuation", that even if
     * possible is not necessarily a good idea since we sometimes need
     * both an unblocked task and its continuation to progress.
     * Instead we combine two tactics:
     *
     *   Helping: Arranging for the joiner to execute some task that it
     *      would be running if the steal had not occurred.
     *
     *   Compensating: Unless there are already enough live threads,
     *      method tryCompensate() may create or re-activate a spare
     *      thread to compensate for blocked joiners until they unblock.
     *
     * A third form (implemented in tryRemoveAndExec) amounts to
     * helping a hypothetical compensator: If we can readily tell that
     * a possible action of a compensator is to steal and execute the
     * task being joined, the joining thread can do so directly,
     * without the need for a compensation thread (although at the
     * expense of larger run-time stacks, but the tradeoff is
     * typically worthwhile).
     *
     * The ManagedBlocker extension API can't use helping so relies
     * only on compensation in method awaitBlocker.
     *
     * The algorithm in tryHelpStealer entails a form of "linear"
     * helping: Each worker records (in field currentSteal) the most
     * recent task it stole from some other worker. Plus, it records
     * (in field currentJoin) the task it is currently actively
     * joining. Method tryHelpStealer uses these markers to try to
     * find a worker to help (i.e., steal back a task from and execute
     * it) that could hasten completion of the actively joined task.
     * In essence, the joiner executes a task that would be on its own
     * local deque had the to-be-joined task not been stolen. This may
     * be seen as a conservative variant of the approach in Wagner &
     * Calder "Leapfrogging: a portable technique for implementing
     * efficient futures" SIGPLAN Notices, 1993
     * (http://portal.acm.org/citation.cfm?id=155354). It differs in
     * that: (1) We only maintain dependency links across workers upon
     * steals, rather than use per-task bookkeeping.  This sometimes
     * requires a linear scan of workQueues array to locate stealers,
     * but often doesn't because stealers leave hints (that may become
     * stale/wrong) of where to locate them.  It is only a hint
     * because a worker might have had multiple steals and the hint
     * records only one of them (usually the most current).  Hinting
     * isolates cost to when it is needed, rather than adding to
     * per-task overhead.  (2) It is "shallow", ignoring nesting and
     * potentially cyclic mutual steals.  (3) It is intentionally
     * racy: field currentJoin is updated only while actively joining,
     * which means that we miss links in the chain during long-lived
     * tasks, GC stalls etc (which is OK since blocking in such cases
     * is usually a good idea).  (4) We bound the number of attempts
     * to find work (see MAX_HELP) and fall back to suspending the
     * worker and if necessary replacing it with another.
     *
     * Helping actions for CountedCompleters are much simpler: Method
     * helpComplete can take and execute any task with the same root
     * as the task being waited on. However, this still entails some
     * traversal of completer chains, so is less efficient than using
     * CountedCompleters without explicit joins.
     *
     * It is impossible to keep exactly the target parallelism number
     * of threads running at any given time.  Determining the
     * existence of conservatively safe helping targets, the
     * availability of already-created spares, and the apparent need
     * to create new spares are all racy, so we rely on multiple
     * retries of each.  Compensation in the apparent absence of
     * helping opportunities is challenging to control on JVMs, where
     * GC and other activities can stall progress of tasks that in
     * turn stall out many other dependent tasks, without us being
     * able to determine whether they will ever require compensation.
     * Even though work-stealing otherwise encounters little
     * degradation in the presence of more threads than cores,
     * aggressively adding new threads in such cases entails risk of
     * unwanted positive feedback control loops in which more threads
     * cause more dependent stalls (as well as delayed progress of
     * unblocked threads to the point that we know they are available)
     * leading to more situations requiring more threads, and so
     * on. This aspect of control can be seen as an (analytically
     * intractable) game with an opponent that may choose the worst
     * (for us) active thread to stall at any time.  We take several
     * precautions to bound losses (and thus bound gains), mainly in
     * methods tryCompensate and awaitJoin.
     *
     * Common Pool
     * ===========
     *
     * The static commonPool always exists after static
     * initialization.  Since it (or any other created pool) need
     * never be used, we minimize initial construction overhead and
     * footprint to the setup of about a dozen fields, with no nested
     * allocation. Most bootstrapping occurs within method
     * fullExternalPush during the first submission to the pool.
     *
     * When external threads submit to the common pool, they can
     * perform some subtask processing (see externalHelpJoin and
     * related methods).  We do not need to record whether these
     * submissions are to the common pool -- if not, externalHelpJoin
     * returns quickly (at the most helping to signal some common pool
     * workers). These submitters would otherwise be blocked waiting
     * for completion, so the extra effort (with liberally sprinkled
     * task status checks) in inapplicable cases amounts to an odd
     * form of limited spin-wait before blocking in ForkJoinTask.join.
     *
     * Style notes
     * ===========
     *
     * There is a lot of representation-level coupling among classes
     * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask.  The
     * fields of WorkQueue maintain data structures managed by
     * ForkJoinPool, so are directly accessed.  There is little point
     * trying to reduce this, since any associated future changes in
     * representations will need to be accompanied by algorithmic
     * changes anyway. Several methods intrinsically sprawl because
     * they must accumulate sets of consistent reads of volatiles held
     * in local variables.  Methods signalWork() and scan() are the
     * main bottlenecks, so are especially heavily
     * micro-optimized/mangled.  There are lots of inline assignments
     * (of form "while ((local = field) != 0)") which are usually the
     * simplest way to ensure the required read orderings (which are
     * sometimes critical). This leads to a "C"-like style of listing
     * declarations of these locals at the heads of methods or blocks.
     * There are several occurrences of the unusual "do {} while
     * (!cas...)"  which is the simplest way to force an update of a
     * CAS'ed variable. There are also other coding oddities (including
     * several unnecessary-looking hoisted null checks) that help
     * some methods perform reasonably even when interpreted (not
     * compiled).
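     *
     * For example, the unconditional-update idiom above takes this
     * shape when adjusting the packed ctl counts (as in method
     * deregisterWorker below; f(c) stands for the desired repacking
     * of the current value):
     *
     *   do {} while (!U.compareAndSwapLong(this, CTL, c = ctl, f(c)));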
     *
     * The order of declarations in this file is:
     * (1) Static utility functions
     * (2) Nested (static) classes
     * (3) Static fields
     * (4) Fields, along with constants used when unpacking some of them
     * (5) Internal control methods
     * (6) Callbacks and other support for ForkJoinTask methods
     * (7) Exported methods
     * (8) Static block initializing statics in minimally dependent order
     */

    // Static utilities

    /**
     * If there is a security manager, makes sure caller has
     * permission to modify threads.
     */
    private static void checkPermission() {
        SecurityManager security = System.getSecurityManager();
        if (security != null)
            security.checkPermission(modifyThreadPermission);
    }

    // Nested classes

    /**
     * Factory for creating new {@link ForkJoinWorkerThread}s.
     * A {@code ForkJoinWorkerThreadFactory} must be defined and used
     * for {@code ForkJoinWorkerThread} subclasses that extend base
     * functionality or initialize threads with different contexts.
     */
    public static interface ForkJoinWorkerThreadFactory {
        /**
         * Returns a new worker thread operating in the given pool.
         *
         * @param pool the pool this thread works in
         * @throws NullPointerException if the pool is null
         */
        public ForkJoinWorkerThread newThread(ForkJoinPool pool);
    }

    /**
     * Default ForkJoinWorkerThreadFactory implementation; creates a
     * new ForkJoinWorkerThread.
     */
    static final class DefaultForkJoinWorkerThreadFactory
        implements ForkJoinWorkerThreadFactory {
        public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            return new ForkJoinWorkerThread(pool);
        }
    }

    /**
     * Per-thread records for threads that submit to pools. Currently
     * holds only pseudo-random seed / index that is used to choose
     * submission queues in method externalPush. In the future, this may
     * also incorporate a means to implement different task rejection
     * and resubmission policies.
     *
     * Seeds for submitters and workers/workQueues work in basically
     * the same way but are initialized and updated using slightly
     * different mechanics. Both are initialized using the same
     * approach as in class ThreadLocal, where successive values are
     * unlikely to collide with previous values. Seeds are then
     * randomly modified upon collisions using xorshifts, which
     * requires a non-zero seed.
     */
    static final class Submitter {
        int seed;
        Submitter(int s) { seed = s; }
    }

    /**
     * Class for artificial tasks that are used to replace the target
     * of local joins if they are removed from an interior queue slot
     * in WorkQueue.tryRemoveAndExec. We don't need the proxy to
     * actually do anything beyond having a unique identity.
     */
    static final class EmptyTask extends ForkJoinTask<Void> {
        private static final long serialVersionUID = -7721805057305804111L;
        EmptyTask() { status = ForkJoinTask.NORMAL; } // force done
        public final Void getRawResult() { return null; }
        public final void setRawResult(Void x) {}
        public final boolean exec() { return true; }
    }

    /**
     * Queues supporting work-stealing as well as external task
     * submission. See above for main rationale and algorithms.
     * Implementation relies heavily on "Unsafe" intrinsics
     * and selective use of "volatile":
     *
     * Field "base" is the index (mod array.length) of the least valid
     * queue slot, which is always the next position to steal (poll)
     * from if nonempty. Reads and writes require volatile orderings
     * but not CAS, because updates are only performed after slot
     * CASes.
     *
     * Field "top" is the index (mod array.length) of the next queue
     * slot to push to or pop from. It is written only by owner thread
     * for push, or under lock for external/shared push, and accessed
     * by other threads only after reading (volatile) base.  Both top
     * and base are allowed to wrap around on overflow, but (top -
     * base) (or more commonly -(base - top) to force volatile read of
     * base before top) still estimates size. The lock ("qlock") is
     * forced to -1 on termination, causing all further lock attempts
     * to fail. (Note: we don't need CAS for termination state because
     * upon pool shutdown, all shared-queues will stop being used
     * anyway.)  Nearly all lock bodies are set up so that exceptions
     * within lock bodies are "impossible" (modulo JVM errors that
     * would cause failure anyway.)
     *
     * The array slots are read and written using the emulation of
     * volatiles/atomics provided by Unsafe. Insertions must in
     * general use putOrderedObject as a form of releasing store to
     * ensure that all writes to the task object are ordered before
     * its publication in the queue. All removals entail a CAS to
     * null. The array is always a power of two. To ensure safety of
     * Unsafe array operations, all accesses perform explicit null
     * checks and implicit bounds checks via power-of-two masking.
     *
     * In addition to basic queuing support, this class contains
     * fields described elsewhere to control execution. It turns out
     * to work better memory-layout-wise to include them in this class
     * rather than a separate class.
     *
     * Performance on most platforms is very sensitive to placement of
     * instances of both WorkQueues and their arrays -- we absolutely
     * do not want multiple WorkQueue instances or multiple queue
     * arrays sharing cache lines. (It would be best for queue objects
     * and their arrays to share, but there is nothing available to
     * help arrange that). Unfortunately, because they are recorded
     * in a common array, WorkQueue instances are often moved to be
     * adjacent by garbage collectors. To reduce impact, we use field
     * padding that works OK on common platforms; this effectively
     * trades off slightly slower average field access for the sake of
     * avoiding really bad worst-case access. (Until better JVM
     * support is in place, this padding is dependent on transient
     * properties of JVM field layout rules.) We also take care in
     * allocating, sizing and resizing the array. Non-shared queue
     * arrays are initialized by workers before use. Others are
     * allocated on first use.
     */
    static final class WorkQueue {
        /**
         * Capacity of work-stealing queue array upon initialization.
         * Must be a power of two; at least 4, but should be larger to
         * reduce or eliminate cacheline sharing among queues.
         * Currently, it is much larger, as a partial workaround for
         * the fact that JVMs often place arrays in locations that
         * share GC bookkeeping (especially cardmarks) such that
         * per-write accesses encounter serious memory contention.
         */
        static final int INITIAL_QUEUE_CAPACITY = 1 << 13;

        /**
         * Maximum size for queue arrays. Must be a power of two less
         * than or equal to 1 << (31 - width of array entry) to ensure
         * lack of wraparound of index calculations, but defined to a
         * value a bit less than this to help users trap runaway
         * programs before saturating systems.
         */
        static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M

        // Heuristic padding to ameliorate unfortunate memory placements
        volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06;

        int seed;                  // for random scanning; initialize nonzero
        volatile int eventCount;   // encoded inactivation count; < 0 if inactive
        int nextWait;              // encoded record of next event waiter
        int hint;                  // steal or signal hint (index)
        int poolIndex;             // index of this queue in pool (or 0)
        final int mode;            // 0: lifo, > 0: fifo, < 0: shared
        int nsteals;               // number of steals
        volatile int qlock;        // 1: locked, -1: terminate; else 0
        volatile int base;         // index of next slot for poll
        int top;                   // index of next slot for push
        ForkJoinTask<?>[] array;   // the elements (initially unallocated)
        final ForkJoinPool pool;   // the containing pool (may be null)
        final ForkJoinWorkerThread owner; // owning thread or null if shared
        volatile Thread parker;    // == owner during call to park; else null
        volatile ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin
        ForkJoinTask<?> currentSteal; // current non-local task being executed

        volatile Object pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17;
        volatile Object pad18, pad19, pad1a, pad1b, pad1c, pad1d;

        WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner, int mode,
                  int seed) {
            this.pool = pool;
            this.owner = owner;
            this.mode = mode;
            this.seed = seed;
            // Place indices in the center of array (that is not yet allocated)
            base = top = INITIAL_QUEUE_CAPACITY >>> 1;
        }

        /**
         * Returns the approximate number of tasks in the queue.
         */
        final int queueSize() {
            int n = base - top;       // non-owner callers must read base first
            return (n >= 0) ? 0 : -n; // ignore transient negative
        }

        /**
         * Provides a more accurate estimate of whether this queue has
         * any tasks than does queueSize, by checking whether a
         * near-empty queue has at least one unclaimed task.
         */
        final boolean isEmpty() {
            ForkJoinTask<?>[] a; int m, s;
            int n = base - (s = top);
            return (n >= 0 ||
                    (n == -1 &&
                     ((a = array) == null ||
                      (m = a.length - 1) < 0 ||
                      U.getObject
                      (a, (long)((m & (s - 1)) << ASHIFT) + ABASE) == null)));
        }

        /**
         * Pushes a task. Call only by owner in unshared queues.  (The
         * shared-queue version is embedded in method externalPush.)
         *
         * @param task the task. Caller must ensure non-null.
         * @throws RejectedExecutionException if array cannot be resized
         */
        final void push(ForkJoinTask<?> task) {
            ForkJoinTask<?>[] a; ForkJoinPool p;
            int s = top, m, n;
            if ((a = array) != null) {    // ignore if queue removed
                int j = (((m = a.length - 1) & s) << ASHIFT) + ABASE;
                U.putOrderedObject(a, j, task);
                if ((n = (top = s + 1) - base) <= 2) {
                    if ((p = pool) != null)
                        p.signalWork(this);
                }
                else if (n >= m)
                    growArray();
            }
        }

        /**
         * Initializes or doubles the capacity of array.  Call either
         * by owner or with lock held -- it is OK for base, but not
         * top, to move while resizings are in progress.
         */
        final ForkJoinTask<?>[] growArray() {
            ForkJoinTask<?>[] oldA = array;
            int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY;
            if (size > MAXIMUM_QUEUE_CAPACITY)
                throw new RejectedExecutionException("Queue capacity exceeded");
            int oldMask, t, b;
            ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
            if (oldA != null && (oldMask = oldA.length - 1) >= 0 &&
                (t = top) - (b = base) > 0) {
                int mask = size - 1;
                do {
                    ForkJoinTask<?> x;
                    int oldj = ((b & oldMask) << ASHIFT) + ABASE;
                    int j = ((b & mask) << ASHIFT) + ABASE;
                    x = (ForkJoinTask<?>)U.getObjectVolatile(oldA, oldj);
                    if (x != null &&
                        U.compareAndSwapObject(oldA, oldj, x, null))
                        U.putObjectVolatile(a, j, x);
                } while (++b != t);
            }
            return a;
        }

        /**
         * Takes next task, if one exists, in LIFO order.  Call only
         * by owner in unshared queues.
         */
        final ForkJoinTask<?> pop() {
            ForkJoinTask<?>[] a; ForkJoinTask<?> t; int m;
            if ((a = array) != null && (m = a.length - 1) >= 0) {
                for (int s; (s = top - 1) - base >= 0;) {
                    long j = ((m & s) << ASHIFT) + ABASE;
                    if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
                        break;
                    if (U.compareAndSwapObject(a, j, t, null)) {
                        top = s;
                        return t;
                    }
                }
            }
            return null;
        }

        /**
         * Takes a task in FIFO order if b is base of queue and a task
         * can be claimed without contention. Specialized versions
         * appear in ForkJoinPool methods scan and tryHelpStealer.
         */
        final ForkJoinTask<?> pollAt(int b) {
            ForkJoinTask<?> t; ForkJoinTask<?>[] a;
            if ((a = array) != null) {
                int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
                if ((t = (ForkJoinTask<?>)U.getObjectVolatile(a, j)) != null &&
                    base == b &&
                    U.compareAndSwapObject(a, j, t, null)) {
                    base = b + 1;
                    return t;
                }
            }
            return null;
        }

        /**
         * Takes next task, if one exists, in FIFO order.
         */
        final ForkJoinTask<?> poll() {
            ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t;
            while ((b = base) - top < 0 && (a = array) != null) {
                int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
                t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
                if (t != null) {
                    if (base == b &&
                        U.compareAndSwapObject(a, j, t, null)) {
                        base = b + 1;
                        return t;
                    }
                }
                else if (base == b) {
                    if (b + 1 == top)
                        break;
                    Thread.yield(); // wait for lagging update (very rare)
                }
            }
            return null;
        }

        /**
         * Takes next task, if one exists, in order specified by mode.
         */
        final ForkJoinTask<?> nextLocalTask() {
            return mode == 0 ? pop() : poll();
        }

        /**
         * Returns next task, if one exists, in order specified by mode.
         */
        final ForkJoinTask<?> peek() {
            ForkJoinTask<?>[] a = array; int m;
            if (a == null || (m = a.length - 1) < 0)
                return null;
            int i = mode == 0 ? top - 1 : base;
            int j = ((i & m) << ASHIFT) + ABASE;
            return (ForkJoinTask<?>)U.getObjectVolatile(a, j);
        }

        /**
         * Pops the given task only if it is at the current top.
         * (A shared version is available only via FJP.tryExternalUnpush)
         */
        final boolean tryUnpush(ForkJoinTask<?> t) {
            ForkJoinTask<?>[] a; int s;
            if ((a = array) != null && (s = top) != base &&
                U.compareAndSwapObject
                (a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
                top = s;
                return true;
            }
            return false;
        }

        /**
         * Removes and cancels all known tasks, ignoring any exceptions.
         */
        final void cancelAll() {
            ForkJoinTask.cancelIgnoringExceptions(currentJoin);
            ForkJoinTask.cancelIgnoringExceptions(currentSteal);
            for (ForkJoinTask<?> t; (t = poll()) != null; )
                ForkJoinTask.cancelIgnoringExceptions(t);
        }

        /**
         * Computes next value for random probes.  Scans don't require
         * a very high quality generator, but also not a crummy one.
         * Marsaglia xor-shift is cheap and works well enough.  Note:
         * This is manually inlined in its usages in ForkJoinPool to
         * avoid writes inside busy scan loops.
         */
        final int nextSeed() {
            int r = seed;
            r ^= r << 13;
            r ^= r >>> 17;
            return seed = r ^= r << 5;
        }

        // Specialized execution methods

        /**
         * Pops and runs tasks until empty.
         */
        private void popAndExecAll() {
            // A bit faster than repeated pop calls
            ForkJoinTask<?>[] a; int m, s; long j; ForkJoinTask<?> t;
            while ((a = array) != null && (m = a.length - 1) >= 0 &&
                   (s = top - 1) - base >= 0 &&
                   (t = ((ForkJoinTask<?>)
                         U.getObject(a, j = ((m & s) << ASHIFT) + ABASE)))
                   != null) {
                if (U.compareAndSwapObject(a, j, t, null)) {
                    top = s;
                    t.doExec();
                }
            }
        }

        /**
         * Polls and runs tasks until empty.
         */
        private void pollAndExecAll() {
            for (ForkJoinTask<?> t; (t = poll()) != null;)
                t.doExec();
        }

        /**
         * If present, removes from queue and executes the given task,
         * or any other cancelled task. Returns true on any CAS
         * or consistency check failure so caller can retry.
         *
         * @return false if no progress can be made, else true
         */
        final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
            boolean stat = true, removed = false, empty = true;
            ForkJoinTask<?>[] a; int m, s, b, n;
            if ((a = array) != null && (m = a.length - 1) >= 0 &&
                (n = (s = top) - (b = base)) > 0) {
                for (ForkJoinTask<?> t;;) {           // traverse from s to b
                    int j = ((--s & m) << ASHIFT) + ABASE;
                    t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
                    if (t == null)                    // inconsistent length
                        break;
                    else if (t == task) {
                        if (s + 1 == top) {           // pop
                            if (!U.compareAndSwapObject(a, j, task, null))
                                break;
                            top = s;
                            removed = true;
                        }
                        else if (base == b)           // replace with proxy
                            removed = U.compareAndSwapObject(a, j, task,
                                                             new EmptyTask());
                        break;
                    }
                    else if (t.status >= 0)
                        empty = false;
                    else if (s + 1 == top) {          // pop and throw away
                        if (U.compareAndSwapObject(a, j, t, null))
                            top = s;
                        break;
                    }
                    if (--n == 0) {
                        if (!empty && base == b)
                            stat = false;
                        break;
                    }
                }
            }
            if (removed)
                task.doExec();
            return stat;
        }

        /**
         * Polls for and executes the given task or any other task in
         * its CountedCompleter computation.
         */
        final boolean pollAndExecCC(ForkJoinTask<?> root) {
            ForkJoinTask<?>[] a; int b; Object o;
            outer: while ((b = base) - top < 0 && (a = array) != null) {
                long j = (((a.length - 1) & b) << ASHIFT) + ABASE;
                if ((o = U.getObject(a, j)) == null ||
                    !(o instanceof CountedCompleter))
                    break;
                for (CountedCompleter<?> t = (CountedCompleter<?>)o, r = t;;) {
                    if (r == root) {
                        if (base == b &&
                            U.compareAndSwapObject(a, j, t, null)) {
                            base = b + 1;
                            t.doExec();
                            return true;
                        }
                        else
                            break; // restart
                    }
                    if ((r = r.completer) == null)
                        break outer; // not part of root computation
                }
            }
            return false;
        }

        /**
         * Executes a top-level task and any local tasks remaining
         * after execution.
         */
        final void runTask(ForkJoinTask<?> t) {
            if (t != null) {
                (currentSteal = t).doExec();
                currentSteal = null;
                ++nsteals;
                if (base - top < 0) {       // process remaining local tasks
                    if (mode == 0)
                        popAndExecAll();
                    else
                        pollAndExecAll();
                }
            }
        }

        /**
         * Executes a non-top-level (stolen) task.
         */
        final void runSubtask(ForkJoinTask<?> t) {
            if (t != null) {
                ForkJoinTask<?> ps = currentSteal;
                (currentSteal = t).doExec();
                currentSteal = ps;
            }
        }

        /**
         * Returns true if owned and not known to be blocked.
         */
        final boolean isApparentlyUnblocked() {
            Thread wt; Thread.State s;
            return (eventCount >= 0 &&
                    (wt = owner) != null &&
                    (s = wt.getState()) != Thread.State.BLOCKED &&
                    s != Thread.State.WAITING &&
                    s != Thread.State.TIMED_WAITING);
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe U;
        private static final long QLOCK;
        private static final int ABASE;
        private static final int ASHIFT;
        static {
            int s;
            try {
                U = sun.misc.Unsafe.getUnsafe();
                Class<?> k = WorkQueue.class;
                Class<?> ak = ForkJoinTask[].class;
                QLOCK = U.objectFieldOffset
                    (k.getDeclaredField("qlock"));
                ABASE = U.arrayBaseOffset(ak);
                s = U.arrayIndexScale(ak);
            } catch (Exception e) {
                throw new Error(e);
            }
            if ((s & (s-1)) != 0)
                throw new Error("data type scale not a power of two");
            ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
        }
    }

    // static fields (initialized in static initializer below)

    /**
     * Creates a new ForkJoinWorkerThread. This factory is used unless
     * overridden in ForkJoinPool constructors.
     */
    public static final ForkJoinWorkerThreadFactory
        defaultForkJoinWorkerThreadFactory;

    /**
     * Per-thread submission bookkeeping. Shared across all pools
     * to reduce ThreadLocal pollution and because random motion
     * to avoid contention in one pool is likely to hold for others.
     * Lazily initialized on first submission (but null-checked
     * in other contexts to avoid unnecessary initialization).
     */
    static final ThreadLocal<Submitter> submitters;

    /**
     * Permission required for callers of methods that may start or
     * kill threads.
     */
    private static final RuntimePermission modifyThreadPermission;

    /**
     * Common (static) pool. Non-null for public use unless a static
     * construction exception, but internal usages null-check on use
     * to paranoically avoid potential initialization circularities
     * as well as to simplify generated code.
     */
    static final ForkJoinPool commonPool;

    /**
     * Common pool parallelism. Must equal commonPool.parallelism.
     */
    static final int commonPoolParallelism;

    /**
     * Sequence number for creating workerNamePrefix.
     */
    private static int poolNumberSequence;

    /**
     * Returns the next sequence number. We don't expect this to
     * ever contend, so use simple builtin sync.
     */
    private static final synchronized int nextPoolId() {
        return ++poolNumberSequence;
    }

    // static constants

    /**
     * Initial timeout value (in nanoseconds) for the thread
     * triggering quiescence to park waiting for new work. On timeout,
     * the thread will instead try to shrink the number of
     * workers. The value should be large enough to avoid overly
     * aggressive shrinkage during most transient stalls (long GCs
     * etc).
     */
    private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec

    /**
     * Timeout value when there are more threads than parallelism level.
     */
    private static final long FAST_IDLE_TIMEOUT = 200L * 1000L * 1000L;

    /**
     * Tolerance for idle timeouts, to cope with timer undershoots.
     */
    private static final long TIMEOUT_SLOP = 2000000L;

    /**
     * The maximum stolen->joining link depth allowed in method
     * tryHelpStealer.  Must be a power of two.  Depths for legitimate
     * chains are unbounded, but we use a fixed constant to avoid
     * (otherwise unchecked) cycles and to bound staleness of
     * traversal parameters at the expense of sometimes blocking when
     * we could be helping.
     */
    private static final int MAX_HELP = 64;

    /**
     * Increment for seed generators. See class ThreadLocal for
     * explanation.
     */
    private static final int SEED_INCREMENT = 0x61c88647;

    /*
     * Bits and masks for control variables
     *
     * Field ctl is a long packed with:
     * AC: Number of active running workers minus target parallelism (16 bits)
     * TC: Number of total workers minus target parallelism (16 bits)
     * ST: true if pool is terminating (1 bit)
     * EC: the wait count of top waiting thread (15 bits)
     * ID: poolIndex of top of Treiber stack of waiters (16 bits)
     *
     * When convenient, we can extract the upper 32 bits of counts and
     * the lower 32 bits of queue state, u = (int)(ctl >>> 32) and e =
     * (int)ctl.  The ec field is never accessed alone, but always
     * together with id and st. The offsets of counts by the target
     * parallelism and the positionings of fields makes it possible to
     * perform the most common checks via sign tests of fields: When
     * ac is negative, there are not enough active workers, when tc is
     * negative, there are not enough total workers, and when e is
     * negative, the pool is terminating.  To deal with these possibly
     * negative fields, we use casts in and out of "short" and/or
     * signed shifts to maintain signedness.
     *
     * When a thread is queued (inactivated), its eventCount field is
     * set negative, which is the only way to tell if a worker is
     * prevented from executing tasks, even though it must continue to
     * scan for them to avoid queuing races. Note however that
     * eventCount updates lag releases so usage requires care.
     *
     * Field plock is an int packed with:
     * SHUTDOWN: true if shutdown is enabled (1 bit)
     * SEQ: a sequence lock, with PL_LOCK bit set if locked (30 bits)
     * SIGNAL: set when threads may be waiting on the lock (1 bit)
     *
     * The sequence number enables simple consistency checks:
     * Staleness of read-only operations on the workQueues array can
     * be checked by comparing plock before vs after the reads.
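     *
     * Schematically, the ctl packing described above looks like this
     * (field widths in bits; shifts and masks are defined just below):
     *
     *   ctl (64 bits): | AC (16) | TC (16) | ST (1) | EC (15) | ID (16) |
     *
     * so u = (int)(ctl >>> 32) carries AC and TC, and e = (int)ctl
     * carries ST, EC, and ID.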
     */

    // bit positions/shifts for fields
    private static final int  AC_SHIFT   = 48;
    private static final int  TC_SHIFT   = 32;
    private static final int  ST_SHIFT   = 31;
    private static final int  EC_SHIFT   = 16;

    // bounds
    private static final int  SMASK      = 0xffff;  // short bits
    private static final int  MAX_CAP    = 0x7fff;  // max #workers - 1
    private static final int  EVENMASK   = 0xfffe;  // even short bits
    private static final int  SQMASK     = 0x007e;  // max 64 (even) slots
    private static final int  SHORT_SIGN = 1 << 15;
    private static final int  INT_SIGN   = 1 << 31;

    // masks
    private static final long STOP_BIT   = 0x0001L << ST_SHIFT;
    private static final long AC_MASK    = ((long)SMASK) << AC_SHIFT;
    private static final long TC_MASK    = ((long)SMASK) << TC_SHIFT;

    // units for incrementing and decrementing
    private static final long TC_UNIT    = 1L << TC_SHIFT;
    private static final long AC_UNIT    = 1L << AC_SHIFT;

    // masks and units for dealing with u = (int)(ctl >>> 32)
    private static final int  UAC_SHIFT  = AC_SHIFT - 32;
    private static final int  UTC_SHIFT  = TC_SHIFT - 32;
    private static final int  UAC_MASK   = SMASK << UAC_SHIFT;
    private static final int  UTC_MASK   = SMASK << UTC_SHIFT;
    private static final int  UAC_UNIT   = 1 << UAC_SHIFT;
    private static final int  UTC_UNIT   = 1 << UTC_SHIFT;

    // masks and units for dealing with e = (int)ctl
    private static final int  E_MASK     = 0x7fffffff; // no STOP_BIT
    private static final int  E_SEQ      = 1 << EC_SHIFT;

    // plock bits
    private static final int  SHUTDOWN   = 1 << 31;
    private static final int  PL_LOCK    = 2;
    private static final int  PL_SIGNAL  = 1;
    private static final int  PL_SPINS   = 1 << 8;

    // access mode for WorkQueue
    static final int LIFO_QUEUE          =  0;
    static final int FIFO_QUEUE          =  1;
    static final int SHARED_QUEUE        = -1;

    // bounds for #steps in scan loop -- must be power 2 minus 1
    private static final int MIN_SCAN    = 0x1ff;   // cover estimation slop
    private static final int MAX_SCAN    = 0x1ffff; // 4 * max workers

    // Instance fields

    /*
     * Field layout of this class tends to matter more than one would
     * like. Runtime layout order is only loosely related to
     * declaration order and may differ across JVMs, but the following
     * empirically works OK on current JVMs.
     */

    // Heuristic padding to ameliorate unfortunate memory placements
    volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06;

    volatile long stealCount;                  // collects worker counts
    volatile long ctl;                         // main pool control
    volatile int plock;                        // shutdown status and seqLock
    volatile int indexSeed;                    // worker/submitter index seed
    final int config;                          // mode and parallelism level
    WorkQueue[] workQueues;                    // main registry
    final ForkJoinWorkerThreadFactory factory;
    final Thread.UncaughtExceptionHandler ueh; // per-worker UEH
    final String workerNamePrefix;             // to create worker name string

    volatile Object pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17;
    volatile Object pad18, pad19, pad1a, pad1b;

    /**
     * Acquires the plock lock to protect worker array and related
     * updates. This method is called only if an initial CAS on plock
     * fails. This acts as a spinLock for normal cases, but falls back
     * to builtin monitor to block when (rarely) needed. This would be
     * a terrible idea for a highly contended lock, but works fine as
     * a more conservative alternative to a pure spinlock.
     */
    private int acquirePlock() {
        int spins = PL_SPINS, r = 0, ps, nps;
        for (;;) {
            if (((ps = plock) & PL_LOCK) == 0 &&
                U.compareAndSwapInt(this, PLOCK, ps, nps = ps + PL_LOCK))
                return nps;
            else if (r == 0) { // randomize spins if possible
                Thread t = Thread.currentThread(); WorkQueue w; Submitter z;
                if ((t instanceof ForkJoinWorkerThread) &&
                    (w = ((ForkJoinWorkerThread)t).workQueue) != null)
                    r = w.seed;
                else if ((z = submitters.get()) != null)
                    r = z.seed;
                else
                    r = 1;
            }
            else if (spins >= 0) {
                r ^= r << 1; r ^= r >>> 3; r ^= r << 10; // xorshift
                if (r >= 0)
                    --spins;
            }
            else if (U.compareAndSwapInt(this, PLOCK, ps, ps | PL_SIGNAL)) {
                synchronized (this) {
                    if ((plock & PL_SIGNAL) != 0) {
                        try {
                            wait();
                        } catch (InterruptedException ie) {
                            try {
                                Thread.currentThread().interrupt();
                            } catch (SecurityException ignore) {
                            }
                        }
                    }
                    else
                        notifyAll();
                }
            }
        }
    }

    /**
     * Unlocks and signals any thread waiting for plock. Called only
     * when CAS of seq value for unlock fails.
     */
    private void releasePlock(int ps) {
        plock = ps;
        synchronized (this) { notifyAll(); }
    }

    /**
     * Performs secondary initialization, called when plock is zero.
     * Creates workQueue array and sets plock to a valid value. The
     * lock body must be exception-free (so no try/finally) so we
     * optimistically allocate new array outside the lock and throw
     * away if (very rarely) not needed. (A similar tactic is used in
     * fullExternalPush.)  Because the plock seq value can eventually
     * wrap around zero, this method harmlessly fails to reinitialize
     * if workQueues exists, while still advancing plock.
     *
     * Additionally tries to create the first worker.
     */
    private void initWorkers() {
        WorkQueue[] ws, nws; int ps;
        int p = config & SMASK;        // find power of two table size
        int n = (p > 1) ? p - 1 : 1;   // ensure at least 2 slots
        n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
        n = (n + 1) << 1;
        if ((ws = workQueues) == null || ws.length == 0)
            nws = new WorkQueue[n];
        else
            nws = null;
        if (((ps = plock) & PL_LOCK) != 0 ||
            !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
            ps = acquirePlock();
        if (((ws = workQueues) == null || ws.length == 0) && nws != null)
            workQueues = nws;
        int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
        if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
            releasePlock(nps);
        tryAddWorker();
    }

    /**
     * Tries to create and start one worker if fewer than target
     * parallelism level exist. Adjusts counts etc on failure.
1370 */ 1371 private void tryAddWorker() { 1372 long c; int u; 1373 while ((u = (int)((c = ctl) >>> 32)) < 0 && 1374 (u & SHORT_SIGN) != 0 && (int)c == 0) { 1375 long nc = (long)(((u + UTC_UNIT) & UTC_MASK) | 1376 ((u + UAC_UNIT) & UAC_MASK)) << 32; 1377 if (U.compareAndSwapLong(this, CTL, c, nc)) { 1378 ForkJoinWorkerThreadFactory fac; 1379 Throwable ex = null; 1380 ForkJoinWorkerThread wt = null; 1381 try { 1382 if ((fac = factory) != null && 1383 (wt = fac.newThread(this)) != null) { 1384 wt.start(); 1385 break; 1386 } 1387 } catch (Throwable e) { 1388 ex = e; 1389 } 1390 deregisterWorker(wt, ex); 1391 break; 1392 } 1393 } 1394 } 1395 1396 // Registering and deregistering workers 1397 1398 /** 1399 * Callback from ForkJoinWorkerThread to establish and record its 1400 * WorkQueue. To avoid scanning bias due to packing entries in 1401 * front of the workQueues array, we treat the array as a simple 1402 * power-of-two hash table using per-thread seed as hash, 1403 * expanding as needed. 1404 * 1405 * @param wt the worker thread 1406 * @return the worker's queue 1407 */ 1408 final WorkQueue registerWorker(ForkJoinWorkerThread wt) { 1409 Thread.UncaughtExceptionHandler handler; WorkQueue[] ws; int s, ps; 1410 wt.setDaemon(true); 1411 if ((handler = ueh) != null) 1412 wt.setUncaughtExceptionHandler(handler); 1413 do {} while (!U.compareAndSwapInt(this, INDEXSEED, s = indexSeed, 1414 s += SEED_INCREMENT) || 1415 s == 0); // skip 0 1416 WorkQueue w = new WorkQueue(this, wt, config >>> 16, s); 1417 if (((ps = plock) & PL_LOCK) != 0 || 1418 !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) 1419 ps = acquirePlock(); 1420 int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN); 1421 try { 1422 if ((ws = workQueues) != null) { // skip if shutting down 1423 int n = ws.length, m = n - 1; 1424 int r = (s << 1) | 1; // use odd-numbered indices 1425 if (ws[r &= m] != null) { // collision 1426 int probes = 0; // step by approx half size 1427 int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2; 1428 while (ws[r = (r + step) & m] != null) { 1429 if (++probes >= n) { 1430 workQueues = ws = Arrays.copyOf(ws, n <<= 1); 1431 m = n - 1; 1432 probes = 0; 1433 } 1434 } 1435 } 1436 w.eventCount = w.poolIndex = r; // volatile write orders 1437 ws[r] = w; 1438 } 1439 } finally { 1440 if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) 1441 releasePlock(nps); 1442 } 1443 wt.setName(workerNamePrefix.concat(Integer.toString(w.poolIndex))); 1444 return w; 1445 } 1446 1447 /** 1448 * Final callback from terminating worker, as well as upon failure 1449 * to construct or start a worker. Removes record of worker from 1450 * array, and adjusts counts. If pool is shutting down, tries to 1451 * complete termination. 
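*
* <p>For exposition, the count adjustment performed below can be
* sketched as follows (restating the CAS loop in this method, not
* adding to it):
* <pre> {@code
* // atomically decrement both the active (AC) and total (TC)
* // worker counts packed into ctl, leaving other bits unchanged
* long nc = ((c - AC_UNIT) & AC_MASK) |
*           ((c - TC_UNIT) & TC_MASK) |
*           (c & ~(AC_MASK|TC_MASK));}</pre>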
1452 * 1453 * @param wt the worker thread or null if construction failed 1454 * @param ex the exception causing failure, or null if none 1455 */ 1456 final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) { 1457 WorkQueue w = null; 1458 if (wt != null && (w = wt.workQueue) != null) { 1459 int ps; 1460 w.qlock = -1; // ensure set 1461 long ns = w.nsteals, sc; // collect steal count 1462 do {} while (!U.compareAndSwapLong(this, STEALCOUNT, 1463 sc = stealCount, sc + ns)); 1464 if (((ps = plock) & PL_LOCK) != 0 || 1465 !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) 1466 ps = acquirePlock(); 1467 int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN); 1468 try { 1469 int idx = w.poolIndex; 1470 WorkQueue[] ws = workQueues; 1471 if (ws != null && idx >= 0 && idx < ws.length && ws[idx] == w) 1472 ws[idx] = null; 1473 } finally { 1474 if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) 1475 releasePlock(nps); 1476 } 1477 } 1478 1479 long c; // adjust ctl counts 1480 do {} while (!U.compareAndSwapLong 1481 (this, CTL, c = ctl, (((c - AC_UNIT) & AC_MASK) | 1482 ((c - TC_UNIT) & TC_MASK) | 1483 (c & ~(AC_MASK|TC_MASK))))); 1484 1485 if (!tryTerminate(false, false) && w != null && w.array != null) { 1486 w.cancelAll(); // cancel remaining tasks 1487 WorkQueue[] ws; WorkQueue v; Thread p; int u, i, e; 1488 while ((u = (int)((c = ctl) >>> 32)) < 0 && (e = (int)c) >= 0) { 1489 if (e > 0) { // activate or create replacement 1490 if ((ws = workQueues) == null || 1491 (i = e & SMASK) >= ws.length || 1492 (v = ws[i]) != null) 1493 break; 1494 long nc = (((long)(v.nextWait & E_MASK)) | 1495 ((long)(u + UAC_UNIT) << 32)); 1496 if (v.eventCount != (e | INT_SIGN)) 1497 break; 1498 if (U.compareAndSwapLong(this, CTL, c, nc)) { 1499 v.eventCount = (e + E_SEQ) & E_MASK; 1500 if ((p = v.parker) != null) 1501 U.unpark(p); 1502 break; 1503 } 1504 } 1505 else { 1506 if ((short)u < 0) 1507 tryAddWorker(); 1508 break; 1509 } 1510 } 1511 } 1512 if (ex == null) // help clean refs on way out 1513 ForkJoinTask.helpExpungeStaleExceptions(); 1514 else // rethrow 1515 ForkJoinTask.rethrow(ex); 1516 } 1517 1518 // Submissions 1519 1520 /** 1521 * Unless shutting down, adds the given task to a submission queue 1522 * at submitter's current queue index (modulo submission 1523 * range). Only the most common path is directly handled in this 1524 * method. All others are relayed to fullExternalPush. 1525 * 1526 * @param task the task. Caller must ensure non-null. 1527 */ 1528 final void externalPush(ForkJoinTask<?> task) { 1529 WorkQueue[] ws; WorkQueue q; Submitter z; int m; ForkJoinTask<?>[] a; 1530 if ((z = submitters.get()) != null && plock > 0 && 1531 (ws = workQueues) != null && (m = (ws.length - 1)) >= 0 && 1532 (q = ws[m & z.seed & SQMASK]) != null && 1533 U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock 1534 int b = q.base, s = q.top, n, an; 1535 if ((a = q.array) != null && (an = a.length) > (n = s + 1 - b)) { 1536 int j = (((an - 1) & s) << ASHIFT) + ABASE; 1537 U.putOrderedObject(a, j, task); 1538 q.top = s + 1; // push on to deque 1539 q.qlock = 0; 1540 if (n <= 2) 1541 signalWork(q); 1542 return; 1543 } 1544 q.qlock = 0; 1545 } 1546 fullExternalPush(task); 1547 } 1548 1549 /** 1550 * Full version of externalPush. This method is called, among 1551 * other times, upon the first submission of the first task to the 1552 * pool, so must perform secondary initialization (via 1553 * initWorkers). 
It also detects first submission by an external
1554 * thread by looking up its ThreadLocal, and creates a new shared
1555 * queue if the one at that index is empty or contended. The plock lock
1556 * body must be exception-free (so no try/finally) so we
1557 * optimistically allocate new queues outside the lock and throw
1558 * them away if (very rarely) not needed.
1559 */
1560 private void fullExternalPush(ForkJoinTask<?> task) {
1561 int r = 0; // random index seed
1562 for (Submitter z = submitters.get();;) {
1563 WorkQueue[] ws; WorkQueue q; int ps, m, k;
1564 if (z == null) {
1565 if (U.compareAndSwapInt(this, INDEXSEED, r = indexSeed,
1566 r += SEED_INCREMENT) && r != 0)
1567 submitters.set(z = new Submitter(r));
1568 }
1569 else if (r == 0) { // move to a different index
1570 r = z.seed;
1571 r ^= r << 13; // same xorshift as WorkQueues
1572 r ^= r >>> 17;
1573 z.seed = r ^ (r << 5);
1574 }
1575 else if ((ps = plock) < 0)
1576 throw new RejectedExecutionException();
1577 else if (ps == 0 || (ws = workQueues) == null ||
1578 (m = ws.length - 1) < 0)
1579 initWorkers();
1580 else if ((q = ws[k = r & m & SQMASK]) != null) {
1581 if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
1582 ForkJoinTask<?>[] a = q.array;
1583 int s = q.top;
1584 boolean submitted = false;
1585 try { // locked version of push
1586 if ((a != null && a.length > s + 1 - q.base) ||
1587 (a = q.growArray()) != null) { // must presize
1588 int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
1589 U.putOrderedObject(a, j, task);
1590 q.top = s + 1;
1591 submitted = true;
1592 }
1593 } finally {
1594 q.qlock = 0; // unlock
1595 }
1596 if (submitted) {
1597 signalWork(q);
1598 return;
1599 }
1600 }
1601 r = 0; // move on failure
1602 }
1603 else if (((ps = plock) & PL_LOCK) == 0) { // create new queue
1604 q = new WorkQueue(this, null, SHARED_QUEUE, r);
1605 if (((ps = plock) & PL_LOCK) != 0 ||
1606 !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1607 ps = acquirePlock();
1608 if ((ws = workQueues) != null && k < ws.length && ws[k] == null)
1609 ws[k] = q;
1610 int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
1611 if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1612 releasePlock(nps);
1613 }
1614 else
1615 r = 0; // try elsewhere while lock held
1616 }
1617 }
1618
1619 // Maintaining ctl counts
1620
1621 /**
1622 * Increments active count; mainly called upon return from blocking.
1623 */
1624 final void incrementActiveCount() {
1625 long c;
1626 do {} while (!U.compareAndSwapLong(this, CTL, c = ctl, c + AC_UNIT));
1627 }
1628
1629 /**
1630 * Tries to create or activate a worker if too few are active.
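*
* <p>For exposition, the release step may be pictured as popping a
* Treiber stack of idle workers whose top is recorded in the low
* half of {@code ctl}; a sketch using this class's fields:
* <pre> {@code
* long c = ctl; int e = (int)c, u = (int)(c >>> 32);
* WorkQueue w = workQueues[e & SMASK]; // top idle worker, if e > 0
* long nc = ((long)(w.nextWait & E_MASK))  // pop: next waiter to top
*         | ((long)(u + UAC_UNIT) << 32);  // and raise active count
* // then CAS ctl from c to nc, advance w.eventCount, unpark w.parker}</pre>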
1631 *
1632 * @param q the (non-null) queue holding tasks to be signalled
1633 */
1634 final void signalWork(WorkQueue q) {
1635 int hint = q.poolIndex;
1636 long c; int e, u, i, n; WorkQueue[] ws; WorkQueue w; Thread p;
1637 while ((u = (int)((c = ctl) >>> 32)) < 0) {
1638 if ((e = (int)c) > 0) {
1639 if ((ws = workQueues) != null && ws.length > (i = e & SMASK) &&
1640 (w = ws[i]) != null && w.eventCount == (e | INT_SIGN)) {
1641 long nc = (((long)(w.nextWait & E_MASK)) |
1642 ((long)(u + UAC_UNIT) << 32));
1643 if (U.compareAndSwapLong(this, CTL, c, nc)) {
1644 w.hint = hint;
1645 w.eventCount = (e + E_SEQ) & E_MASK;
1646 if ((p = w.parker) != null)
1647 U.unpark(p);
1648 break;
1649 }
1650 if (q.top - q.base <= 0)
1651 break;
1652 }
1653 else
1654 break;
1655 }
1656 else {
1657 if ((short)u < 0)
1658 tryAddWorker();
1659 break;
1660 }
1661 }
1662 }
1663
1664 // Scanning for tasks
1665
1666 /**
1667 * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1668 */
1669 final void runWorker(WorkQueue w) {
1670 w.growArray(); // allocate queue
1671 do { w.runTask(scan(w)); } while (w.qlock >= 0);
1672 }
1673
1674 /**
1675 * Scans for and, if found, returns one task, else possibly
1676 * inactivates the worker. This method operates on single reads of
1677 * volatile state and is designed to be re-invoked continuously,
1678 * in part because it returns upon detecting inconsistencies,
1679 * contention, or state changes that indicate possible success on
1680 * re-invocation.
1681 *
1682 * The scan searches for tasks across queues (starting at a random
1683 * index, and relying on registerWorker to irregularly scatter
1684 * them within array to avoid bias), checking each at least twice.
1685 * The scan terminates upon either finding a non-empty queue, or
1686 * completing the sweep. If the worker is not inactivated, it
1687 * takes and returns a task from this queue. Otherwise, if not
1688 * activated, it signals workers (that may include itself) and
1689 * returns so caller can retry. It also returns if the worker
1690 * array may have changed during an empty scan. On failure
1691 * to find a task, we take one of the following actions, after
1692 * which the caller will retry calling this method unless
1693 * terminated.
1694 *
1695 * * If pool is terminating, terminate the worker.
1696 *
1697 * * If not already enqueued, try to inactivate and enqueue the
1698 * worker on wait queue. Or, if inactivating has caused the pool
1699 * to be quiescent, relay to idleAwaitWork to possibly shrink
1700 * pool.
1701 *
1702 * * If already enqueued and none of the above apply, possibly
1703 * park awaiting signal, else lingers to help scan and signal.
1704 * 1705 * * If a non-empty queue discovered or left as a hint, 1706 * help wake up other workers before return 1707 * 1708 * @param w the worker (via its WorkQueue) 1709 * @return a task or null if none found 1710 */ 1711 private final ForkJoinTask<?> scan(WorkQueue w) { 1712 WorkQueue[] ws; int m; 1713 int ps = plock; // read plock before ws 1714 if (w != null && (ws = workQueues) != null && (m = ws.length - 1) >= 0) { 1715 int ec = w.eventCount; // ec is negative if inactive 1716 int r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5; 1717 w.hint = -1; // update seed and clear hint 1718 int j = ((m + m + 1) | MIN_SCAN) & MAX_SCAN; 1719 do { 1720 WorkQueue q; ForkJoinTask<?>[] a; int b; 1721 if ((q = ws[(r + j) & m]) != null && (b = q.base) - q.top < 0 && 1722 (a = q.array) != null) { // probably nonempty 1723 int i = (((a.length - 1) & b) << ASHIFT) + ABASE; 1724 ForkJoinTask<?> t = (ForkJoinTask<?>) 1725 U.getObjectVolatile(a, i); 1726 if (q.base == b && ec >= 0 && t != null && 1727 U.compareAndSwapObject(a, i, t, null)) { 1728 if ((q.base = b + 1) - q.top < 0) 1729 signalWork(q); 1730 return t; // taken 1731 } 1732 else if ((ec < 0 || j < m) && (int)(ctl >> AC_SHIFT) <= 0) { 1733 w.hint = (r + j) & m; // help signal below 1734 break; // cannot take 1735 } 1736 } 1737 } while (--j >= 0); 1738 1739 int h, e, ns; long c, sc; WorkQueue q; 1740 if ((ns = w.nsteals) != 0) { 1741 if (U.compareAndSwapLong(this, STEALCOUNT, 1742 sc = stealCount, sc + ns)) 1743 w.nsteals = 0; // collect steals and rescan 1744 } 1745 else if (plock != ps) // consistency check 1746 ; // skip 1747 else if ((e = (int)(c = ctl)) < 0) 1748 w.qlock = -1; // pool is terminating 1749 else { 1750 if ((h = w.hint) < 0) { 1751 if (ec >= 0) { // try to enqueue/inactivate 1752 long nc = (((long)ec | 1753 ((c - AC_UNIT) & (AC_MASK|TC_MASK)))); 1754 w.nextWait = e; // link and mark inactive 1755 w.eventCount = ec | INT_SIGN; 1756 if (ctl != c || !U.compareAndSwapLong(this, CTL, c, nc)) 1757 w.eventCount = ec; // unmark on CAS failure 1758 else if ((int)(c >> AC_SHIFT) == 1 - (config & SMASK)) 1759 idleAwaitWork(w, nc, c); 1760 } 1761 else if (w.eventCount < 0 && !tryTerminate(false, false) && 1762 ctl == c) { // block 1763 Thread wt = Thread.currentThread(); 1764 Thread.interrupted(); // clear status 1765 U.putObject(wt, PARKBLOCKER, this); 1766 w.parker = wt; // emulate LockSupport.park 1767 if (w.eventCount < 0) // recheck 1768 U.park(false, 0L); 1769 w.parker = null; 1770 U.putObject(wt, PARKBLOCKER, null); 1771 } 1772 } 1773 if ((h >= 0 || (h = w.hint) >= 0) && 1774 (ws = workQueues) != null && h < ws.length && 1775 (q = ws[h]) != null) { // signal others before retry 1776 WorkQueue v; Thread p; int u, i, s; 1777 for (int n = (config & SMASK) >>> 1;;) { 1778 int idleCount = (w.eventCount < 0) ? 
0 : -1; 1779 if (((s = idleCount - q.base + q.top) <= n && 1780 (n = s) <= 0) || 1781 (u = (int)((c = ctl) >>> 32)) >= 0 || 1782 (e = (int)c) <= 0 || m < (i = e & SMASK) || 1783 (v = ws[i]) == null) 1784 break; 1785 long nc = (((long)(v.nextWait & E_MASK)) | 1786 ((long)(u + UAC_UNIT) << 32)); 1787 if (v.eventCount != (e | INT_SIGN) || 1788 !U.compareAndSwapLong(this, CTL, c, nc)) 1789 break; 1790 v.hint = h; 1791 v.eventCount = (e + E_SEQ) & E_MASK; 1792 if ((p = v.parker) != null) 1793 U.unpark(p); 1794 if (--n <= 0) 1795 break; 1796 } 1797 } 1798 } 1799 } 1800 return null; 1801 } 1802 1803 /** 1804 * If inactivating worker w has caused the pool to become 1805 * quiescent, checks for pool termination, and, so long as this is 1806 * not the only worker, waits for event for up to a given 1807 * duration. On timeout, if ctl has not changed, terminates the 1808 * worker, which will in turn wake up another worker to possibly 1809 * repeat this process. 1810 * 1811 * @param w the calling worker 1812 * @param currentCtl the ctl value triggering possible quiescence 1813 * @param prevCtl the ctl value to restore if thread is terminated 1814 */ 1815 private void idleAwaitWork(WorkQueue w, long currentCtl, long prevCtl) { 1816 if (w != null && w.eventCount < 0 && 1817 !tryTerminate(false, false) && (int)prevCtl != 0) { 1818 int dc = -(short)(currentCtl >>> TC_SHIFT); 1819 long parkTime = dc < 0 ? FAST_IDLE_TIMEOUT: (dc + 1) * IDLE_TIMEOUT; 1820 long deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP; 1821 Thread wt = Thread.currentThread(); 1822 while (ctl == currentCtl) { 1823 Thread.interrupted(); // timed variant of version in scan() 1824 U.putObject(wt, PARKBLOCKER, this); 1825 w.parker = wt; 1826 if (ctl == currentCtl) 1827 U.park(false, parkTime); 1828 w.parker = null; 1829 U.putObject(wt, PARKBLOCKER, null); 1830 if (ctl != currentCtl) 1831 break; 1832 if (deadline - System.nanoTime() <= 0L && 1833 U.compareAndSwapLong(this, CTL, currentCtl, prevCtl)) { 1834 w.eventCount = (w.eventCount + E_SEQ) | E_MASK; 1835 w.qlock = -1; // shrink 1836 break; 1837 } 1838 } 1839 } 1840 } 1841 1842 /** 1843 * Scans through queues looking for work while joining a task; if 1844 * any present, signals. May return early if more signalling is 1845 * detectably unneeded. 
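*
* <p>For illustration, the guard under which this method attempts
* any signalling can be sketched as (not a member of this class):
* <pre> {@code
* static boolean activeDeficit(long ctl) {
*   int u = (int)(ctl >>> 32);            // packed AC and TC fields
*   return u < 0 && (u >> UAC_SHIFT) < 0; // fewer active than target
* }}</pre>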
1846 * 1847 * @param task return early if done 1848 * @param origin an index to start scan 1849 */ 1850 private void helpSignal(ForkJoinTask<?> task, int origin) { 1851 WorkQueue[] ws; WorkQueue w; Thread p; long c; int m, u, e, i, s; 1852 if (task != null && task.status >= 0 && 1853 (u = (int)(ctl >>> 32)) < 0 && (u >> UAC_SHIFT) < 0 && 1854 (ws = workQueues) != null && (m = ws.length - 1) >= 0) { 1855 outer: for (int k = origin, j = m; j >= 0; --j) { 1856 WorkQueue q = ws[k++ & m]; 1857 for (int n = m;;) { // limit to at most m signals 1858 if (task.status < 0) 1859 break outer; 1860 if (q == null || 1861 ((s = -q.base + q.top) <= n && (n = s) <= 0)) 1862 break; 1863 if ((u = (int)((c = ctl) >>> 32)) >= 0 || 1864 (e = (int)c) <= 0 || m < (i = e & SMASK) || 1865 (w = ws[i]) == null) 1866 break outer; 1867 long nc = (((long)(w.nextWait & E_MASK)) | 1868 ((long)(u + UAC_UNIT) << 32)); 1869 if (w.eventCount != (e | INT_SIGN)) 1870 break outer; 1871 if (U.compareAndSwapLong(this, CTL, c, nc)) { 1872 w.eventCount = (e + E_SEQ) & E_MASK; 1873 if ((p = w.parker) != null) 1874 U.unpark(p); 1875 if (--n <= 0) 1876 break; 1877 } 1878 } 1879 } 1880 } 1881 } 1882 1883 /** 1884 * Tries to locate and execute tasks for a stealer of the given 1885 * task, or in turn one of its stealers, Traces currentSteal -> 1886 * currentJoin links looking for a thread working on a descendant 1887 * of the given task and with a non-empty queue to steal back and 1888 * execute tasks from. The first call to this method upon a 1889 * waiting join will often entail scanning/search, (which is OK 1890 * because the joiner has nothing better to do), but this method 1891 * leaves hints in workers to speed up subsequent calls. The 1892 * implementation is very branchy to cope with potential 1893 * inconsistencies or loops encountering chains that are stale, 1894 * unknown, or so long that they are likely cyclic. 
1895 * 1896 * @param joiner the joining worker 1897 * @param task the task to join 1898 * @return 0 if no progress can be made, negative if task 1899 * known complete, else positive 1900 */ 1901 private int tryHelpStealer(WorkQueue joiner, ForkJoinTask<?> task) { 1902 int stat = 0, steps = 0; // bound to avoid cycles 1903 if (joiner != null && task != null) { // hoist null checks 1904 restart: for (;;) { 1905 ForkJoinTask<?> subtask = task; // current target 1906 for (WorkQueue j = joiner, v;;) { // v is stealer of subtask 1907 WorkQueue[] ws; int m, s, h; 1908 if ((s = task.status) < 0) { 1909 stat = s; 1910 break restart; 1911 } 1912 if ((ws = workQueues) == null || (m = ws.length - 1) <= 0) 1913 break restart; // shutting down 1914 if ((v = ws[h = (j.hint | 1) & m]) == null || 1915 v.currentSteal != subtask) { 1916 for (int origin = h;;) { // find stealer 1917 if (((h = (h + 2) & m) & 15) == 1 && 1918 (subtask.status < 0 || j.currentJoin != subtask)) 1919 continue restart; // occasional staleness check 1920 if ((v = ws[h]) != null && 1921 v.currentSteal == subtask) { 1922 j.hint = h; // save hint 1923 break; 1924 } 1925 if (h == origin) 1926 break restart; // cannot find stealer 1927 } 1928 } 1929 for (;;) { // help stealer or descend to its stealer 1930 ForkJoinTask[] a; int b; 1931 if (subtask.status < 0) // surround probes with 1932 continue restart; // consistency checks 1933 if ((b = v.base) - v.top < 0 && (a = v.array) != null) { 1934 int i = (((a.length - 1) & b) << ASHIFT) + ABASE; 1935 ForkJoinTask<?> t = 1936 (ForkJoinTask<?>)U.getObjectVolatile(a, i); 1937 if (subtask.status < 0 || j.currentJoin != subtask || 1938 v.currentSteal != subtask) 1939 continue restart; // stale 1940 stat = 1; // apparent progress 1941 if (t != null && v.base == b && 1942 U.compareAndSwapObject(a, i, t, null)) { 1943 v.base = b + 1; // help stealer 1944 joiner.runSubtask(t); 1945 } 1946 else if (v.base == b && ++steps == MAX_HELP) 1947 break restart; // v apparently stalled 1948 } 1949 else { // empty -- try to descend 1950 ForkJoinTask<?> next = v.currentJoin; 1951 if (subtask.status < 0 || j.currentJoin != subtask || 1952 v.currentSteal != subtask) 1953 continue restart; // stale 1954 else if (next == null || ++steps == MAX_HELP) 1955 break restart; // dead-end or maybe cyclic 1956 else { 1957 subtask = next; 1958 j = v; 1959 break; 1960 } 1961 } 1962 } 1963 } 1964 } 1965 } 1966 return stat; 1967 } 1968 1969 /** 1970 * Analog of tryHelpStealer for CountedCompleters. Tries to steal 1971 * and run tasks within the target's computation. 1972 * 1973 * @param task the task to join 1974 * @param mode if shared, exit upon completing any task 1975 * if all workers are active 1976 * 1977 */ 1978 private int helpComplete(ForkJoinTask<?> task, int mode) { 1979 WorkQueue[] ws; WorkQueue q; int m, n, s, u; 1980 if (task != null && (ws = workQueues) != null && 1981 (m = ws.length - 1) >= 0) { 1982 for (int j = 1, origin = j;;) { 1983 if ((s = task.status) < 0) 1984 return s; 1985 if ((q = ws[j & m]) != null && q.pollAndExecCC(task)) { 1986 origin = j; 1987 if (mode == SHARED_QUEUE && 1988 ((u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0)) 1989 break; 1990 } 1991 else if ((j = (j + 2) & m) == origin) 1992 break; 1993 } 1994 } 1995 return 0; 1996 } 1997 1998 /** 1999 * Tries to decrement active count (sometimes implicitly) and 2000 * possibly release or create a compensating worker in preparation 2001 * for blocking. Fails on contention or termination. 
Otherwise, 2002 * adds a new thread if no idle workers are available and pool 2003 * may become starved. 2004 */ 2005 final boolean tryCompensate() { 2006 int pc = config & SMASK, e, i, tc; long c; 2007 WorkQueue[] ws; WorkQueue w; Thread p; 2008 if ((ws = workQueues) != null && (e = (int)(c = ctl)) >= 0) { 2009 if (e != 0 && (i = e & SMASK) < ws.length && 2010 (w = ws[i]) != null && w.eventCount == (e | INT_SIGN)) { 2011 long nc = ((long)(w.nextWait & E_MASK) | 2012 (c & (AC_MASK|TC_MASK))); 2013 if (U.compareAndSwapLong(this, CTL, c, nc)) { 2014 w.eventCount = (e + E_SEQ) & E_MASK; 2015 if ((p = w.parker) != null) 2016 U.unpark(p); 2017 return true; // replace with idle worker 2018 } 2019 } 2020 else if ((tc = (short)(c >>> TC_SHIFT)) >= 0 && 2021 (int)(c >> AC_SHIFT) + pc > 1) { 2022 long nc = ((c - AC_UNIT) & AC_MASK) | (c & ~AC_MASK); 2023 if (U.compareAndSwapLong(this, CTL, c, nc)) 2024 return true; // no compensation 2025 } 2026 else if (tc + pc < MAX_CAP) { 2027 long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); 2028 if (U.compareAndSwapLong(this, CTL, c, nc)) { 2029 ForkJoinWorkerThreadFactory fac; 2030 Throwable ex = null; 2031 ForkJoinWorkerThread wt = null; 2032 try { 2033 if ((fac = factory) != null && 2034 (wt = fac.newThread(this)) != null) { 2035 wt.start(); 2036 return true; 2037 } 2038 } catch (Throwable rex) { 2039 ex = rex; 2040 } 2041 deregisterWorker(wt, ex); // clean up and return false 2042 } 2043 } 2044 } 2045 return false; 2046 } 2047 2048 /** 2049 * Helps and/or blocks until the given task is done. 2050 * 2051 * @param joiner the joining worker 2052 * @param task the task 2053 * @return task status on exit 2054 */ 2055 final int awaitJoin(WorkQueue joiner, ForkJoinTask<?> task) { 2056 int s = 0; 2057 if (joiner != null && task != null && (s = task.status) >= 0) { 2058 ForkJoinTask<?> prevJoin = joiner.currentJoin; 2059 joiner.currentJoin = task; 2060 do {} while ((s = task.status) >= 0 && !joiner.isEmpty() && 2061 joiner.tryRemoveAndExec(task)); // process local tasks 2062 if (s >= 0 && (s = task.status) >= 0) { 2063 helpSignal(task, joiner.poolIndex); 2064 if ((s = task.status) >= 0 && 2065 (task instanceof CountedCompleter)) 2066 s = helpComplete(task, LIFO_QUEUE); 2067 } 2068 while (s >= 0 && (s = task.status) >= 0) { 2069 if ((!joiner.isEmpty() || // try helping 2070 (s = tryHelpStealer(joiner, task)) == 0) && 2071 (s = task.status) >= 0) { 2072 helpSignal(task, joiner.poolIndex); 2073 if ((s = task.status) >= 0 && tryCompensate()) { 2074 if (task.trySetSignal() && (s = task.status) >= 0) { 2075 synchronized (task) { 2076 if (task.status >= 0) { 2077 try { // see ForkJoinTask 2078 task.wait(); // for explanation 2079 } catch (InterruptedException ie) { 2080 } 2081 } 2082 else 2083 task.notifyAll(); 2084 } 2085 } 2086 long c; // re-activate 2087 do {} while (!U.compareAndSwapLong 2088 (this, CTL, c = ctl, c + AC_UNIT)); 2089 } 2090 } 2091 } 2092 joiner.currentJoin = prevJoin; 2093 } 2094 return s; 2095 } 2096 2097 /** 2098 * Stripped-down variant of awaitJoin used by timed joins. Tries 2099 * to help join only while there is continuous progress. (Caller 2100 * will then enter a timed wait.) 
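*
* <p>In outline, both this method and awaitJoin above proceed as
* (an expository summary only):
* <pre> {@code
* // 1. pop and run the task, or tasks above it, while still in
* //    the joiner's own queue (tryRemoveAndExec)
* // 2. wake up or create workers that could help (helpSignal),
* //    and for CountedCompleters, run related completions
* // 3. steal back from the task's stealer (tryHelpStealer);
* //    awaitJoin additionally compensates (tryCompensate) and blocks
* }</pre>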
2101 * 2102 * @param joiner the joining worker 2103 * @param task the task 2104 */ 2105 final void helpJoinOnce(WorkQueue joiner, ForkJoinTask<?> task) { 2106 int s; 2107 if (joiner != null && task != null && (s = task.status) >= 0) { 2108 ForkJoinTask<?> prevJoin = joiner.currentJoin; 2109 joiner.currentJoin = task; 2110 do {} while ((s = task.status) >= 0 && !joiner.isEmpty() && 2111 joiner.tryRemoveAndExec(task)); 2112 if (s >= 0 && (s = task.status) >= 0) { 2113 helpSignal(task, joiner.poolIndex); 2114 if ((s = task.status) >= 0 && 2115 (task instanceof CountedCompleter)) 2116 s = helpComplete(task, LIFO_QUEUE); 2117 } 2118 if (s >= 0 && joiner.isEmpty()) { 2119 do {} while (task.status >= 0 && 2120 tryHelpStealer(joiner, task) > 0); 2121 } 2122 joiner.currentJoin = prevJoin; 2123 } 2124 } 2125 2126 /** 2127 * Returns a (probably) non-empty steal queue, if one is found 2128 * during a random, then cyclic scan, else null. This method must 2129 * be retried by caller if, by the time it tries to use the queue, 2130 * it is empty. 2131 * @param r a (random) seed for scanning 2132 */ 2133 private WorkQueue findNonEmptyStealQueue(int r) { 2134 for (WorkQueue[] ws;;) { 2135 int ps = plock, m, n; 2136 if ((ws = workQueues) == null || (m = ws.length - 1) < 1) 2137 return null; 2138 for (int j = (m + 1) << 2; ;) { 2139 WorkQueue q = ws[(((r + j) << 1) | 1) & m]; 2140 if (q != null && (n = q.base - q.top) < 0) { 2141 if (n < -1) 2142 signalWork(q); 2143 return q; 2144 } 2145 else if (--j < 0) { 2146 if (plock == ps) 2147 return null; 2148 break; 2149 } 2150 } 2151 } 2152 } 2153 2154 /** 2155 * Runs tasks until {@code isQuiescent()}. We piggyback on 2156 * active count ctl maintenance, but rather than blocking 2157 * when tasks cannot be found, we rescan until all others cannot 2158 * find tasks either. 2159 */ 2160 final void helpQuiescePool(WorkQueue w) { 2161 for (boolean active = true;;) { 2162 ForkJoinTask<?> localTask; // exhaust local queue 2163 while ((localTask = w.nextLocalTask()) != null) 2164 localTask.doExec(); 2165 // Similar to loop in scan(), but ignoring submissions 2166 WorkQueue q = findNonEmptyStealQueue(w.nextSeed()); 2167 if (q != null) { 2168 ForkJoinTask<?> t; int b; 2169 if (!active) { // re-establish active count 2170 long c; 2171 active = true; 2172 do {} while (!U.compareAndSwapLong 2173 (this, CTL, c = ctl, c + AC_UNIT)); 2174 } 2175 if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) 2176 w.runSubtask(t); 2177 } 2178 else { 2179 long c; 2180 if (active) { // decrement active count without queuing 2181 active = false; 2182 do {} while (!U.compareAndSwapLong 2183 (this, CTL, c = ctl, c -= AC_UNIT)); 2184 } 2185 else 2186 c = ctl; // re-increment on exit 2187 if ((int)(c >> AC_SHIFT) + (config & SMASK) == 0) { 2188 do {} while (!U.compareAndSwapLong 2189 (this, CTL, c = ctl, c + AC_UNIT)); 2190 break; 2191 } 2192 } 2193 } 2194 } 2195 2196 /** 2197 * Gets and removes a local or stolen task for the given worker. 
2198 * 2199 * @return a task, if available 2200 */ 2201 final ForkJoinTask<?> nextTaskFor(WorkQueue w) { 2202 for (ForkJoinTask<?> t;;) { 2203 WorkQueue q; int b; 2204 if ((t = w.nextLocalTask()) != null) 2205 return t; 2206 if ((q = findNonEmptyStealQueue(w.nextSeed())) == null) 2207 return null; 2208 if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) 2209 return t; 2210 } 2211 } 2212 2213 /** 2214 * Returns a cheap heuristic guide for task partitioning when 2215 * programmers, frameworks, tools, or languages have little or no 2216 * idea about task granularity. In essence by offering this 2217 * method, we ask users only about tradeoffs in overhead vs 2218 * expected throughput and its variance, rather than how finely to 2219 * partition tasks. 2220 * 2221 * In a steady state strict (tree-structured) computation, each 2222 * thread makes available for stealing enough tasks for other 2223 * threads to remain active. Inductively, if all threads play by 2224 * the same rules, each thread should make available only a 2225 * constant number of tasks. 2226 * 2227 * The minimum useful constant is just 1. But using a value of 1 2228 * would require immediate replenishment upon each steal to 2229 * maintain enough tasks, which is infeasible. Further, 2230 * partitionings/granularities of offered tasks should minimize 2231 * steal rates, which in general means that threads nearer the top 2232 * of computation tree should generate more than those nearer the 2233 * bottom. In perfect steady state, each thread is at 2234 * approximately the same level of computation tree. However, 2235 * producing extra tasks amortizes the uncertainty of progress and 2236 * diffusion assumptions. 2237 * 2238 * So, users will want to use values larger, but not much larger 2239 * than 1 to both smooth over transient shortages and hedge 2240 * against uneven progress; as traded off against the cost of 2241 * extra task overhead. We leave the user to pick a threshold 2242 * value to compare with the results of this call to guide 2243 * decisions, but recommend values such as 3. 2244 * 2245 * When all threads are active, it is on average OK to estimate 2246 * surplus strictly locally. In steady-state, if one thread is 2247 * maintaining say 2 surplus tasks, then so are others. So we can 2248 * just use estimated queue length. However, this strategy alone 2249 * leads to serious mis-estimates in some non-steady-state 2250 * conditions (ramp-up, ramp-down, other stalls). We can detect 2251 * many of these by further considering the number of "idle" 2252 * threads, that are known to have zero queued tasks, so 2253 * compensate by a factor of (#idle/#active) threads. 2254 * 2255 * Note: The approximation of #busy workers as #active workers is 2256 * not very good under current signalling scheme, and should be 2257 * improved. 2258 */ 2259 static int getSurplusQueuedTaskCount() { 2260 Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q; 2261 if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)) { 2262 int p = (pool = (wt = (ForkJoinWorkerThread)t).pool).config & SMASK; 2263 int n = (q = wt.workQueue).top - q.base; 2264 int a = (int)(pool.ctl >> AC_SHIFT) + p; 2265 return n - (a > (p >>>= 1) ? 0 : 2266 a > (p >>>= 1) ? 1 : 2267 a > (p >>>= 1) ? 2 : 2268 a > (p >>>= 1) ? 4 : 2269 8); 2270 } 2271 return 0; 2272 } 2273 2274 // Termination 2275 2276 /** 2277 * Possibly initiates and/or completes termination. 
The caller 2278 * triggering termination runs three passes through workQueues: 2279 * (0) Setting termination status, followed by wakeups of queued 2280 * workers; (1) cancelling all tasks; (2) interrupting lagging 2281 * threads (likely in external tasks, but possibly also blocked in 2282 * joins). Each pass repeats previous steps because of potential 2283 * lagging thread creation. 2284 * 2285 * @param now if true, unconditionally terminate, else only 2286 * if no work and no active workers 2287 * @param enable if true, enable shutdown when next possible 2288 * @return true if now terminating or terminated 2289 */ 2290 private boolean tryTerminate(boolean now, boolean enable) { 2291 if (this == commonPool) // cannot shut down 2292 return false; 2293 for (long c;;) { 2294 if (((c = ctl) & STOP_BIT) != 0) { // already terminating 2295 if ((short)(c >>> TC_SHIFT) == -(config & SMASK)) { 2296 synchronized (this) { 2297 notifyAll(); // signal when 0 workers 2298 } 2299 } 2300 return true; 2301 } 2302 if (plock >= 0) { // not yet enabled 2303 int ps; 2304 if (!enable) 2305 return false; 2306 if (((ps = plock) & PL_LOCK) != 0 || 2307 !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) 2308 ps = acquirePlock(); 2309 if (!U.compareAndSwapInt(this, PLOCK, ps, SHUTDOWN)) 2310 releasePlock(SHUTDOWN); 2311 } 2312 if (!now) { // check if idle & no tasks 2313 if ((int)(c >> AC_SHIFT) != -(config & SMASK) || 2314 hasQueuedSubmissions()) 2315 return false; 2316 // Check for unqueued inactive workers. One pass suffices. 2317 WorkQueue[] ws = workQueues; WorkQueue w; 2318 if (ws != null) { 2319 for (int i = 1; i < ws.length; i += 2) { 2320 if ((w = ws[i]) != null && w.eventCount >= 0) 2321 return false; 2322 } 2323 } 2324 } 2325 if (U.compareAndSwapLong(this, CTL, c, c | STOP_BIT)) { 2326 for (int pass = 0; pass < 3; ++pass) { 2327 WorkQueue[] ws = workQueues; 2328 if (ws != null) { 2329 WorkQueue w; Thread wt; 2330 int n = ws.length; 2331 for (int i = 0; i < n; ++i) { 2332 if ((w = ws[i]) != null) { 2333 w.qlock = -1; 2334 if (pass > 0) { 2335 w.cancelAll(); 2336 if (pass > 1 && (wt = w.owner) != null) { 2337 if (!wt.isInterrupted()) { 2338 try { 2339 wt.interrupt(); 2340 } catch (SecurityException ignore) { 2341 } 2342 } 2343 U.unpark(wt); 2344 } 2345 } 2346 } 2347 } 2348 // Wake up workers parked on event queue 2349 int i, e; long cc; Thread p; 2350 while ((e = (int)(cc = ctl) & E_MASK) != 0 && 2351 (i = e & SMASK) < n && 2352 (w = ws[i]) != null) { 2353 long nc = ((long)(w.nextWait & E_MASK) | 2354 ((cc + AC_UNIT) & AC_MASK) | 2355 (cc & (TC_MASK|STOP_BIT))); 2356 if (w.eventCount == (e | INT_SIGN) && 2357 U.compareAndSwapLong(this, CTL, cc, nc)) { 2358 w.eventCount = (e + E_SEQ) & E_MASK; 2359 w.qlock = -1; 2360 if ((p = w.parker) != null) 2361 U.unpark(p); 2362 } 2363 } 2364 } 2365 } 2366 } 2367 } 2368 } 2369 2370 // external operations on common pool 2371 2372 /** 2373 * Returns common pool queue for a thread that has submitted at 2374 * least one task. 2375 */ 2376 static WorkQueue commonSubmitterQueue() { 2377 ForkJoinPool p; WorkQueue[] ws; int m; Submitter z; 2378 return ((z = submitters.get()) != null && 2379 (p = commonPool) != null && 2380 (ws = p.workQueues) != null && 2381 (m = ws.length - 1) >= 0) ? 2382 ws[m & z.seed & SQMASK] : null; 2383 } 2384 2385 /** 2386 * Tries to pop the given task from submitter's queue in common pool. 
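*
* <p>For illustration, the slot probed here (and in
* commonSubmitterQueue above) is computed as follows; a sketch,
* not a member of this class:
* <pre> {@code
* static int submitterSlot(int seed, int tableLength) {
*   // an even index: one of at most 64 submission slots
*   return (tableLength - 1) & seed & SQMASK;
* }}</pre>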
2387 */ 2388 static boolean tryExternalUnpush(ForkJoinTask<?> t) { 2389 ForkJoinPool p; WorkQueue[] ws; WorkQueue q; Submitter z; 2390 ForkJoinTask<?>[] a; int m, s; 2391 if (t != null && 2392 (z = submitters.get()) != null && 2393 (p = commonPool) != null && 2394 (ws = p.workQueues) != null && 2395 (m = ws.length - 1) >= 0 && 2396 (q = ws[m & z.seed & SQMASK]) != null && 2397 (s = q.top) != q.base && 2398 (a = q.array) != null) { 2399 long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE; 2400 if (U.getObject(a, j) == t && 2401 U.compareAndSwapInt(q, QLOCK, 0, 1)) { 2402 if (q.array == a && q.top == s && // recheck 2403 U.compareAndSwapObject(a, j, t, null)) { 2404 q.top = s - 1; 2405 q.qlock = 0; 2406 return true; 2407 } 2408 q.qlock = 0; 2409 } 2410 } 2411 return false; 2412 } 2413 2414 /** 2415 * Tries to pop and run local tasks within the same computation 2416 * as the given root. On failure, tries to help complete from 2417 * other queues via helpComplete. 2418 */ 2419 private void externalHelpComplete(WorkQueue q, ForkJoinTask<?> root) { 2420 ForkJoinTask<?>[] a; int m; 2421 if (q != null && (a = q.array) != null && (m = (a.length - 1)) >= 0 && 2422 root != null && root.status >= 0) { 2423 for (;;) { 2424 int s, u; Object o; CountedCompleter<?> task = null; 2425 if ((s = q.top) - q.base > 0) { 2426 long j = ((m & (s - 1)) << ASHIFT) + ABASE; 2427 if ((o = U.getObject(a, j)) != null && 2428 (o instanceof CountedCompleter)) { 2429 CountedCompleter<?> t = (CountedCompleter<?>)o, r = t; 2430 do { 2431 if (r == root) { 2432 if (U.compareAndSwapInt(q, QLOCK, 0, 1)) { 2433 if (q.array == a && q.top == s && 2434 U.compareAndSwapObject(a, j, t, null)) { 2435 q.top = s - 1; 2436 task = t; 2437 } 2438 q.qlock = 0; 2439 } 2440 break; 2441 } 2442 } while ((r = r.completer) != null); 2443 } 2444 } 2445 if (task != null) 2446 task.doExec(); 2447 if (root.status < 0 || 2448 (u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0) 2449 break; 2450 if (task == null) { 2451 helpSignal(root, q.poolIndex); 2452 if (root.status >= 0) 2453 helpComplete(root, SHARED_QUEUE); 2454 break; 2455 } 2456 } 2457 } 2458 } 2459 2460 /** 2461 * Tries to help execute or signal availability of the given task 2462 * from submitter's queue in common pool. 
2463 */ 2464 static void externalHelpJoin(ForkJoinTask<?> t) { 2465 // Some hard-to-avoid overlap with tryExternalUnpush 2466 ForkJoinPool p; WorkQueue[] ws; WorkQueue q, w; Submitter z; 2467 ForkJoinTask<?>[] a; int m, s, n; 2468 if (t != null && 2469 (z = submitters.get()) != null && 2470 (p = commonPool) != null && 2471 (ws = p.workQueues) != null && 2472 (m = ws.length - 1) >= 0 && 2473 (q = ws[m & z.seed & SQMASK]) != null && 2474 (a = q.array) != null) { 2475 int am = a.length - 1; 2476 if ((s = q.top) != q.base) { 2477 long j = ((am & (s - 1)) << ASHIFT) + ABASE; 2478 if (U.getObject(a, j) == t && 2479 U.compareAndSwapInt(q, QLOCK, 0, 1)) { 2480 if (q.array == a && q.top == s && 2481 U.compareAndSwapObject(a, j, t, null)) { 2482 q.top = s - 1; 2483 q.qlock = 0; 2484 t.doExec(); 2485 } 2486 else 2487 q.qlock = 0; 2488 } 2489 } 2490 if (t.status >= 0) { 2491 if (t instanceof CountedCompleter) 2492 p.externalHelpComplete(q, t); 2493 else 2494 p.helpSignal(t, q.poolIndex); 2495 } 2496 } 2497 } 2498 2499 /** 2500 * Restricted version of helpQuiescePool for external callers 2501 */ 2502 static void externalHelpQuiescePool() { 2503 ForkJoinPool p; ForkJoinTask<?> t; WorkQueue q; int b; 2504 if ((p = commonPool) != null && 2505 (q = p.findNonEmptyStealQueue(1)) != null && 2506 (b = q.base) - q.top < 0 && 2507 (t = q.pollAt(b)) != null) 2508 t.doExec(); 2509 } 2510 2511 // Exported methods 2512 2513 // Constructors 2514 2515 /** 2516 * Creates a {@code ForkJoinPool} with parallelism equal to {@link 2517 * java.lang.Runtime#availableProcessors}, using the {@linkplain 2518 * #defaultForkJoinWorkerThreadFactory default thread factory}, 2519 * no UncaughtExceptionHandler, and non-async LIFO processing mode. 2520 * 2521 * @throws SecurityException if a security manager exists and 2522 * the caller is not permitted to modify threads 2523 * because it does not hold {@link 2524 * java.lang.RuntimePermission}{@code ("modifyThread")} 2525 */ 2526 public ForkJoinPool() { 2527 this(Runtime.getRuntime().availableProcessors(), 2528 defaultForkJoinWorkerThreadFactory, null, false); 2529 } 2530 2531 /** 2532 * Creates a {@code ForkJoinPool} with the indicated parallelism 2533 * level, the {@linkplain 2534 * #defaultForkJoinWorkerThreadFactory default thread factory}, 2535 * no UncaughtExceptionHandler, and non-async LIFO processing mode. 2536 * 2537 * @param parallelism the parallelism level 2538 * @throws IllegalArgumentException if parallelism less than or 2539 * equal to zero, or greater than implementation limit 2540 * @throws SecurityException if a security manager exists and 2541 * the caller is not permitted to modify threads 2542 * because it does not hold {@link 2543 * java.lang.RuntimePermission}{@code ("modifyThread")} 2544 */ 2545 public ForkJoinPool(int parallelism) { 2546 this(parallelism, defaultForkJoinWorkerThreadFactory, null, false); 2547 } 2548 2549 /** 2550 * Creates a {@code ForkJoinPool} with the given parameters. 2551 * 2552 * @param parallelism the parallelism level. For default value, 2553 * use {@link java.lang.Runtime#availableProcessors}. 2554 * @param factory the factory for creating new threads. For default value, 2555 * use {@link #defaultForkJoinWorkerThreadFactory}. 2556 * @param handler the handler for internal worker threads that 2557 * terminate due to unrecoverable errors encountered while executing 2558 * tasks. For default value, use {@code null}. 
2559 * @param asyncMode if true, 2560 * establishes local first-in-first-out scheduling mode for forked 2561 * tasks that are never joined. This mode may be more appropriate 2562 * than default locally stack-based mode in applications in which 2563 * worker threads only process event-style asynchronous tasks. 2564 * For default value, use {@code false}. 2565 * @throws IllegalArgumentException if parallelism less than or 2566 * equal to zero, or greater than implementation limit 2567 * @throws NullPointerException if the factory is null 2568 * @throws SecurityException if a security manager exists and 2569 * the caller is not permitted to modify threads 2570 * because it does not hold {@link 2571 * java.lang.RuntimePermission}{@code ("modifyThread")} 2572 */ 2573 public ForkJoinPool(int parallelism, 2574 ForkJoinWorkerThreadFactory factory, 2575 Thread.UncaughtExceptionHandler handler, 2576 boolean asyncMode) { 2577 checkPermission(); 2578 if (factory == null) 2579 throw new NullPointerException(); 2580 if (parallelism <= 0 || parallelism > MAX_CAP) 2581 throw new IllegalArgumentException(); 2582 this.factory = factory; 2583 this.ueh = handler; 2584 this.config = parallelism | (asyncMode ? (FIFO_QUEUE << 16) : 0); 2585 long np = (long)(-parallelism); // offset ctl counts 2586 this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); 2587 int pn = nextPoolId(); 2588 StringBuilder sb = new StringBuilder("ForkJoinPool-"); 2589 sb.append(Integer.toString(pn)); 2590 sb.append("-worker-"); 2591 this.workerNamePrefix = sb.toString(); 2592 } 2593 2594 /** 2595 * Constructor for common pool, suitable only for static initialization. 2596 * Basically the same as above, but uses smallest possible initial footprint. 2597 */ 2598 ForkJoinPool(int parallelism, long ctl, 2599 ForkJoinWorkerThreadFactory factory, 2600 Thread.UncaughtExceptionHandler handler) { 2601 this.config = parallelism; 2602 this.ctl = ctl; 2603 this.factory = factory; 2604 this.ueh = handler; 2605 this.workerNamePrefix = "ForkJoinPool.commonPool-worker-"; 2606 } 2607 2608 /** 2609 * Returns the common pool instance. This pool is statically 2610 * constructed; its run state is unaffected by attempts to 2611 * {@link #shutdown} or {@link #shutdownNow}. 2612 * 2613 * @return the common pool instance 2614 * @since 1.8 2615 */ 2616 public static ForkJoinPool commonPool() { 2617 // assert commonPool != null : "static init error"; 2618 return commonPool; 2619 } 2620 2621 // Execution methods 2622 2623 /** 2624 * Performs the given task, returning its result upon completion. 2625 * If the computation encounters an unchecked Exception or Error, 2626 * it is rethrown as the outcome of this invocation. Rethrown 2627 * exceptions behave in the same way as regular exceptions, but, 2628 * when possible, contain stack traces (as displayed for example 2629 * using {@code ex.printStackTrace()}) of both the current thread 2630 * as well as the thread actually encountering the exception; 2631 * minimally only the latter. 2632 * 2633 * @param task the task 2634 * @return the task's result 2635 * @throws NullPointerException if the task is null 2636 * @throws RejectedExecutionException if the task cannot be 2637 * scheduled for execution 2638 */ 2639 public <T> T invoke(ForkJoinTask<T> task) { 2640 if (task == null) 2641 throw new NullPointerException(); 2642 externalPush(task); 2643 return task.join(); 2644 } 2645 2646 /** 2647 * Arranges for (asynchronous) execution of the given task. 
2648 * 2649 * @param task the task 2650 * @throws NullPointerException if the task is null 2651 * @throws RejectedExecutionException if the task cannot be 2652 * scheduled for execution 2653 */ 2654 public void execute(ForkJoinTask<?> task) { 2655 if (task == null) 2656 throw new NullPointerException(); 2657 externalPush(task); 2658 } 2659 2660 // AbstractExecutorService methods 2661 2662 /** 2663 * @throws NullPointerException if the task is null 2664 * @throws RejectedExecutionException if the task cannot be 2665 * scheduled for execution 2666 */ 2667 public void execute(Runnable task) { 2668 if (task == null) 2669 throw new NullPointerException(); 2670 ForkJoinTask<?> job; 2671 if (task instanceof ForkJoinTask<?>) // avoid re-wrap 2672 job = (ForkJoinTask<?>) task; 2673 else 2674 job = new ForkJoinTask.AdaptedRunnableAction(task); 2675 externalPush(job); 2676 } 2677 2678 /** 2679 * Submits a ForkJoinTask for execution. 2680 * 2681 * @param task the task to submit 2682 * @return the task 2683 * @throws NullPointerException if the task is null 2684 * @throws RejectedExecutionException if the task cannot be 2685 * scheduled for execution 2686 */ 2687 public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) { 2688 if (task == null) 2689 throw new NullPointerException(); 2690 externalPush(task); 2691 return task; 2692 } 2693 2694 /** 2695 * @throws NullPointerException if the task is null 2696 * @throws RejectedExecutionException if the task cannot be 2697 * scheduled for execution 2698 */ 2699 public <T> ForkJoinTask<T> submit(Callable<T> task) { 2700 ForkJoinTask<T> job = new ForkJoinTask.AdaptedCallable<T>(task); 2701 externalPush(job); 2702 return job; 2703 } 2704 2705 /** 2706 * @throws NullPointerException if the task is null 2707 * @throws RejectedExecutionException if the task cannot be 2708 * scheduled for execution 2709 */ 2710 public <T> ForkJoinTask<T> submit(Runnable task, T result) { 2711 ForkJoinTask<T> job = new ForkJoinTask.AdaptedRunnable<T>(task, result); 2712 externalPush(job); 2713 return job; 2714 } 2715 2716 /** 2717 * @throws NullPointerException if the task is null 2718 * @throws RejectedExecutionException if the task cannot be 2719 * scheduled for execution 2720 */ 2721 public ForkJoinTask<?> submit(Runnable task) { 2722 if (task == null) 2723 throw new NullPointerException(); 2724 ForkJoinTask<?> job; 2725 if (task instanceof ForkJoinTask<?>) // avoid re-wrap 2726 job = (ForkJoinTask<?>) task; 2727 else 2728 job = new ForkJoinTask.AdaptedRunnableAction(task); 2729 externalPush(job); 2730 return job; 2731 } 2732 2733 /** 2734 * @throws NullPointerException {@inheritDoc} 2735 * @throws RejectedExecutionException {@inheritDoc} 2736 */ 2737 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) { 2738 // In previous versions of this class, this method constructed 2739 // a task to run ForkJoinTask.invokeAll, but now external 2740 // invocation of multiple tasks is at least as efficient. 2741 List<ForkJoinTask<T>> fs = new ArrayList<ForkJoinTask<T>>(tasks.size()); 2742 // Workaround needed because method wasn't declared with 2743 // wildcards in return type but should have been. 
2744 @SuppressWarnings({"unchecked", "rawtypes"}) 2745 List<Future<T>> futures = (List<Future<T>>) (List) fs; 2746 2747 boolean done = false; 2748 try { 2749 for (Callable<T> t : tasks) { 2750 ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t); 2751 externalPush(f); 2752 fs.add(f); 2753 } 2754 for (ForkJoinTask<T> f : fs) 2755 f.quietlyJoin(); 2756 done = true; 2757 return futures; 2758 } finally { 2759 if (!done) 2760 for (ForkJoinTask<T> f : fs) 2761 f.cancel(false); 2762 } 2763 } 2764 2765 /** 2766 * Returns the factory used for constructing new workers. 2767 * 2768 * @return the factory used for constructing new workers 2769 */ 2770 public ForkJoinWorkerThreadFactory getFactory() { 2771 return factory; 2772 } 2773 2774 /** 2775 * Returns the handler for internal worker threads that terminate 2776 * due to unrecoverable errors encountered while executing tasks. 2777 * 2778 * @return the handler, or {@code null} if none 2779 */ 2780 public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() { 2781 return ueh; 2782 } 2783 2784 /** 2785 * Returns the targeted parallelism level of this pool. 2786 * 2787 * @return the targeted parallelism level of this pool 2788 */ 2789 public int getParallelism() { 2790 return config & SMASK; 2791 } 2792 2793 /** 2794 * Returns the targeted parallelism level of the common pool. 2795 * 2796 * @return the targeted parallelism level of the common pool 2797 * @since 1.8 2798 */ 2799 public static int getCommonPoolParallelism() { 2800 return commonPoolParallelism; 2801 } 2802 2803 /** 2804 * Returns the number of worker threads that have started but not 2805 * yet terminated. The result returned by this method may differ 2806 * from {@link #getParallelism} when threads are created to 2807 * maintain parallelism when others are cooperatively blocked. 2808 * 2809 * @return the number of worker threads 2810 */ 2811 public int getPoolSize() { 2812 return (config & SMASK) + (short)(ctl >>> TC_SHIFT); 2813 } 2814 2815 /** 2816 * Returns {@code true} if this pool uses local first-in-first-out 2817 * scheduling mode for forked tasks that are never joined. 2818 * 2819 * @return {@code true} if this pool uses async mode 2820 */ 2821 public boolean getAsyncMode() { 2822 return (config >>> 16) == FIFO_QUEUE; 2823 } 2824 2825 /** 2826 * Returns an estimate of the number of worker threads that are 2827 * not blocked waiting to join tasks or for other managed 2828 * synchronization. This method may overestimate the 2829 * number of running threads. 2830 * 2831 * @return the number of worker threads 2832 */ 2833 public int getRunningThreadCount() { 2834 int rc = 0; 2835 WorkQueue[] ws; WorkQueue w; 2836 if ((ws = workQueues) != null) { 2837 for (int i = 1; i < ws.length; i += 2) { 2838 if ((w = ws[i]) != null && w.isApparentlyUnblocked()) 2839 ++rc; 2840 } 2841 } 2842 return rc; 2843 } 2844 2845 /** 2846 * Returns an estimate of the number of threads that are currently 2847 * stealing or executing tasks. This method may overestimate the 2848 * number of active threads. 2849 * 2850 * @return the number of active threads 2851 */ 2852 public int getActiveThreadCount() { 2853 int r = (config & SMASK) + (int)(ctl >> AC_SHIFT); 2854 return (r <= 0) ? 0 : r; // suppress momentarily negative values 2855 } 2856 2857 /** 2858 * Returns {@code true} if all worker threads are currently idle. 
2859 * An idle worker is one that cannot obtain a task to execute 2860 * because none are available to steal from other threads, and 2861 * there are no pending submissions to the pool. This method is 2862 * conservative; it might not return {@code true} immediately upon 2863 * idleness of all threads, but will eventually become true if 2864 * threads remain inactive. 2865 * 2866 * @return {@code true} if all threads are currently idle 2867 */ 2868 public boolean isQuiescent() { 2869 return (int)(ctl >> AC_SHIFT) + (config & SMASK) == 0; 2870 } 2871 2872 /** 2873 * Returns an estimate of the total number of tasks stolen from 2874 * one thread's work queue by another. The reported value 2875 * underestimates the actual total number of steals when the pool 2876 * is not quiescent. This value may be useful for monitoring and 2877 * tuning fork/join programs: in general, steal counts should be 2878 * high enough to keep threads busy, but low enough to avoid 2879 * overhead and contention across threads. 2880 * 2881 * @return the number of steals 2882 */ 2883 public long getStealCount() { 2884 long count = stealCount; 2885 WorkQueue[] ws; WorkQueue w; 2886 if ((ws = workQueues) != null) { 2887 for (int i = 1; i < ws.length; i += 2) { 2888 if ((w = ws[i]) != null) 2889 count += w.nsteals; 2890 } 2891 } 2892 return count; 2893 } 2894 2895 /** 2896 * Returns an estimate of the total number of tasks currently held 2897 * in queues by worker threads (but not including tasks submitted 2898 * to the pool that have not begun executing). This value is only 2899 * an approximation, obtained by iterating across all threads in 2900 * the pool. This method may be useful for tuning task 2901 * granularities. 2902 * 2903 * @return the number of queued tasks 2904 */ 2905 public long getQueuedTaskCount() { 2906 long count = 0; 2907 WorkQueue[] ws; WorkQueue w; 2908 if ((ws = workQueues) != null) { 2909 for (int i = 1; i < ws.length; i += 2) { 2910 if ((w = ws[i]) != null) 2911 count += w.queueSize(); 2912 } 2913 } 2914 return count; 2915 } 2916 2917 /** 2918 * Returns an estimate of the number of tasks submitted to this 2919 * pool that have not yet begun executing. This method may take 2920 * time proportional to the number of submissions. 2921 * 2922 * @return the number of queued submissions 2923 */ 2924 public int getQueuedSubmissionCount() { 2925 int count = 0; 2926 WorkQueue[] ws; WorkQueue w; 2927 if ((ws = workQueues) != null) { 2928 for (int i = 0; i < ws.length; i += 2) { 2929 if ((w = ws[i]) != null) 2930 count += w.queueSize(); 2931 } 2932 } 2933 return count; 2934 } 2935 2936 /** 2937 * Returns {@code true} if there are any tasks submitted to this 2938 * pool that have not yet begun executing. 2939 * 2940 * @return {@code true} if there are any queued submissions 2941 */ 2942 public boolean hasQueuedSubmissions() { 2943 WorkQueue[] ws; WorkQueue w; 2944 if ((ws = workQueues) != null) { 2945 for (int i = 0; i < ws.length; i += 2) { 2946 if ((w = ws[i]) != null && !w.isEmpty()) 2947 return true; 2948 } 2949 } 2950 return false; 2951 } 2952 2953 /** 2954 * Removes and returns the next unexecuted submission if one is 2955 * available. This method may be useful in extensions to this 2956 * class that re-assign work in systems with multiple pools. 
2957 * 2958 * @return the next submission, or {@code null} if none 2959 */ 2960 protected ForkJoinTask<?> pollSubmission() { 2961 WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t; 2962 if ((ws = workQueues) != null) { 2963 for (int i = 0; i < ws.length; i += 2) { 2964 if ((w = ws[i]) != null && (t = w.poll()) != null) 2965 return t; 2966 } 2967 } 2968 return null; 2969 } 2970 2971 /** 2972 * Removes all available unexecuted submitted and forked tasks 2973 * from scheduling queues and adds them to the given collection, 2974 * without altering their execution status. These may include 2975 * artificially generated or wrapped tasks. This method is 2976 * designed to be invoked only when the pool is known to be 2977 * quiescent. Invocations at other times may not remove all 2978 * tasks. A failure encountered while attempting to add elements 2979 * to collection {@code c} may result in elements being in 2980 * neither, either or both collections when the associated 2981 * exception is thrown. The behavior of this operation is 2982 * undefined if the specified collection is modified while the 2983 * operation is in progress. 2984 * 2985 * @param c the collection to transfer elements into 2986 * @return the number of elements transferred 2987 */ 2988 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) { 2989 int count = 0; 2990 WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t; 2991 if ((ws = workQueues) != null) { 2992 for (int i = 0; i < ws.length; ++i) { 2993 if ((w = ws[i]) != null) { 2994 while ((t = w.poll()) != null) { 2995 c.add(t); 2996 ++count; 2997 } 2998 } 2999 } 3000 } 3001 return count; 3002 } 3003 3004 /** 3005 * Returns a string identifying this pool, as well as its state, 3006 * including indications of run state, parallelism level, and 3007 * worker and task counts. 3008 * 3009 * @return a string identifying this pool, as well as its state 3010 */ 3011 public String toString() { 3012 // Use a single pass through workQueues to collect counts 3013 long qt = 0L, qs = 0L; int rc = 0; 3014 long st = stealCount; 3015 long c = ctl; 3016 WorkQueue[] ws; WorkQueue w; 3017 if ((ws = workQueues) != null) { 3018 for (int i = 0; i < ws.length; ++i) { 3019 if ((w = ws[i]) != null) { 3020 int size = w.queueSize(); 3021 if ((i & 1) == 0) 3022 qs += size; 3023 else { 3024 qt += size; 3025 st += w.nsteals; 3026 if (w.isApparentlyUnblocked()) 3027 ++rc; 3028 } 3029 } 3030 } 3031 } 3032 int pc = (config & SMASK); 3033 int tc = pc + (short)(c >>> TC_SHIFT); 3034 int ac = pc + (int)(c >> AC_SHIFT); 3035 if (ac < 0) // ignore transient negative 3036 ac = 0; 3037 String level; 3038 if ((c & STOP_BIT) != 0) 3039 level = (tc == 0) ? "Terminated" : "Terminating"; 3040 else 3041 level = plock < 0 ? "Shutting down" : "Running"; 3042 return super.toString() + 3043 "[" + level + 3044 ", parallelism = " + pc + 3045 ", size = " + tc + 3046 ", active = " + ac + 3047 ", running = " + rc + 3048 ", steals = " + st + 3049 ", tasks = " + qt + 3050 ", submissions = " + qs + 3051 "]"; 3052 } 3053 3054 /** 3055 * Possibly initiates an orderly shutdown in which previously 3056 * submitted tasks are executed, but no new tasks will be 3057 * accepted. Invocation has no effect on execution state if this 3058 * is the {@link #commonPool}, and no additional effect if 3059 * already shut down. Tasks that are in the process of being 3060 * submitted concurrently during the course of this method may or 3061 * may not be rejected. 
    /**
     * Possibly initiates an orderly shutdown in which previously
     * submitted tasks are executed, but no new tasks will be
     * accepted. Invocation has no effect on execution state if this
     * is the {@link #commonPool()}, and no additional effect if
     * already shut down. Tasks that are in the process of being
     * submitted concurrently during the course of this method may or
     * may not be rejected.
     *
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public void shutdown() {
        checkPermission();
        tryTerminate(false, true);
    }

    /**
     * Possibly attempts to cancel and/or stop all tasks, and reject
     * all subsequently submitted tasks. Invocation has no effect on
     * execution state if this is the {@link #commonPool()}, and no
     * additional effect if already shut down. Otherwise, tasks that
     * are in the process of being submitted or executed concurrently
     * during the course of this method may or may not be
     * rejected. This method cancels both existing and unexecuted
     * tasks, in order to permit termination in the presence of task
     * dependencies. Consequently, this method always returns an empty
     * list (unlike the case for some other Executors).
     *
     * @return an empty list
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public List<Runnable> shutdownNow() {
        checkPermission();
        tryTerminate(true, true);
        return Collections.emptyList();
    }

    /**
     * Returns {@code true} if all tasks have completed following shut down.
     *
     * @return {@code true} if all tasks have completed following shut down
     */
    public boolean isTerminated() {
        long c = ctl;
        return ((c & STOP_BIT) != 0L &&
                (short)(c >>> TC_SHIFT) == -(config & SMASK));
    }

    /**
     * Returns {@code true} if the process of termination has
     * commenced but not yet completed. This method may be useful for
     * debugging. If this method returns {@code true} a sufficient
     * period after shutdown, submitted tasks may have ignored or
     * suppressed interruption, or may be waiting for I/O, causing
     * this executor not to terminate properly. (See the advisory
     * notes for class {@link ForkJoinTask} stating that tasks should
     * not normally entail blocking operations, but if they do, they
     * must abort them on interrupt.)
     *
     * @return {@code true} if terminating but not yet terminated
     */
    public boolean isTerminating() {
        long c = ctl;
        return ((c & STOP_BIT) != 0L &&
                (short)(c >>> TC_SHIFT) != -(config & SMASK));
    }

    /**
     * Returns {@code true} if this pool has been shut down.
     *
     * @return {@code true} if this pool has been shut down
     */
    public boolean isShutdown() {
        return plock < 0;
    }
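
    /*
     * A minimal shutdown sketch (illustrative only; the 60-second
     * timeout is an arbitrary assumption): the standard
     * ExecutorService pattern of an orderly shutdown with a forcible
     * fallback, using the lifecycle methods in this section.
     *
     *   pool.shutdown();                          // no new tasks accepted
     *   try {
     *     if (!pool.awaitTermination(60, TimeUnit.SECONDS))
     *       pool.shutdownNow();                   // cancel lingering tasks
     *   } catch (InterruptedException e) {
     *     pool.shutdownNow();
     *     Thread.currentThread().interrupt();     // preserve interrupt status
     *   }
     */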
    /**
     * Blocks until all tasks have completed execution after a
     * shutdown request, or the timeout occurs, or the current thread
     * is interrupted, whichever happens first. Note that the {@link
     * #commonPool()} never terminates until program shutdown, so
     * this method will always time out.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return {@code true} if this executor terminated and
     *         {@code false} if the timeout elapsed before termination
     * @throws InterruptedException if interrupted while waiting
     */
    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        if (isTerminated())
            return true;
        long startTime = System.nanoTime();
        boolean terminated = false;
        synchronized (this) {
            for (long waitTime = nanos, millis = 0L;;) {
                if ((terminated = isTerminated()) ||
                    waitTime <= 0L ||
                    (millis = unit.toMillis(waitTime)) <= 0L)
                    break;
                wait(millis);
                waitTime = nanos - (System.nanoTime() - startTime);
            }
        }
        return terminated;
    }

    /**
     * Interface for extending managed parallelism for tasks running
     * in {@link ForkJoinPool}s.
     *
     * <p>A {@code ManagedBlocker} provides two methods. Method
     * {@code isReleasable} must return {@code true} if blocking is
     * not necessary. Method {@code block} blocks the current thread
     * if necessary (perhaps internally invoking {@code isReleasable}
     * before actually blocking). These actions are performed by any
     * thread invoking {@link ForkJoinPool#managedBlock}. The
     * unusual methods in this API accommodate synchronizers that may,
     * but don't usually, block for long periods. Similarly, they
     * allow more efficient internal handling of cases in which
     * additional workers may be, but usually are not, needed to
     * ensure sufficient parallelism. Toward this end,
     * implementations of method {@code isReleasable} must be amenable
     * to repeated invocation.
     *
     * <p>For example, here is a ManagedBlocker based on a
     * ReentrantLock:
     * <pre> {@code
     * class ManagedLocker implements ManagedBlocker {
     *   final ReentrantLock lock;
     *   boolean hasLock = false;
     *   ManagedLocker(ReentrantLock lock) { this.lock = lock; }
     *   public boolean block() {
     *     if (!hasLock)
     *       lock.lock();
     *     return true;
     *   }
     *   public boolean isReleasable() {
     *     return hasLock || (hasLock = lock.tryLock());
     *   }
     * }}</pre>
     *
     * <p>Here is a class that possibly blocks waiting for an
     * item on a given queue:
     * <pre> {@code
     * class QueueTaker<E> implements ManagedBlocker {
     *   final BlockingQueue<E> queue;
     *   volatile E item = null;
     *   QueueTaker(BlockingQueue<E> q) { this.queue = q; }
     *   public boolean block() throws InterruptedException {
     *     if (item == null)
     *       item = queue.take();
     *     return true;
     *   }
     *   public boolean isReleasable() {
     *     return item != null || (item = queue.poll()) != null;
     *   }
     *   public E getItem() { // call after pool.managedBlock completes
     *     return item;
     *   }
     * }}</pre>
     */
    public static interface ManagedBlocker {
        /**
         * Possibly blocks the current thread, for example waiting for
         * a lock or condition.
         *
         * @return {@code true} if no additional blocking is necessary
         * (i.e., if {@code isReleasable} would return true)
         * @throws InterruptedException if interrupted while waiting
         * (the method is not required to do so, but is allowed to)
         */
        boolean block() throws InterruptedException;

        /**
         * Returns {@code true} if blocking is unnecessary.
         *
         * @return {@code true} if blocking is unnecessary
         */
        boolean isReleasable();
    }
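
    /*
     * A usage sketch for the QueueTaker example above (illustrative
     * only; "queue" is an assumed caller-supplied BlockingQueue<String>):
     *
     *   QueueTaker<String> taker = new QueueTaker<String>(queue);
     *   ForkJoinPool.managedBlock(taker); // may throw InterruptedException
     *   String item = taker.getItem();    // valid once managedBlock returns
     */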
    /**
     * Blocks in accord with the given blocker. If the current thread
     * is a {@link ForkJoinWorkerThread}, this method possibly
     * arranges for a spare thread to be activated if necessary to
     * ensure sufficient parallelism while the current thread is blocked.
     *
     * <p>If the current thread is not a {@code ForkJoinWorkerThread},
     * this method is behaviorally equivalent to
     * <pre> {@code
     * while (!blocker.isReleasable())
     *   if (blocker.block())
     *     return;
     * }</pre>
     *
     * If the current thread is a {@code ForkJoinWorkerThread}, then
     * the pool may first be expanded to ensure parallelism, and later
     * adjusted.
     *
     * @param blocker the blocker
     * @throws InterruptedException if {@code blocker.block()} did so
     */
    public static void managedBlock(ManagedBlocker blocker)
        throws InterruptedException {
        Thread t = Thread.currentThread();
        if (t instanceof ForkJoinWorkerThread) {
            ForkJoinPool p = ((ForkJoinWorkerThread)t).pool;
            while (!blocker.isReleasable()) { // variant of helpSignal
                WorkQueue[] ws; WorkQueue q; int m, u;
                if ((ws = p.workQueues) != null && (m = ws.length - 1) >= 0) {
                    for (int i = 0; i <= m; ++i) {
                        if (blocker.isReleasable())
                            return;
                        if ((q = ws[i]) != null && q.base - q.top < 0) {
                            p.signalWork(q);
                            if ((u = (int)(p.ctl >>> 32)) >= 0 ||
                                (u >> UAC_SHIFT) >= 0)
                                break;
                        }
                    }
                }
                if (p.tryCompensate()) {
                    try {
                        do {} while (!blocker.isReleasable() &&
                                     !blocker.block());
                    } finally {
                        p.incrementActiveCount();
                    }
                    break;
                }
            }
        }
        else {
            do {} while (!blocker.isReleasable() &&
                         !blocker.block());
        }
    }
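
    /*
     * A further ManagedBlocker sketch (illustrative, analogous to the
     * ManagedLocker example documented above; not part of this file):
     * acquiring a java.util.concurrent.Semaphore permit under managed
     * blocking, so the pool can compensate while the acquire blocks.
     *
     *   class PermitTaker implements ManagedBlocker {
     *     final Semaphore sem;
     *     boolean acquired = false;
     *     PermitTaker(Semaphore sem) { this.sem = sem; }
     *     public boolean block() throws InterruptedException {
     *       if (!acquired) { sem.acquire(); acquired = true; }
     *       return true;
     *     }
     *     public boolean isReleasable() {
     *       return acquired || (acquired = sem.tryAcquire());
     *     }
     *   }
     *   // usage: ForkJoinPool.managedBlock(new PermitTaker(sem));
     */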

    // AbstractExecutorService overrides. These rely on undocumented
    // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
    // implement RunnableFuture.

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new ForkJoinTask.AdaptedRunnable<T>(runnable, value);
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new ForkJoinTask.AdaptedCallable<T>(callable);
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe U;
    private static final long CTL;
    private static final long PARKBLOCKER;
    private static final int ABASE;
    private static final int ASHIFT;
    private static final long STEALCOUNT;
    private static final long PLOCK;
    private static final long INDEXSEED;
    private static final long QLOCK;

    static {
        int s; // initialize field offsets for CAS etc
        try {
            U = sun.misc.Unsafe.getUnsafe();
            Class<?> k = ForkJoinPool.class;
            CTL = U.objectFieldOffset
                (k.getDeclaredField("ctl"));
            STEALCOUNT = U.objectFieldOffset
                (k.getDeclaredField("stealCount"));
            PLOCK = U.objectFieldOffset
                (k.getDeclaredField("plock"));
            INDEXSEED = U.objectFieldOffset
                (k.getDeclaredField("indexSeed"));
            Class<?> tk = Thread.class;
            PARKBLOCKER = U.objectFieldOffset
                (tk.getDeclaredField("parkBlocker"));
            Class<?> wk = WorkQueue.class;
            QLOCK = U.objectFieldOffset
                (wk.getDeclaredField("qlock"));
            Class<?> ak = ForkJoinTask[].class;
            ABASE = U.arrayBaseOffset(ak);
            s = U.arrayIndexScale(ak);
            ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
        } catch (Exception e) {
            throw new Error(e);
        }
        if ((s & (s-1)) != 0)
            throw new Error("data type scale not a power of two");

        submitters = new ThreadLocal<Submitter>();
        ForkJoinWorkerThreadFactory fac = defaultForkJoinWorkerThreadFactory =
            new DefaultForkJoinWorkerThreadFactory();
        modifyThreadPermission = new RuntimePermission("modifyThread");

        /*
         * Establish common pool parameters. For extra caution,
         * computations to set up common pool state are here; the
         * constructor just assigns these values to fields.
         */

        int par = 0;
        Thread.UncaughtExceptionHandler handler = null;
        try { // TBD: limit or report ignored exceptions?
            String pp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.parallelism");
            String hp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
            String fp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.threadFactory");
            if (fp != null)
                fac = ((ForkJoinWorkerThreadFactory)ClassLoader.
                       getSystemClassLoader().loadClass(fp).newInstance());
            if (hp != null)
                handler = ((Thread.UncaughtExceptionHandler)ClassLoader.
                           getSystemClassLoader().loadClass(hp).newInstance());
            if (pp != null)
                par = Integer.parseInt(pp);
        } catch (Exception ignore) {
        }

        if (par <= 0)
            par = Runtime.getRuntime().availableProcessors();
        if (par > MAX_CAP)
            par = MAX_CAP;
        commonPoolParallelism = par;
        long np = (long)(-par); // precompute initial ctl value
        long ct = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);

        commonPool = new ForkJoinPool(par, ct, fac, handler);
    }

}
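
/*
 * Configuration sketch (illustrative only): the common pool parameters
 * read in the static initializer above can be supplied on the command
 * line, for example:
 *
 *   java -Djava.util.concurrent.ForkJoinPool.common.parallelism=4 \
 *        -Djava.util.concurrent.ForkJoinPool.common.threadFactory=com.example.MyFactory \
 *        MyApp
 *
 * where com.example.MyFactory is a hypothetical public
 * ForkJoinPool.ForkJoinWorkerThreadFactory implementation with a
 * no-arg constructor, loadable by the system class loader. Any error
 * while reading these properties silently falls back to the defaults.
 */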