1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 
28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent; 37 38 import java.util.ArrayList; 39 import java.util.Arrays; 40 import java.util.Collection; 41 import java.util.Collections; 42 import java.util.List; 43 import java.util.Random; 44 import java.util.concurrent.AbstractExecutorService; 45 import java.util.concurrent.Callable; 46 import java.util.concurrent.ExecutorService; 47 import java.util.concurrent.Future; 48 import java.util.concurrent.RejectedExecutionException; 49 import java.util.concurrent.RunnableFuture; 50 import java.util.concurrent.TimeUnit; 51 import java.util.concurrent.TimeoutException; 52 import java.util.concurrent.atomic.AtomicInteger; 53 import java.util.concurrent.locks.LockSupport; 54 import java.util.concurrent.locks.ReentrantLock; 55 import java.util.concurrent.locks.Condition; 56 57 /** 58 * An {@link ExecutorService} for running {@link ForkJoinTask}s. 59 * A {@code ForkJoinPool} provides the entry point for submissions 60 * from non-{@code ForkJoinTask} clients, as well as management and 61 * monitoring operations. 62 * 63 * <p>A {@code ForkJoinPool} differs from other kinds of {@link 64 * ExecutorService} mainly by virtue of employing 65 * <em>work-stealing</em>: all threads in the pool attempt to find and 66 * execute subtasks created by other active tasks (eventually blocking 67 * waiting for work if none exist). This enables efficient processing 68 * when most tasks spawn other subtasks (as do most {@code 69 * ForkJoinTask}s). When setting <em>asyncMode</em> to true in 70 * constructors, {@code ForkJoinPool}s may also be appropriate for use 71 * with event-style tasks that are never joined. 
72 * 73 * <p>A {@code ForkJoinPool} is constructed with a given target 74 * parallelism level; by default, equal to the number of available 75 * processors. The pool attempts to maintain enough active (or 76 * available) threads by dynamically adding, suspending, or resuming 77 * internal worker threads, even if some tasks are stalled waiting to 78 * join others. However, no such adjustments are guaranteed in the 79 * face of blocked IO or other unmanaged synchronization. The nested 80 * {@link ManagedBlocker} interface enables extension of the kinds of 81 * synchronization accommodated. 82 * 83 * <p>In addition to execution and lifecycle control methods, this 84 * class provides status check methods (for example 85 * {@link #getStealCount}) that are intended to aid in developing, 86 * tuning, and monitoring fork/join applications. Also, method 87 * {@link #toString} returns indications of pool state in a 88 * convenient form for informal monitoring. 89 * 90 * <p> As is the case with other ExecutorServices, there are three 91 * main task execution methods summarized in the following 92 * table. These are designed to be used by clients not already engaged 93 * in fork/join computations in the current pool. The main forms of 94 * these methods accept instances of {@code ForkJoinTask}, but 95 * overloaded forms also allow mixed execution of plain {@code 96 * Runnable}- or {@code Callable}- based activities as well. However, 97 * tasks that are already executing in a pool should normally 98 * <em>NOT</em> use these pool execution methods, but instead use the 99 * within-computation forms listed in the table. 
100 * 101 * <table BORDER CELLPADDING=3 CELLSPACING=1> 102 * <tr> 103 * <td></td> 104 * <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td> 105 * <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td> 106 * </tr> 107 * <tr> 108 * <td> <b>Arrange async execution</b></td> 109 * <td> {@link #execute(ForkJoinTask)}</td> 110 * <td> {@link ForkJoinTask#fork}</td> 111 * </tr> 112 * <tr> 113 * <td> <b>Await and obtain result</b></td> 114 * <td> {@link #invoke(ForkJoinTask)}</td> 115 * <td> {@link ForkJoinTask#invoke}</td> 116 * </tr> 117 * <tr> 118 * <td> <b>Arrange exec and obtain Future</b></td> 119 * <td> {@link #submit(ForkJoinTask)}</td> 120 * <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td> 121 * </tr> 122 * </table> 123 * 124 * <p><b>Sample Usage.</b> Normally a single {@code ForkJoinPool} is 125 * used for all parallel task execution in a program or subsystem. 126 * Otherwise, use would not usually outweigh the construction and 127 * bookkeeping overhead of creating a large set of threads. For 128 * example, a common pool could be used for the {@code SortTasks} 129 * illustrated in {@link RecursiveAction}. Because {@code 130 * ForkJoinPool} uses threads in {@linkplain java.lang.Thread#isDaemon 131 * daemon} mode, there is typically no need to explicitly {@link 132 * #shutdown} such a pool upon program exit. 133 * 134 * <pre> 135 * static final ForkJoinPool mainPool = new ForkJoinPool(); 136 * ... 137 * public void sort(long[] array) { 138 * mainPool.invoke(new SortTask(array, 0, array.length)); 139 * } 140 * </pre> 141 * 142 * <p><b>Implementation notes</b>: This implementation restricts the 143 * maximum number of running threads to 32767. Attempts to create 144 * pools with greater than the maximum number result in 145 * {@code IllegalArgumentException}. 
146 * 147 * <p>This implementation rejects submitted tasks (that is, by throwing 148 * {@link RejectedExecutionException}) only when the pool is shut down 149 * or internal resources have been exhausted. 150 * 151 * @since 1.7 152 * @author Doug Lea 153 */ 154 public class ForkJoinPool extends AbstractExecutorService { 155 156 /* 157 * Implementation Overview 158 * 159 * This class provides the central bookkeeping and control for a 160 * set of worker threads: Submissions from non-FJ threads enter 161 * into a submission queue. Workers take these tasks and typically 162 * split them into subtasks that may be stolen by other workers. 163 * Preference rules give first priority to processing tasks from 164 * their own queues (LIFO or FIFO, depending on mode), then to 165 * randomized FIFO steals of tasks in other worker queues, and 166 * lastly to new submissions. 167 * 168 * The main throughput advantages of work-stealing stem from 169 * decentralized control -- workers mostly take tasks from 170 * themselves or each other. We cannot negate this in the 171 * implementation of other management responsibilities. The main 172 * tactic for avoiding bottlenecks is packing nearly all 173 * essentially atomic control state into a single 64bit volatile 174 * variable ("ctl"). This variable is read on the order of 10-100 175 * times as often as it is modified (always via CAS). (There is 176 * some additional control state, for example variable "shutdown" 177 * for which we can cope with uncoordinated updates.) This 178 * streamlines synchronization and control at the expense of messy 179 * constructions needed to repack status bits upon updates. 180 * Updates tend not to contend with each other except during 181 * bursts while submitted tasks begin or end. In some cases when 182 * they do contend, threads can instead do something else 183 * (usually, scan for tasks) until contention subsides. 
184 * 185 * To enable packing, we restrict maximum parallelism to (1<<15)-1 186 * (which is far in excess of normal operating range) to allow 187 * ids, counts, and their negations (used for thresholding) to fit 188 * into 16bit fields. 189 * 190 * Recording Workers. Workers are recorded in the "workers" array 191 * that is created upon pool construction and expanded if (rarely) 192 * necessary. This is an array as opposed to some other data 193 * structure to support index-based random steals by workers. 194 * Updates to the array recording new workers and unrecording 195 * terminated ones are protected from each other by a seqLock 196 * (scanGuard) but the array is otherwise concurrently readable, 197 * and accessed directly by workers. To simplify index-based 198 * operations, the array size is always a power of two, and all 199 * readers must tolerate null slots. To avoid flailing during 200 * start-up, the array is presized to hold twice #parallelism 201 * workers (which is unlikely to need further resizing during 202 * execution). But to avoid dealing with so many null slots, 203 * variable scanGuard includes a mask for the nearest power of two 204 * that contains all current workers. All worker thread creation 205 * is on-demand, triggered by task submissions, replacement of 206 * terminated workers, and/or compensation for blocked 207 * workers. However, all other support code is set up to work with 208 * other policies. To ensure that we do not hold on to worker 209 * references that would prevent GC, ALL accesses to workers are 210 * via indices into the workers array (which is one source of some 211 * of the messy code constructions here). In essence, the workers 212 * array serves as a weak reference mechanism. Thus for example 213 * the wait queue field of ctl stores worker indices, not worker 214 * references. Access to the workers in associated methods (for 215 * example signalWork) must both index-check and null-check the 216 * IDs. 
All such accesses ignore bad IDs by returning out early 217 * from what they are doing, since this can only be associated 218 * with termination, in which case it is OK to give up. 219 * 220 * All uses of the workers array, as well as queue arrays, check 221 * that the array is non-null (even if previously non-null). This 222 * allows nulling during termination, which is currently not 223 * necessary, but remains an option for resource-revocation-based 224 * shutdown schemes. 225 * 226 * Wait Queuing. Unlike HPC work-stealing frameworks, we cannot 227 * let workers spin indefinitely scanning for tasks when none can 228 * be found immediately, and we cannot start/resume workers unless 229 * there appear to be tasks available. On the other hand, we must 230 * quickly prod them into action when new tasks are submitted or 231 * generated. We park/unpark workers after placing in an event 232 * wait queue when they cannot find work. This "queue" is actually 233 * a simple Treiber stack, headed by the "id" field of ctl, plus a 234 * 15bit counter value to both wake up waiters (by advancing their 235 * count) and avoid ABA effects. Successors are held in worker 236 * field "nextWait". Queuing deals with several intrinsic races, 237 * mainly that a task-producing thread can miss seeing (and 238 * signalling) another thread that gave up looking for work but 239 * has not yet entered the wait queue. We solve this by requiring 240 * a full sweep of all workers both before (in scan()) and after 241 * (in tryAwaitWork()) a newly waiting worker is added to the wait 242 * queue. During a rescan, the worker might release some other 243 * queued worker rather than itself, which has the same net 244 * effect. Because enqueued workers may actually be rescanning 245 * rather than waiting, we set and clear the "parked" field of 246 * ForkJoinWorkerThread to reduce unnecessary calls to unpark. 247 * (Use of the parked field requires a secondary recheck to avoid 248 * missed signals.) 
249 * 250 * Signalling. We create or wake up workers only when there 251 * appears to be at least one task they might be able to find and 252 * execute. When a submission is added or another worker adds a 253 * task to a queue that previously had two or fewer tasks, they 254 * signal waiting workers (or trigger creation of new ones if 255 * fewer than the given parallelism level -- see signalWork). 256 * These primary signals are buttressed by signals during rescans 257 * as well as those performed when a worker steals a task and 258 * notices that there are more tasks too; together these cover the 259 * signals needed in cases when more than two tasks are pushed 260 * but untaken. 261 * 262 * Trimming workers. To release resources after periods of lack of 263 * use, a worker starting to wait when the pool is quiescent will 264 * time out and terminate if the pool has remained quiescent for 265 * SHRINK_RATE nanosecs. This will slowly propagate, eventually 266 * terminating all workers after long periods of non-use. 267 * 268 * Submissions. External submissions are maintained in an 269 * array-based queue that is structured identically to 270 * ForkJoinWorkerThread queues except for the use of 271 * submissionLock in method addSubmission. Unlike the case for 272 * worker queues, multiple external threads can add new 273 * submissions, so adding requires a lock. 274 * 275 * Compensation. Beyond work-stealing support and lifecycle 276 * control, the main responsibility of this framework is to take 277 * actions when one worker is waiting to join a task stolen (or 278 * always held by) another. Because we are multiplexing many 279 * tasks on to a pool of workers, we can't just let them block (as 280 * in Thread.join). 
We also cannot just reassign the joiner's 281 * run-time stack with another and replace it later, which would 282 * be a form of "continuation", that even if possible is not 283 * necessarily a good idea since we sometimes need both an 284 * unblocked task and its continuation to progress. Instead we 285 * combine two tactics: 286 * 287 * Helping: Arranging for the joiner to execute some task that it 288 * would be running if the steal had not occurred. Method 289 * ForkJoinWorkerThread.joinTask tracks joining->stealing 290 * links to try to find such a task. 291 * 292 * Compensating: Unless there are already enough live threads, 293 * method tryPreBlock() may create or re-activate a spare 294 * thread to compensate for blocked joiners until they 295 * unblock. 296 * 297 * The ManagedBlocker extension API can't use helping so relies 298 * only on compensation in method awaitBlocker. 299 * 300 * It is impossible to keep exactly the target parallelism number 301 * of threads running at any given time. Determining the 302 * existence of conservatively safe helping targets, the 303 * availability of already-created spares, and the apparent need 304 * to create new spares are all racy and require heuristic 305 * guidance, so we rely on multiple retries of each. Currently, 306 * in keeping with on-demand signalling policy, we compensate only 307 * if blocking would leave less than one active (non-waiting, 308 * non-blocked) worker. Additionally, to avoid some false alarms 309 * due to GC, lagging counters, system activity, etc, compensated 310 * blocking for joins is only attempted after rechecks stabilize 311 * (retries are interspersed with Thread.yield, for good 312 * citizenship). The variable blockedCount, incremented before 313 * blocking and decremented after, is sometimes needed to 314 * distinguish cases of waiting for work vs blocking on joins or 315 * other managed sync. Both cases are equivalent for most pool 316 * control, so we can update non-atomically. 
(Additionally, 317 * contention on blockedCount alleviates some contention on ctl). 318 * 319 * Shutdown and Termination. A call to shutdownNow atomically sets 320 * the ctl stop bit and then (non-atomically) sets each worker's 321 * "terminate" status, cancels all unprocessed tasks, and wakes up 322 * all waiting workers. Detecting whether termination should 323 * commence after a non-abrupt shutdown() call requires more work 324 * and bookkeeping. We need consensus about quiescence (i.e., that 325 * there is no more work) which is reflected in active counts so 326 * long as there are no current blockers, as well as possible 327 * re-evaluations during independent changes in blocking or 328 * quiescing workers. 329 * 330 * Style notes: There is a lot of representation-level coupling 331 * among classes ForkJoinPool, ForkJoinWorkerThread, and 332 * ForkJoinTask. Most fields of ForkJoinWorkerThread maintain 333 * data structures managed by ForkJoinPool, so are directly 334 * accessed. Conversely we allow access to "workers" array by 335 * workers, and direct access to ForkJoinTask.status by both 336 * ForkJoinPool and ForkJoinWorkerThread. There is little point 337 * trying to reduce this, since any associated future changes in 338 * representations will need to be accompanied by algorithmic 339 * changes anyway. All together, these low-level implementation 340 * choices produce as much as a factor of 4 performance 341 * improvement compared to naive implementations, and enable the 342 * processing of billions of tasks per second, at the expense of 343 * some ugliness. 344 * 345 * Methods signalWork() and scan() are the main bottlenecks so are 346 * especially heavily micro-optimized/mangled. There are lots of 347 * inline assignments (of form "while ((local = field) != 0)") 348 * which are usually the simplest way to ensure the required read 349 * orderings (which are sometimes critical). 
This leads to a 350 * "C"-like style of listing declarations of these locals at the 351 * heads of methods or blocks. There are several occurrences of 352 * the unusual "do {} while (!cas...)" which is the simplest way 353 * to force an update of a CAS'ed variable. There are also other 354 * coding oddities that help some methods perform reasonably even 355 * when interpreted (not compiled). 356 * 357 * The order of declarations in this file is: (1) declarations of 358 * statics (2) fields (along with constants used when unpacking 359 * some of them), listed in an order that tends to reduce 360 * contention among them a bit under most JVMs. (3) internal 361 * control methods (4) callbacks and other support for 362 * ForkJoinTask and ForkJoinWorkerThread classes, (5) exported 363 * methods (plus a few little helpers). (6) static block 364 * initializing all statics in a minimally dependent order. 365 */ 366 367 /** 368 * Factory for creating new {@link ForkJoinWorkerThread}s. 369 * A {@code ForkJoinWorkerThreadFactory} must be defined and used 370 * for {@code ForkJoinWorkerThread} subclasses that extend base 371 * functionality or initialize threads with different contexts. 372 */ 373 public static interface ForkJoinWorkerThreadFactory { 374 /** 375 * Returns a new worker thread operating in the given pool. 376 * 377 * @param pool the pool this thread works in 378 * @throws NullPointerException if the pool is null 379 */ 380 public ForkJoinWorkerThread newThread(ForkJoinPool pool); 381 } 382 383 /** 384 * Default ForkJoinWorkerThreadFactory implementation; creates a 385 * new ForkJoinWorkerThread. 386 */ 387 static class DefaultForkJoinWorkerThreadFactory 388 implements ForkJoinWorkerThreadFactory { 389 public ForkJoinWorkerThread newThread(ForkJoinPool pool) { 390 return new ForkJoinWorkerThread(pool); 391 } 392 } 393 394 /** 395 * Creates a new ForkJoinWorkerThread. This factory is used unless 396 * overridden in ForkJoinPool constructors. 
397 */ 398 public static final ForkJoinWorkerThreadFactory 399 defaultForkJoinWorkerThreadFactory; 400 401 /** 402 * Permission required for callers of methods that may start or 403 * kill threads. 404 */ 405 private static final RuntimePermission modifyThreadPermission; 406 407 /** 408 * If there is a security manager, makes sure caller has 409 * permission to modify threads. 410 */ 411 private static void checkPermission() { 412 SecurityManager security = System.getSecurityManager(); 413 if (security != null) 414 security.checkPermission(modifyThreadPermission); 415 } 416 417 /** 418 * Generator for assigning sequence numbers as pool names. 419 */ 420 private static final AtomicInteger poolNumberGenerator; 421 422 /** 423 * Generator for initial random seeds for worker victim 424 * selection. This is used only to create initial seeds. Random 425 * steals use a cheaper xorshift generator per steal attempt. We 426 * don't expect much contention on seedGenerator, so just use a 427 * plain Random. 428 */ 429 static final Random workerSeedGenerator; 430 431 /** 432 * Array holding all worker threads in the pool. Initialized upon 433 * construction. Array size must be a power of two. Updates and 434 * replacements are protected by scanGuard, but the array is 435 * always kept in a consistent enough state to be randomly 436 * accessed without locking by workers performing work-stealing, 437 * as well as other traversal-based methods in this class, so long 438 * as reads memory-acquire by first reading ctl. All readers must 439 * tolerate that some array slots may be null. 440 */ 441 ForkJoinWorkerThread[] workers; 442 443 /** 444 * Initial size for submission queue array. Must be a power of 445 * two. In many applications, these always stay small so we use a 446 * small initial cap. 447 */ 448 private static final int INITIAL_QUEUE_CAPACITY = 8; 449 450 /** 451 * Maximum size for submission queue array. 
Must be a power of two
     * less than or equal to 1 << (31 - width of array entry) to
     * ensure lack of index wraparound, but is capped at a lower
     * value to help users trap runaway computations.
     */
    private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 24; // 16M

    /**
     * Array serving as submission queue. Initialized upon construction.
     */
    private ForkJoinTask<?>[] submissionQueue;

    /**
     * Lock protecting submissions array for addSubmission.
     */
    private final ReentrantLock submissionLock;

    /**
     * Condition for awaitTermination, using submissionLock for
     * convenience.
     */
    private final Condition termination;

    /**
     * Creation factory for worker threads.
     */
    private final ForkJoinWorkerThreadFactory factory;

    /**
     * The uncaught exception handler used when any worker abruptly
     * terminates.
     */
    final Thread.UncaughtExceptionHandler ueh;

    /**
     * Prefix for assigning names to worker threads.
     */
    private final String workerNamePrefix;

    /**
     * Sum of per-thread steal counts, updated only when threads are
     * idle or terminating.
     */
    private volatile long stealCount;

    /**
     * Main pool control -- a long packed with:
     * AC: Number of active running workers minus target parallelism (16 bits)
     * TC: Number of total workers minus target parallelism (16 bits)
     * ST: true if pool is terminating (1 bit)
     * EC: the wait count of top waiting thread (15 bits)
     * ID: ~poolIndex of top of Treiber stack of waiting threads (16 bits)
     *
     * When convenient, we can extract the upper 32 bits of counts and
     * the lower 32 bits of queue state, u = (int)(ctl >>> 32) and e =
     * (int)ctl. The ec field is never accessed alone, but always
     * together with id and st. The offsets of counts by the target
     * parallelism and the positionings of fields makes it possible to
     * perform the most common checks via sign tests of fields: When
     * ac is negative, there are not enough active workers, when tc is
     * negative, there are not enough total workers, when id is
     * negative, there is at least one waiting worker, and when e is
     * negative, the pool is terminating. To deal with these possibly
     * negative fields, we use casts in and out of "short" and/or
     * signed shifts to maintain signedness.
     */
    volatile long ctl;

    // bit positions/shifts for fields
    private static final int AC_SHIFT = 48;
    private static final int TC_SHIFT = 32;
    private static final int ST_SHIFT = 31;
    private static final int EC_SHIFT = 16;

    // bounds
    private static final int MAX_ID = 0x7fff; // max poolIndex
    private static final int SMASK = 0xffff; // mask short bits
    private static final int SHORT_SIGN = 1 << 15; // sign bit of a 16-bit field
    private static final int INT_SIGN = 1 << 31; // sign bit of a 32-bit int

    // masks
    private static final long STOP_BIT = 0x0001L << ST_SHIFT;
    private static final long AC_MASK = ((long)SMASK) << AC_SHIFT;
    private static final long TC_MASK = ((long)SMASK) << TC_SHIFT;

    // units for incrementing and decrementing
    private static final long TC_UNIT = 1L << TC_SHIFT;
    private static final long AC_UNIT = 1L << AC_SHIFT;

    // masks and units for dealing with u = (int)(ctl >>> 32)
    // (same fields as above, with shifts reduced by 32)
    private static final int UAC_SHIFT = AC_SHIFT - 32;
    private static final int UTC_SHIFT = TC_SHIFT - 32;
    private static final int UAC_MASK = SMASK << UAC_SHIFT;
    private static final int UTC_MASK = SMASK << UTC_SHIFT;
    private static final int UAC_UNIT = 1 << UAC_SHIFT;
    private static final int UTC_UNIT = 1 << UTC_SHIFT;

    // masks and units for dealing with e = (int)ctl
    private static final int E_MASK = 0x7fffffff; // no STOP_BIT
    private static final int EC_UNIT = 1 << EC_SHIFT;

    /**
     * The target parallelism level.
     */
    final int parallelism;

    /**
     * Index (mod submission queue length) of next element to take
     * from submission queue. Usage is identical to that for
     * per-worker queues -- see ForkJoinWorkerThread internal
     * documentation.
     */
    volatile int queueBase;

    /**
     * Index (mod submission queue length) of next element to add
     * in submission queue. Usage is identical to that for
     * per-worker queues -- see ForkJoinWorkerThread internal
     * documentation.
     */
    int queueTop;

    /**
     * True when shutdown() has been called.
     */
    volatile boolean shutdown;

    /**
     * True if use local fifo, not default lifo, for local polling.
     * Read by, and replicated by ForkJoinWorkerThreads.
     */
    final boolean locallyFifo;

    /**
     * The number of threads in ForkJoinWorkerThreads.helpQuiescePool.
     * When non-zero, suppresses automatic shutdown when active
     * counts become zero.
     */
    volatile int quiescerCount;

    /**
     * The number of threads blocked in join.
     */
    volatile int blockedCount;

    /**
     * Counter for worker Thread names (unrelated to their poolIndex).
     */
    private volatile int nextWorkerNumber;

    /**
     * The index for the next created worker. Accessed under scanGuard.
     */
    private int nextWorkerIndex;

    /**
     * SeqLock and index masking for updates to workers array. Locked
     * when SG_UNIT is set. Unlocking clears bit by adding
     * SG_UNIT. Staleness of read-only operations can be checked by
     * comparing scanGuard to value before the reads. The low 16 bits
     * (i.e, anding with SMASK) hold the smallest power of two
     * covering all worker indices, minus one, which is used to avoid
     * dealing with large numbers of null slots when the workers array
     * is overallocated.
615 */ 616 volatile int scanGuard; 617 618 private static final int SG_UNIT = 1 << 16; 619 620 /** 621 * The wakeup interval (in nanoseconds) for a worker waiting for a 622 * task when the pool is quiescent to instead try to shrink the 623 * number of workers. The exact value does not matter too 624 * much. It must be short enough to release resources during 625 * sustained periods of idleness, but not so short that threads 626 * are continually re-created. 627 */ 628 private static final long SHRINK_RATE = 629 4L * 1000L * 1000L * 1000L; // 4 seconds 630 631 /** 632 * Top-level loop for worker threads: On each step: if the 633 * previous step swept through all queues and found no tasks, or 634 * there are excess threads, then possibly blocks. Otherwise, 635 * scans for and, if found, executes a task. Returns when pool 636 * and/or worker terminate. 637 * 638 * @param w the worker 639 */ 640 final void work(ForkJoinWorkerThread w) { 641 boolean swept = false; // true on empty scans 642 long c; 643 while (!w.terminate && (int)(c = ctl) >= 0) { 644 int a; // active count 645 if (!swept && (a = (int)(c >> AC_SHIFT)) <= 0) 646 swept = scan(w, a); 647 else if (tryAwaitWork(w, c)) 648 swept = false; 649 } 650 } 651 652 // Signalling 653 654 /** 655 * Wakes up or creates a worker. 656 */ 657 final void signalWork() { 658 /* 659 * The while condition is true if: (there is are too few total 660 * workers OR there is at least one waiter) AND (there are too 661 * few active workers OR the pool is terminating). The value 662 * of e distinguishes the remaining cases: zero (no waiters) 663 * for create, negative if terminating (in which case do 664 * nothing), else release a waiter. The secondary checks for 665 * release (non-null array etc) can fail if the pool begins 666 * terminating after the test, and don't impose any added cost 667 * because JVMs must perform null and bounds checks anyway. 
668 */ 669 long c; int e, u; 670 while ((((e = (int)(c = ctl)) | (u = (int)(c >>> 32))) & 671 (INT_SIGN|SHORT_SIGN)) == (INT_SIGN|SHORT_SIGN) && e >= 0) { 672 if (e > 0) { // release a waiting worker 673 int i; ForkJoinWorkerThread w; ForkJoinWorkerThread[] ws; 674 if ((ws = workers) == null || 675 (i = ~e & SMASK) >= ws.length || 676 (w = ws[i]) == null) 677 break; 678 long nc = (((long)(w.nextWait & E_MASK)) | 679 ((long)(u + UAC_UNIT) << 32)); 680 if (w.eventCount == e && 681 UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) { 682 w.eventCount = (e + EC_UNIT) & E_MASK; 683 if (w.parked) 684 UNSAFE.unpark(w); 685 break; 686 } 687 } 688 else if (UNSAFE.compareAndSwapLong 689 (this, ctlOffset, c, 690 (long)(((u + UTC_UNIT) & UTC_MASK) | 691 ((u + UAC_UNIT) & UAC_MASK)) << 32)) { 692 addWorker(); 693 break; 694 } 695 } 696 } 697 698 /** 699 * Variant of signalWork to help release waiters on rescans. 700 * Tries once to release a waiter if active count < 0. 701 * 702 * @return false if failed due to contention, else true 703 */ 704 private boolean tryReleaseWaiter() { 705 long c; int e, i; ForkJoinWorkerThread w; ForkJoinWorkerThread[] ws; 706 if ((e = (int)(c = ctl)) > 0 && 707 (int)(c >> AC_SHIFT) < 0 && 708 (ws = workers) != null && 709 (i = ~e & SMASK) < ws.length && 710 (w = ws[i]) != null) { 711 long nc = ((long)(w.nextWait & E_MASK) | 712 ((c + AC_UNIT) & (AC_MASK|TC_MASK))); 713 if (w.eventCount != e || 714 !UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) 715 return false; 716 w.eventCount = (e + EC_UNIT) & E_MASK; 717 if (w.parked) 718 UNSAFE.unpark(w); 719 } 720 return true; 721 } 722 723 // Scanning for tasks 724 725 /** 726 * Scans for and, if found, executes one task. Scans start at a 727 * random index of workers array, and randomly select the first 728 * (2*#workers)-1 probes, and then, if all empty, resort to 2 729 * circular sweeps, which is necessary to check quiescence. and 730 * taking a submission only if no stealable tasks were found. 
The
     * steal code inside the loop is a specialized form of
     * ForkJoinWorkerThread.deqTask, followed by bookkeeping to support
     * helpJoinTask and signal propagation. The code for submission
     * queues is almost identical. On each steal, the worker completes
     * not only the task, but also all local tasks that this task may
     * have generated. On detecting staleness or contention when
     * trying to take a task, this method returns without finishing
     * sweep, which allows global state rechecks before retry.
     *
     * <p>Note that {@code false} is returned as soon as any probed
     * queue appears non-empty, whether or not the task was actually
     * taken; {@code true} is returned only after every probe and the
     * submission queue appeared empty and scanGuard was unchanged.
     *
     * @param w the worker
     * @param a the number of active workers
     * @return true if swept all queues without finding a task
     */
    private boolean scan(ForkJoinWorkerThread w, int a) {
        int g = scanGuard; // mask 0 avoids useless scans if only one active
        int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK;
        ForkJoinWorkerThread[] ws = workers;
        if (ws == null || ws.length <= m)         // staleness check
            return false;
        // j runs over 4*m + 1 probes. While j < 0 the probe index k is
        // re-randomized with xorshift; from j == 0 on, k advances by one
        // per probe.
        for (int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) {
            ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
            ForkJoinWorkerThread v = ws[k & m];   // candidate victim
            if (v != null && (b = v.queueBase) != v.queueTop &&
                (q = v.queue) != null && (i = (q.length - 1) & b) >= 0) {
                long u = (i << ASHIFT) + ABASE;   // raw offset of slot i
                // Take only if the slot is non-null, base has not moved
                // since it was read, and the CAS nulling the slot wins.
                if ((t = q[i]) != null && v.queueBase == b &&
                    UNSAFE.compareAndSwapObject(q, u, t, null)) {
                    // d is nonzero when base and top still differ after
                    // advancing the victim's base past the taken slot
                    int d = (v.queueBase = b + 1) - v.queueTop;
                    v.stealHint = w.poolIndex;
                    if (d != 0)
                        signalWork();             // propagate if nonempty
                    w.execTask(t);
                }
                // Reached whether or not the take succeeded
                r ^= r << 13; r ^= r >>> 17; w.seed = r ^ (r << 5);
                return false;                     // store next seed
            }
            else if (j < 0) {                     // xorshift
                r ^= r << 13; r ^= r >>> 17; k = r ^= r << 5;
            }
            else
                ++k;
        }
        if (scanGuard != g)                       // staleness check
            return false;
        else {                                    // try to take submission
            ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
            if ((b = queueBase) != queueTop &&
                (q = submissionQueue) != null &&
                (i = (q.length - 1) & b) >= 0) {
                long u = (i << ASHIFT) + ABASE;   // raw offset of slot i
                // Same take protocol as above, on the pool's own queue
                if ((t = q[i]) != null && queueBase == b &&
                    UNSAFE.compareAndSwapObject(q, u, t, null)) {
                    queueBase = b + 1;
                    w.execTask(t);
                }
                return false;                     // queue appeared non-empty
            }
            return true;                          // all queues empty
        }
    }

    /**
     * Tries to enqueue worker w in wait queue and await change in
     * worker's eventCount. If the pool is quiescent and there is
     * more than one worker, possibly terminates worker upon exit.
     * Otherwise, before blocking, rescans queues to avoid missed
     * signals. Upon finding work, releases at least one worker
     * (which may be the current worker). Rescans restart upon
     * detected staleness or failure to release due to
     * contention. Note the unusual conventions about Thread.interrupt
     * here and elsewhere: Because interrupts are used solely to alert
     * threads to check termination, which is checked here anyway, we
     * clear status (using Thread.interrupted) before any call to
     * park, so that park does not immediately return due to status
     * being set via some other unrelated call to interrupt in user
     * code.
*
     * @param w the calling worker
     * @param c the ctl value on entry
     * @return true if waited or another thread was released upon enq
     */
    private boolean tryAwaitWork(ForkJoinWorkerThread w, long c) {
        int v = w.eventCount;
        w.nextWait = (int)c;                      // w's successor record
        // Proposed ctl: w's masked eventCount combined with the AC and TC
        // fields of c after subtracting one AC_UNIT.
        long nc = (long)(v & E_MASK) | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
        if (ctl != c || !UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
            long d = ctl; // return true if lost to a deq, to force scan
            return (int)d != (int)c && ((d - c) & AC_MASK) >= 0L;
        }
        // From here on w is enqueued; every exit below first checks
        // whether w.eventCount has moved away from v.
        for (int sc = w.stealCount; sc != 0;) {   // accumulate stealCount
            long s = stealCount;
            if (UNSAFE.compareAndSwapLong(this, stealCountOffset, s, s + sc))
                sc = w.stealCount = 0;
            else if (w.eventCount != v)
                return true;                      // update next time
        }
        // Take the timed idle wait only when all of these conditions
        // hold at once.
        if ((!shutdown || !tryTerminate(false)) &&
            (int)c != 0 && parallelism + (int)(nc >> AC_SHIFT) == 0 &&
            blockedCount == 0 && quiescerCount == 0)
            idleAwaitWork(w, nc, c, v);           // quiescent
        // Alternate between a rescan phase (rescanned == false) and a
        // park phase (rescanned == true) until eventCount changes.
        for (boolean rescanned = false;;) {
            if (w.eventCount != v)
                return true;
            if (!rescanned) {
                int g = scanGuard, m = g & SMASK;
                ForkJoinWorkerThread[] ws = workers;
                if (ws != null && m < ws.length) {
                    rescanned = true;
                    for (int i = 0; i <= m; ++i) {
                        ForkJoinWorkerThread u = ws[i];
                        if (u != null) {
                            // A non-empty queue means someone should be
                            // released; failing to do so forces a rescan.
                            if (u.queueBase != u.queueTop &&
                                !tryReleaseWaiter())
                                rescanned = false; // contended
                            if (w.eventCount != v)
                                return true;
                        }
                    }
                }
                if (scanGuard != g ||              // stale
                    (queueBase != queueTop && !tryReleaseWaiter()))
                    rescanned = false;
                if (!rescanned)
                    Thread.yield();                // reduce contention
                else
                    Thread.interrupted();          // clear before park
            }
            else {
                // Publish parked before the final eventCount check so a
                // releaser that reads w.parked knows to unpark.
                w.parked = true;                   // must recheck
                if (w.eventCount != v) {
                    w.parked = false;
                    return true;
                }
                LockSupport.park(this);
                rescanned = w.parked = false;      // rescan after any wakeup
            }
        }
    }

    /**
     * If
inactivating worker w has caused pool to become
     * quiescent, check for pool termination, and wait for event
     * for up to SHRINK_RATE nanosecs (rescans are unnecessary in
     * this case because quiescence reflects consensus about lack
     * of work). On timeout, if ctl has not changed, terminate the
     * worker. Upon its termination (see deregisterWorker), it may
     * wake up another worker to possibly repeat this process.
     *
     * @param w the calling worker
     * @param currentCtl the ctl value after enqueuing w
     * @param prevCtl the ctl value if w terminated
     * @param v the eventCount w awaits change
     */
    private void idleAwaitWork(ForkJoinWorkerThread w, long currentCtl,
                               long prevCtl, int v) {
        if (w.eventCount == v) {
            if (shutdown)
                tryTerminate(false);
            ForkJoinTask.helpExpungeStaleExceptions(); // help clean weak refs
            // Keep waiting only while ctl is exactly the value installed
            // when w was enqueued.
            while (ctl == currentCtl) {
                long startTime = System.nanoTime();
                w.parked = true;
                if (w.eventCount == v)             // must recheck
                    LockSupport.parkNanos(this, SHRINK_RATE);
                w.parked = false;
                if (w.eventCount != v)
                    break;                         // released by a signal
                // Woke with less than 90% of SHRINK_RATE elapsed: treat
                // as spurious, clear interrupt status and wait again.
                else if (System.nanoTime() - startTime <
                         SHRINK_RATE - (SHRINK_RATE / 10)) // timing slop
                    Thread.interrupted();          // spurious wakeup
                // Timed out: CAS ctl back to prevCtl; on success mark w
                // for termination and advance its eventCount.
                else if (UNSAFE.compareAndSwapLong(this, ctlOffset,
                                                   currentCtl, prevCtl)) {
                    w.terminate = true;            // restore previous
                    w.eventCount = ((int)currentCtl + EC_UNIT) & E_MASK;
                    break;
                }
            }
        }
    }

    // Submissions

    /**
     * Enqueues the given task in the submissionQueue. Same idea as
     * ForkJoinWorkerThread.pushTask except for use of submissionLock.
*
     * @param t the task
     */
    private void addSubmission(ForkJoinTask<?> t) {
        final ReentrantLock lock = this.submissionLock;
        lock.lock();
        try {
            ForkJoinTask<?>[] q; int s, m;
            if ((q = submissionQueue) != null) {    // ignore if queue removed
                // s = current top, m = index mask; u = raw offset of slot s & m
                long u = (((s = queueTop) & (m = q.length-1)) << ASHIFT)+ABASE;
                UNSAFE.putOrderedObject(q, u, t);
                queueTop = s + 1;
                // Grow once the slot just written was the last free one.
                // NOTE(review): the task is already published at this point,
                // so if growSubmissionQueue() throws, the caller sees a
                // RejectedExecutionException although the task is queued,
                // and signalWork() below is skipped; confirm this is intended.
                if (s - queueBase == m)
                    growSubmissionQueue();
            }
        } finally {
            lock.unlock();
        }
        signalWork();                               // called after the lock is released
    }

    //  (pollSubmission is defined below with exported methods)

    /**
     * Creates or doubles submissionQueue array.
     * Basically identical to ForkJoinWorkerThread version.
     *
     * @throws RejectedExecutionException if the doubled size would
     *         exceed MAXIMUM_QUEUE_CAPACITY
     */
    private void growSubmissionQueue() {
        ForkJoinTask<?>[] oldQ = submissionQueue;
        int size = oldQ != null ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY;
        if (size > MAXIMUM_QUEUE_CAPACITY)
            throw new RejectedExecutionException("Queue capacity exceeded");
        if (size < INITIAL_QUEUE_CAPACITY)
            size = INITIAL_QUEUE_CAPACITY;
        // The new array is installed in the field before elements are moved
        ForkJoinTask<?>[] q = submissionQueue = new ForkJoinTask<?>[size];
        int mask = size - 1;
        int top = queueTop;
        int oldMask;
        if (oldQ != null && (oldMask = oldQ.length - 1) >= 0) {
            // Move every index in [queueBase, top). Each old slot is
            // CAS-nulled before the value is written to the new array,
            // so an element is transferred only if this thread wins the
            // CAS (presumably against concurrent takers; confirm).
            for (int b = queueBase; b != top; ++b) {
                long u = ((b & oldMask) << ASHIFT) + ABASE;
                Object x = UNSAFE.getObjectVolatile(oldQ, u);
                if (x != null && UNSAFE.compareAndSwapObject(oldQ, u, x, null))
                    UNSAFE.putObjectVolatile
                        (q, ((b & mask) << ASHIFT) + ABASE, x);
            }
        }
    }

    // Blocking support

    /**
     * Tries to increment blockedCount, decrement active count
     * (sometimes implicitly) and possibly release or create a
     * compensating worker in preparation for blocking. Fails
     * on contention or termination.
972 * 973 * @return true if the caller can block, else should recheck and retry 974 */ 975 private boolean tryPreBlock() { 976 int b = blockedCount; 977 if (UNSAFE.compareAndSwapInt(this, blockedCountOffset, b, b + 1)) { 978 int pc = parallelism; 979 do { 980 ForkJoinWorkerThread[] ws; ForkJoinWorkerThread w; 981 int e, ac, tc, rc, i; 982 long c = ctl; 983 int u = (int)(c >>> 32); 984 if ((e = (int)c) < 0) { 985 // skip -- terminating 986 } 987 else if ((ac = (u >> UAC_SHIFT)) <= 0 && e != 0 && 988 (ws = workers) != null && 989 (i = ~e & SMASK) < ws.length && 990 (w = ws[i]) != null) { 991 long nc = ((long)(w.nextWait & E_MASK) | 992 (c & (AC_MASK|TC_MASK))); 993 if (w.eventCount == e && 994 UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) { 995 w.eventCount = (e + EC_UNIT) & E_MASK; 996 if (w.parked) 997 UNSAFE.unpark(w); 998 return true; // release an idle worker 999 } 1000 } 1001 else if ((tc = (short)(u >>> UTC_SHIFT)) >= 0 && ac + pc > 1) { 1002 long nc = ((c - AC_UNIT) & AC_MASK) | (c & ~AC_MASK); 1003 if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) 1004 return true; // no compensation needed 1005 } 1006 else if (tc + pc < MAX_ID) { 1007 long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); 1008 if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) { 1009 addWorker(); 1010 return true; // create a replacement 1011 } 1012 } 1013 // try to back out on any failure and let caller retry 1014 } while (!UNSAFE.compareAndSwapInt(this, blockedCountOffset, 1015 b = blockedCount, b - 1)); 1016 } 1017 return false; 1018 } 1019 1020 /** 1021 * Decrements blockedCount and increments active count 1022 */ 1023 private void postBlock() { 1024 long c; 1025 do {} while (!UNSAFE.compareAndSwapLong(this, ctlOffset, // no mask 1026 c = ctl, c + AC_UNIT)); 1027 int b; 1028 do {} while (!UNSAFE.compareAndSwapInt(this, blockedCountOffset, 1029 b = blockedCount, b - 1)); 1030 } 1031 1032 /** 1033 * Possibly blocks waiting for the given task to complete, or 1034 * cancels the 
task if terminating. Fails to wait if contended. 1035 * 1036 * @param joinMe the task 1037 */ 1038 final void tryAwaitJoin(ForkJoinTask<?> joinMe) { 1039 int s; 1040 Thread.interrupted(); // clear interrupts before checking termination 1041 if (joinMe.status >= 0) { 1042 if (tryPreBlock()) { 1043 joinMe.tryAwaitDone(0L); 1044 postBlock(); 1045 } 1046 else if ((ctl & STOP_BIT) != 0L) 1047 joinMe.cancelIgnoringExceptions(); 1048 } 1049 } 1050 1051 /** 1052 * Possibly blocks the given worker waiting for joinMe to 1053 * complete or timeout 1054 * 1055 * @param joinMe the task 1056 * @param millis the wait time for underlying Object.wait 1057 */ 1058 final void timedAwaitJoin(ForkJoinTask<?> joinMe, long nanos) { 1059 while (joinMe.status >= 0) { 1060 Thread.interrupted(); 1061 if ((ctl & STOP_BIT) != 0L) { 1062 joinMe.cancelIgnoringExceptions(); 1063 break; 1064 } 1065 if (tryPreBlock()) { 1066 long last = System.nanoTime(); 1067 while (joinMe.status >= 0) { 1068 long millis = TimeUnit.NANOSECONDS.toMillis(nanos); 1069 if (millis <= 0) 1070 break; 1071 joinMe.tryAwaitDone(millis); 1072 if (joinMe.status < 0) 1073 break; 1074 if ((ctl & STOP_BIT) != 0L) { 1075 joinMe.cancelIgnoringExceptions(); 1076 break; 1077 } 1078 long now = System.nanoTime(); 1079 nanos -= now - last; 1080 last = now; 1081 } 1082 postBlock(); 1083 break; 1084 } 1085 } 1086 } 1087 1088 /** 1089 * If necessary, compensates for blocker, and blocks 1090 */ 1091 private void awaitBlocker(ManagedBlocker blocker) 1092 throws InterruptedException { 1093 while (!blocker.isReleasable()) { 1094 if (tryPreBlock()) { 1095 try { 1096 do {} while (!blocker.isReleasable() && !blocker.block()); 1097 } finally { 1098 postBlock(); 1099 } 1100 break; 1101 } 1102 } 1103 } 1104 1105 // Creating, registering and deregistring workers 1106 1107 /** 1108 * Tries to create and start a worker; minimally rolls back counts 1109 * on failure. 
*/
    private void addWorker() {
        Throwable ex = null;
        ForkJoinWorkerThread t = null;
        try {
            t = factory.newThread(this);
        } catch (Throwable e) {
            ex = e;                               // remembered; t stays null
        }
        if (t == null) {  // null or exceptional factory return
            long c;       // adjust counts
            // Subtract one unit from both the AC and TC fields of ctl,
            // leaving all other bits unchanged; retried until the CAS wins.
            do {} while (!UNSAFE.compareAndSwapLong
                         (this, ctlOffset, c = ctl,
                          (((c - AC_UNIT) & AC_MASK) |
                           ((c - TC_UNIT) & TC_MASK) |
                           (c & ~(AC_MASK|TC_MASK)))));
            // Propagate exception if originating from an external caller
            if (!tryTerminate(false) && ex != null &&
                !(Thread.currentThread() instanceof ForkJoinWorkerThread))
                UNSAFE.throwException(ex);
        }
        else
            t.start();
    }

    /**
     * Callback from ForkJoinWorkerThread constructor to assign a
     * public name
     *
     * @return workerNamePrefix followed by the incremented worker number
     */
    final String nextWorkerName() {
        for (int n;;) {
            // CAS nextWorkerNumber from n to n + 1; the ++n leaves n
            // holding the new value, which becomes the name suffix.
            if (UNSAFE.compareAndSwapInt(this, nextWorkerNumberOffset,
                                         n = nextWorkerNumber, ++n))
                return workerNamePrefix + n;
        }
    }

    /**
     * Callback from ForkJoinWorkerThread constructor to
     * determine its poolIndex and record in workers array.
     *
     * @param w the worker
     * @return the worker's pool index
     */
    final int registerWorker(ForkJoinWorkerThread w) {
        /*
         * In the typical case, a new worker acquires the lock, uses
         * next available index and returns quickly. Since we should
         * not block callers (ultimately from signalWork or
         * tryPreBlock) waiting for the lock needed to do this, we
         * instead help release other workers while waiting for the
         * lock.
         */
        for (int g;;) {
            ForkJoinWorkerThread[] ws;
            // The SG_UNIT bit of scanGuard serves as the lock: it is
            // acquired by CAS-setting it. The local g keeps the bit
            // clear, so storing g in the finally block releases it.
            if (((g = scanGuard) & SG_UNIT) == 0 &&
                UNSAFE.compareAndSwapInt(this, scanGuardOffset,
                                         g, g | SG_UNIT)) {
                int k = nextWorkerIndex;
                try {
                    if ((ws = workers) != null) { // ignore on shutdown
                        int n = ws.length;
                        // Hinted slot unusable: search linearly for a free
                        // slot, doubling the array if none is free.
                        if (k < 0 || k >= n || ws[k] != null) {
                            for (k = 0; k < n && ws[k] != null; ++k)
                                ;
                            if (k == n)
                                ws = workers = Arrays.copyOf(ws, n << 1);
                        }
                        ws[k] = w;
                        nextWorkerIndex = k + 1;
                        int m = g & SMASK;
                        // If k lies beyond the current mask, the new guard
                        // is the widened mask 2*m + 1; otherwise g is
                        // advanced by two SG_UNITs.
                        g = (k > m) ? ((m << 1) + 1) & SMASK : g + (SG_UNIT<<1);
                    }
                } finally {
                    scanGuard = g;
                }
                return k;
            }
            else if ((ws = workers) != null) { // help release others
                for (ForkJoinWorkerThread u : ws) {
                    if (u != null && u.queueBase != u.queueTop) {
                        if (tryReleaseWaiter())
                            break;
                    }
                }
            }
        }
    }

    /**
     * Final callback from terminating worker. Removes record of
     * worker from array, and adjusts counts. If pool is shutting
     * down, tries to complete termination.
     *
     * @param w the worker
     * @param ex if non-null, signalWork is called after
     *        deregistration instead of tryReleaseWaiter (presumably
     *        the exception that caused the worker to die; confirm
     *        against callers)
     */
    final void deregisterWorker(ForkJoinWorkerThread w, Throwable ex) {
        int idx = w.poolIndex;
        int sc = w.stealCount;
        int steps = 0;
        // Remove from array, adjust worker counts and collect steal count.
        // We can intermix failed removes or adjusts with steal updates
        // steps: 0 = still in array, 1 = removed, 2 = ctl counts adjusted.
        do {
            long s, c;
            int g;
            if (steps == 0 && ((g = scanGuard) & SG_UNIT) == 0 &&
                UNSAFE.compareAndSwapInt(this, scanGuardOffset,
                                         g, g |= SG_UNIT)) {
                ForkJoinWorkerThread[] ws = workers;
                if (ws != null && idx >= 0 &&
                    idx < ws.length && ws[idx] == w)
                    ws[idx] = null;    // verify
                nextWorkerIndex = idx;
                // g now includes SG_UNIT; adding SG_UNIT once more
                // presumably carries out of that bit, which would release
                // the lock and change the guard value (assumes SG_UNIT is
                // a single bit; confirm).
                scanGuard = g + SG_UNIT;
                steps = 1;
            }
            // Subtract one unit from both the AC and TC fields of ctl
            if (steps == 1 &&
                UNSAFE.compareAndSwapLong(this, ctlOffset, c = ctl,
                                          (((c - AC_UNIT) & AC_MASK) |
                                           ((c - TC_UNIT) & TC_MASK) |
                                           (c & ~(AC_MASK|TC_MASK)))))
                steps = 2;
            // Fold this worker's steal count into the pool total
            if (sc != 0 &&
                UNSAFE.compareAndSwapLong(this, stealCountOffset,
                                          s = stealCount, s + sc))
                sc = 0;
        } while (steps != 2 || sc != 0);
        if (!tryTerminate(false)) {
            if (ex != null)   // possibly replace if died abnormally
                signalWork();
            else
                tryReleaseWaiter();
        }
    }

    // Shutdown and termination

    /**
     * Possibly initiates and/or completes termination.
*
     * @param now if true, unconditionally terminate, else only
     * if shutdown and empty queue and no active workers
     * @return true if now terminating or terminated
     */
    private boolean tryTerminate(boolean now) {
        long c;
        // Loop until STOP_BIT is observed in ctl, or until one of the
        // non-forced preconditions fails.
        while (((c = ctl) & STOP_BIT) == 0) {
            if (!now) {
                // The active-count field must equal -parallelism
                if ((int)(c >> AC_SHIFT) != -parallelism)
                    return false;
                if (!shutdown || blockedCount != 0 || quiescerCount != 0 ||
                    queueBase != queueTop) {
                    if (ctl == c) // staleness check
                        return false;
                    continue;     // ctl moved while checking; re-evaluate
                }
            }
            if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, c | STOP_BIT))
                startTerminating();
        }
        if ((short)(c >>> TC_SHIFT) == -parallelism) { // signal when 0 workers
            final ReentrantLock lock = this.submissionLock;
            lock.lock();
            try {
                termination.signalAll();
            } finally {
                lock.unlock();
            }
        }
        return true;
    }

    /**
     * Runs up to three passes through workers: (0) Setting
     * termination status for each worker, followed by wakeups up to
     * queued workers; (1) helping cancel tasks; (2) interrupting
     * lagging threads (likely in external tasks, but possibly also
     * blocked in joins). Each pass repeats previous steps because of
     * potential lagging thread creation.
     */
    private void startTerminating() {
        cancelSubmissions();
        for (int pass = 0; pass < 3; ++pass) {
            ForkJoinWorkerThread[] ws = workers;  // re-read on every pass
            if (ws != null) {
                for (ForkJoinWorkerThread w : ws) {
                    if (w != null) {
                        w.terminate = true;
                        if (pass > 0) {
                            w.cancelTasks();
                            if (pass > 1 && !w.isInterrupted()) {
                                try {
                                    w.interrupt();
                                } catch (SecurityException ignore) {
                                    // deliberately ignored: the remaining
                                    // workers are still processed
                                }
                            }
                        }
                    }
                }
                terminateWaiters();
            }
        }
    }

    /**
     * Polls and cancels all submissions. Called only during termination.
*/
    private void cancelSubmissions() {
        while (queueBase != queueTop) {
            ForkJoinTask<?> task = pollSubmission();
            if (task != null) {
                try {
                    task.cancel(false);
                } catch (Throwable ignore) {
                    // deliberately ignored so the remaining submissions
                    // are still cancelled
                }
            }
        }
    }

    /**
     * Tries to set the termination status of waiting workers, and
     * then wakes them up (after which they will terminate).
     */
    private void terminateWaiters() {
        ForkJoinWorkerThread[] ws = workers;
        if (ws != null) {
            ForkJoinWorkerThread w; long c; int i, e;
            int n = ws.length;
            // Repeat while ctl's low bits (e) name a worker slot that is
            // in range and occupied, and that worker's eventCount matches.
            while ((i = ~(e = (int)(c = ctl)) & SMASK) < n &&
                   (w = ws[i]) != null && w.eventCount == (e & E_MASK)) {
                // New ctl: w's successor in the low bits, AC field plus
                // one unit, TC field and STOP_BIT carried over.
                if (UNSAFE.compareAndSwapLong(this, ctlOffset, c,
                                              (long)(w.nextWait & E_MASK) |
                                              ((c + AC_UNIT) & AC_MASK) |
                                              (c & (TC_MASK|STOP_BIT)))) {
                    w.terminate = true;
                    w.eventCount = e + EC_UNIT;
                    if (w.parked)
                        UNSAFE.unpark(w);
                }
            }
        }
    }

    // misc ForkJoinWorkerThread support

    /**
     * Increment or decrement quiescerCount. Needed only to prevent
     * triggering shutdown if a worker is transiently inactive while
     * checking quiescence.
     *
     * @param delta 1 for increment, -1 for decrement
     */
    final void addQuiescerCount(int delta) {
        int c;
        // Retry until the CAS from c to c + delta succeeds
        do {} while (!UNSAFE.compareAndSwapInt(this, quiescerCountOffset,
                                               c = quiescerCount, c + delta));
    }

    /**
     * Directly increment or decrement active count without
     * queuing. This method is used to transiently assert inactivation
     * while checking quiescence.
     *
     * @param delta 1 for increment, -1 for decrement
     */
    final void addActiveCount(int delta) {
        // Only the sign of delta is used: any negative value subtracts
        // one AC_UNIT, any other value adds one.
        long d = delta < 0 ? -AC_UNIT : AC_UNIT;
        long c;
        do {} while (!UNSAFE.compareAndSwapLong(this, ctlOffset, c = ctl,
                                                ((c + d) & AC_MASK) |
                                                (c & ~AC_MASK)));
    }

    /**
     * Returns the approximate (non-atomic) number of idle threads per
     * active thread.
     *
     * @return one of 0, 1, 2, 4 or 8
     */
    final int idlePerActive() {
        // Approximate at powers of two for small values, saturate past 4
        int p = parallelism;
        int a = p + (int)(ctl >> AC_SHIFT);
        // Each comparison halves p again before comparing it against a
        return (a > (p >>>= 1) ? 0 :
                a > (p >>>= 1) ? 1 :
                a > (p >>>= 1) ? 2 :
                a > (p >>>= 1) ? 4 :
                8);
    }

    // Exported methods

    // Constructors

    /**
     * Creates a {@code ForkJoinPool} with parallelism equal to {@link
     * java.lang.Runtime#availableProcessors}, using the {@linkplain
     * #defaultForkJoinWorkerThreadFactory default thread factory},
     * no UncaughtExceptionHandler, and non-async LIFO processing mode.
     *
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public ForkJoinPool() {
        this(Runtime.getRuntime().availableProcessors(),
             defaultForkJoinWorkerThreadFactory, null, false);
    }

    /**
     * Creates a {@code ForkJoinPool} with the indicated parallelism
     * level, the {@linkplain
     * #defaultForkJoinWorkerThreadFactory default thread factory},
     * no UncaughtExceptionHandler, and non-async LIFO processing mode.
*
     * @param parallelism the parallelism level
     * @throws IllegalArgumentException if parallelism less than or
     *         equal to zero, or greater than implementation limit
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public ForkJoinPool(int parallelism) {
        // Delegates to the four-argument constructor with the default
        // factory, a null handler and asyncMode false.
        this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
    }

    /**
     * Creates a {@code ForkJoinPool} with the given parameters.
     *
     * @param parallelism the parallelism level. For default value,
     * use {@link java.lang.Runtime#availableProcessors}.
     * @param factory the factory for creating new threads. For default value,
     * use {@link #defaultForkJoinWorkerThreadFactory}.
     * @param handler the handler for internal worker threads that
     * terminate due to unrecoverable errors encountered while executing
     * tasks. For default value, use {@code null}.
     * @param asyncMode if true,
     * establishes local first-in-first-out scheduling mode for forked
     * tasks that are never joined. This mode may be more appropriate
     * than default locally stack-based mode in applications in which
     * worker threads only process event-style asynchronous tasks.
     * For default value, use {@code false}.
1452 * @throws IllegalArgumentException if parallelism less than or 1453 * equal to zero, or greater than implementation limit 1454 * @throws NullPointerException if the factory is null 1455 * @throws SecurityException if a security manager exists and 1456 * the caller is not permitted to modify threads 1457 * because it does not hold {@link 1458 * java.lang.RuntimePermission}{@code ("modifyThread")} 1459 */ 1460 public ForkJoinPool(int parallelism, 1461 ForkJoinWorkerThreadFactory factory, 1462 Thread.UncaughtExceptionHandler handler, 1463 boolean asyncMode) { 1464 checkPermission(); 1465 if (factory == null) 1466 throw new NullPointerException(); 1467 if (parallelism <= 0 || parallelism > MAX_ID) 1468 throw new IllegalArgumentException(); 1469 this.parallelism = parallelism; 1470 this.factory = factory; 1471 this.ueh = handler; 1472 this.locallyFifo = asyncMode; 1473 long np = (long)(-parallelism); // offset ctl counts 1474 this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); 1475 this.submissionQueue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY]; 1476 // initialize workers array with room for 2*parallelism if possible 1477 int n = parallelism << 1; 1478 if (n >= MAX_ID) 1479 n = MAX_ID; 1480 else { // See Hackers Delight, sec 3.2, where n < (1 << 16) 1481 n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; 1482 } 1483 workers = new ForkJoinWorkerThread[n + 1]; 1484 this.submissionLock = new ReentrantLock(); 1485 this.termination = submissionLock.newCondition(); 1486 StringBuilder sb = new StringBuilder("ForkJoinPool-"); 1487 sb.append(poolNumberGenerator.incrementAndGet()); 1488 sb.append("-worker-"); 1489 this.workerNamePrefix = sb.toString(); 1490 } 1491 1492 // Execution methods 1493 1494 /** 1495 * Performs the given task, returning its result upon completion. 1496 * If the computation encounters an unchecked Exception or Error, 1497 * it is rethrown as the outcome of this invocation. 
Rethrown 1498 * exceptions behave in the same way as regular exceptions, but, 1499 * when possible, contain stack traces (as displayed for example 1500 * using {@code ex.printStackTrace()}) of both the current thread 1501 * as well as the thread actually encountering the exception; 1502 * minimally only the latter. 1503 * 1504 * @param task the task 1505 * @return the task's result 1506 * @throws NullPointerException if the task is null 1507 * @throws RejectedExecutionException if the task cannot be 1508 * scheduled for execution 1509 */ 1510 public <T> T invoke(ForkJoinTask<T> task) { 1511 Thread t = Thread.currentThread(); 1512 if (task == null) 1513 throw new NullPointerException(); 1514 if (shutdown) 1515 throw new RejectedExecutionException(); 1516 if ((t instanceof ForkJoinWorkerThread) && 1517 ((ForkJoinWorkerThread)t).pool == this) 1518 return task.invoke(); // bypass submit if in same pool 1519 else { 1520 addSubmission(task); 1521 return task.join(); 1522 } 1523 } 1524 1525 /** 1526 * Unless terminating, forks task if within an ongoing FJ 1527 * computation in the current pool, else submits as external task. 1528 */ 1529 private <T> void forkOrSubmit(ForkJoinTask<T> task) { 1530 ForkJoinWorkerThread w; 1531 Thread t = Thread.currentThread(); 1532 if (shutdown) 1533 throw new RejectedExecutionException(); 1534 if ((t instanceof ForkJoinWorkerThread) && 1535 (w = (ForkJoinWorkerThread)t).pool == this) 1536 w.pushTask(task); 1537 else 1538 addSubmission(task); 1539 } 1540 1541 /** 1542 * Arranges for (asynchronous) execution of the given task. 
1543 * 1544 * @param task the task 1545 * @throws NullPointerException if the task is null 1546 * @throws RejectedExecutionException if the task cannot be 1547 * scheduled for execution 1548 */ 1549 public void execute(ForkJoinTask<?> task) { 1550 if (task == null) 1551 throw new NullPointerException(); 1552 forkOrSubmit(task); 1553 } 1554 1555 // AbstractExecutorService methods 1556 1557 /** 1558 * @throws NullPointerException if the task is null 1559 * @throws RejectedExecutionException if the task cannot be 1560 * scheduled for execution 1561 */ 1562 public void execute(Runnable task) { 1563 if (task == null) 1564 throw new NullPointerException(); 1565 ForkJoinTask<?> job; 1566 if (task instanceof ForkJoinTask<?>) // avoid re-wrap 1567 job = (ForkJoinTask<?>) task; 1568 else 1569 job = ForkJoinTask.adapt(task, null); 1570 forkOrSubmit(job); 1571 } 1572 1573 /** 1574 * Submits a ForkJoinTask for execution. 1575 * 1576 * @param task the task to submit 1577 * @return the task 1578 * @throws NullPointerException if the task is null 1579 * @throws RejectedExecutionException if the task cannot be 1580 * scheduled for execution 1581 */ 1582 public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) { 1583 if (task == null) 1584 throw new NullPointerException(); 1585 forkOrSubmit(task); 1586 return task; 1587 } 1588 1589 /** 1590 * @throws NullPointerException if the task is null 1591 * @throws RejectedExecutionException if the task cannot be 1592 * scheduled for execution 1593 */ 1594 public <T> ForkJoinTask<T> submit(Callable<T> task) { 1595 if (task == null) 1596 throw new NullPointerException(); 1597 ForkJoinTask<T> job = ForkJoinTask.adapt(task); 1598 forkOrSubmit(job); 1599 return job; 1600 } 1601 1602 /** 1603 * @throws NullPointerException if the task is null 1604 * @throws RejectedExecutionException if the task cannot be 1605 * scheduled for execution 1606 */ 1607 public <T> ForkJoinTask<T> submit(Runnable task, T result) { 1608 if (task == null) 1609 throw new 
NullPointerException(); 1610 ForkJoinTask<T> job = ForkJoinTask.adapt(task, result); 1611 forkOrSubmit(job); 1612 return job; 1613 } 1614 1615 /** 1616 * @throws NullPointerException if the task is null 1617 * @throws RejectedExecutionException if the task cannot be 1618 * scheduled for execution 1619 */ 1620 public ForkJoinTask<?> submit(Runnable task) { 1621 if (task == null) 1622 throw new NullPointerException(); 1623 ForkJoinTask<?> job; 1624 if (task instanceof ForkJoinTask<?>) // avoid re-wrap 1625 job = (ForkJoinTask<?>) task; 1626 else 1627 job = ForkJoinTask.adapt(task, null); 1628 forkOrSubmit(job); 1629 return job; 1630 } 1631 1632 /** 1633 * @throws NullPointerException {@inheritDoc} 1634 * @throws RejectedExecutionException {@inheritDoc} 1635 */ 1636 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) { 1637 ArrayList<ForkJoinTask<T>> forkJoinTasks = 1638 new ArrayList<ForkJoinTask<T>>(tasks.size()); 1639 for (Callable<T> task : tasks) 1640 forkJoinTasks.add(ForkJoinTask.adapt(task)); 1641 invoke(new InvokeAll<T>(forkJoinTasks)); 1642 1643 @SuppressWarnings({"unchecked", "rawtypes"}) 1644 List<Future<T>> futures = (List<Future<T>>) (List) forkJoinTasks; 1645 return futures; 1646 } 1647 1648 static final class InvokeAll<T> extends RecursiveAction { 1649 final ArrayList<ForkJoinTask<T>> tasks; 1650 InvokeAll(ArrayList<ForkJoinTask<T>> tasks) { this.tasks = tasks; } 1651 public void compute() { 1652 try { invokeAll(tasks); } 1653 catch (Exception ignore) {} 1654 } 1655 private static final long serialVersionUID = -7914297376763021607L; 1656 } 1657 1658 /** 1659 * Returns the factory used for constructing new workers. 1660 * 1661 * @return the factory used for constructing new workers 1662 */ 1663 public ForkJoinWorkerThreadFactory getFactory() { 1664 return factory; 1665 } 1666 1667 /** 1668 * Returns the handler for internal worker threads that terminate 1669 * due to unrecoverable errors encountered while executing tasks. 
1670 * 1671 * @return the handler, or {@code null} if none 1672 */ 1673 public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() { 1674 return ueh; 1675 } 1676 1677 /** 1678 * Returns the targeted parallelism level of this pool. 1679 * 1680 * @return the targeted parallelism level of this pool 1681 */ 1682 public int getParallelism() { 1683 return parallelism; 1684 } 1685 1686 /** 1687 * Returns the number of worker threads that have started but not 1688 * yet terminated. The result returned by this method may differ 1689 * from {@link #getParallelism} when threads are created to 1690 * maintain parallelism when others are cooperatively blocked. 1691 * 1692 * @return the number of worker threads 1693 */ 1694 public int getPoolSize() { 1695 return parallelism + (short)(ctl >>> TC_SHIFT); 1696 } 1697 1698 /** 1699 * Returns {@code true} if this pool uses local first-in-first-out 1700 * scheduling mode for forked tasks that are never joined. 1701 * 1702 * @return {@code true} if this pool uses async mode 1703 */ 1704 public boolean getAsyncMode() { 1705 return locallyFifo; 1706 } 1707 1708 /** 1709 * Returns an estimate of the number of worker threads that are 1710 * not blocked waiting to join tasks or for other managed 1711 * synchronization. This method may overestimate the 1712 * number of running threads. 1713 * 1714 * @return the number of worker threads 1715 */ 1716 public int getRunningThreadCount() { 1717 int r = parallelism + (int)(ctl >> AC_SHIFT); 1718 return (r <= 0) ? 0 : r; // suppress momentarily negative values 1719 } 1720 1721 /** 1722 * Returns an estimate of the number of threads that are currently 1723 * stealing or executing tasks. This method may overestimate the 1724 * number of active threads. 1725 * 1726 * @return the number of active threads 1727 */ 1728 public int getActiveThreadCount() { 1729 int r = parallelism + (int)(ctl >> AC_SHIFT) + blockedCount; 1730 return (r <= 0) ? 
0 : r; // suppress momentarily negative values 1731 } 1732 1733 /** 1734 * Returns {@code true} if all worker threads are currently idle. 1735 * An idle worker is one that cannot obtain a task to execute 1736 * because none are available to steal from other threads, and 1737 * there are no pending submissions to the pool. This method is 1738 * conservative; it might not return {@code true} immediately upon 1739 * idleness of all threads, but will eventually become true if 1740 * threads remain inactive. 1741 * 1742 * @return {@code true} if all threads are currently idle 1743 */ 1744 public boolean isQuiescent() { 1745 return parallelism + (int)(ctl >> AC_SHIFT) + blockedCount == 0; 1746 } 1747 1748 /** 1749 * Returns an estimate of the total number of tasks stolen from 1750 * one thread's work queue by another. The reported value 1751 * underestimates the actual total number of steals when the pool 1752 * is not quiescent. This value may be useful for monitoring and 1753 * tuning fork/join programs: in general, steal counts should be 1754 * high enough to keep threads busy, but low enough to avoid 1755 * overhead and contention across threads. 1756 * 1757 * @return the number of steals 1758 */ 1759 public long getStealCount() { 1760 return stealCount; 1761 } 1762 1763 /** 1764 * Returns an estimate of the total number of tasks currently held 1765 * in queues by worker threads (but not including tasks submitted 1766 * to the pool that have not begun executing). This value is only 1767 * an approximation, obtained by iterating across all threads in 1768 * the pool. This method may be useful for tuning task 1769 * granularities. 
1770 * 1771 * @return the number of queued tasks 1772 */ 1773 public long getQueuedTaskCount() { 1774 long count = 0; 1775 ForkJoinWorkerThread[] ws; 1776 if ((short)(ctl >>> TC_SHIFT) > -parallelism && 1777 (ws = workers) != null) { 1778 for (ForkJoinWorkerThread w : ws) 1779 if (w != null) 1780 count -= w.queueBase - w.queueTop; // must read base first 1781 } 1782 return count; 1783 } 1784 1785 /** 1786 * Returns an estimate of the number of tasks submitted to this 1787 * pool that have not yet begun executing. This method may take 1788 * time proportional to the number of submissions. 1789 * 1790 * @return the number of queued submissions 1791 */ 1792 public int getQueuedSubmissionCount() { 1793 return -queueBase + queueTop; 1794 } 1795 1796 /** 1797 * Returns {@code true} if there are any tasks submitted to this 1798 * pool that have not yet begun executing. 1799 * 1800 * @return {@code true} if there are any queued submissions 1801 */ 1802 public boolean hasQueuedSubmissions() { 1803 return queueBase != queueTop; 1804 } 1805 1806 /** 1807 * Removes and returns the next unexecuted submission if one is 1808 * available. This method may be useful in extensions to this 1809 * class that re-assign work in systems with multiple pools. 1810 * 1811 * @return the next submission, or {@code null} if none 1812 */ 1813 protected ForkJoinTask<?> pollSubmission() { 1814 ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i; 1815 while ((b = queueBase) != queueTop && 1816 (q = submissionQueue) != null && 1817 (i = (q.length - 1) & b) >= 0) { 1818 long u = (i << ASHIFT) + ABASE; 1819 if ((t = q[i]) != null && 1820 queueBase == b && 1821 UNSAFE.compareAndSwapObject(q, u, t, null)) { 1822 queueBase = b + 1; 1823 return t; 1824 } 1825 } 1826 return null; 1827 } 1828 1829 /** 1830 * Removes all available unexecuted submitted and forked tasks 1831 * from scheduling queues and adds them to the given collection, 1832 * without altering their execution status. 
These may include 1833 * artificially generated or wrapped tasks. This method is 1834 * designed to be invoked only when the pool is known to be 1835 * quiescent. Invocations at other times may not remove all 1836 * tasks. A failure encountered while attempting to add elements 1837 * to collection {@code c} may result in elements being in 1838 * neither, either or both collections when the associated 1839 * exception is thrown. The behavior of this operation is 1840 * undefined if the specified collection is modified while the 1841 * operation is in progress. 1842 * 1843 * @param c the collection to transfer elements into 1844 * @return the number of elements transferred 1845 */ 1846 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) { 1847 int count = 0; 1848 while (queueBase != queueTop) { 1849 ForkJoinTask<?> t = pollSubmission(); 1850 if (t != null) { 1851 c.add(t); 1852 ++count; 1853 } 1854 } 1855 ForkJoinWorkerThread[] ws; 1856 if ((short)(ctl >>> TC_SHIFT) > -parallelism && 1857 (ws = workers) != null) { 1858 for (ForkJoinWorkerThread w : ws) 1859 if (w != null) 1860 count += w.drainTasksTo(c); 1861 } 1862 return count; 1863 } 1864 1865 /** 1866 * Returns a string identifying this pool, as well as its state, 1867 * including indications of run state, parallelism level, and 1868 * worker and task counts. 1869 * 1870 * @return a string identifying this pool, as well as its state 1871 */ 1872 public String toString() { 1873 long st = getStealCount(); 1874 long qt = getQueuedTaskCount(); 1875 long qs = getQueuedSubmissionCount(); 1876 int pc = parallelism; 1877 long c = ctl; 1878 int tc = pc + (short)(c >>> TC_SHIFT); 1879 int rc = pc + (int)(c >> AC_SHIFT); 1880 if (rc < 0) // ignore transient negative 1881 rc = 0; 1882 int ac = rc + blockedCount; 1883 String level; 1884 if ((c & STOP_BIT) != 0) 1885 level = (tc == 0) ? "Terminated" : "Terminating"; 1886 else 1887 level = shutdown ? 
"Shutting down" : "Running"; 1888 return super.toString() + 1889 "[" + level + 1890 ", parallelism = " + pc + 1891 ", size = " + tc + 1892 ", active = " + ac + 1893 ", running = " + rc + 1894 ", steals = " + st + 1895 ", tasks = " + qt + 1896 ", submissions = " + qs + 1897 "]"; 1898 } 1899 1900 /** 1901 * Initiates an orderly shutdown in which previously submitted 1902 * tasks are executed, but no new tasks will be accepted. 1903 * Invocation has no additional effect if already shut down. 1904 * Tasks that are in the process of being submitted concurrently 1905 * during the course of this method may or may not be rejected. 1906 * 1907 * @throws SecurityException if a security manager exists and 1908 * the caller is not permitted to modify threads 1909 * because it does not hold {@link 1910 * java.lang.RuntimePermission}{@code ("modifyThread")} 1911 */ 1912 public void shutdown() { 1913 checkPermission(); 1914 shutdown = true; 1915 tryTerminate(false); 1916 } 1917 1918 /** 1919 * Attempts to cancel and/or stop all tasks, and reject all 1920 * subsequently submitted tasks. Tasks that are in the process of 1921 * being submitted or executed concurrently during the course of 1922 * this method may or may not be rejected. This method cancels 1923 * both existing and unexecuted tasks, in order to permit 1924 * termination in the presence of task dependencies. So the method 1925 * always returns an empty list (unlike the case for some other 1926 * Executors). 1927 * 1928 * @return an empty list 1929 * @throws SecurityException if a security manager exists and 1930 * the caller is not permitted to modify threads 1931 * because it does not hold {@link 1932 * java.lang.RuntimePermission}{@code ("modifyThread")} 1933 */ 1934 public List<Runnable> shutdownNow() { 1935 checkPermission(); 1936 shutdown = true; 1937 tryTerminate(true); 1938 return Collections.emptyList(); 1939 } 1940 1941 /** 1942 * Returns {@code true} if all tasks have completed following shut down. 
1943 * 1944 * @return {@code true} if all tasks have completed following shut down 1945 */ 1946 public boolean isTerminated() { 1947 long c = ctl; 1948 return ((c & STOP_BIT) != 0L && 1949 (short)(c >>> TC_SHIFT) == -parallelism); 1950 } 1951 1952 /** 1953 * Returns {@code true} if the process of termination has 1954 * commenced but not yet completed. This method may be useful for 1955 * debugging. A return of {@code true} reported a sufficient 1956 * period after shutdown may indicate that submitted tasks have 1957 * ignored or suppressed interruption, or are waiting for IO, 1958 * causing this executor not to properly terminate. (See the 1959 * advisory notes for class {@link ForkJoinTask} stating that 1960 * tasks should not normally entail blocking operations. But if 1961 * they do, they must abort them on interrupt.) 1962 * 1963 * @return {@code true} if terminating but not yet terminated 1964 */ 1965 public boolean isTerminating() { 1966 long c = ctl; 1967 return ((c & STOP_BIT) != 0L && 1968 (short)(c >>> TC_SHIFT) != -parallelism); 1969 } 1970 1971 /** 1972 * Returns true if terminating or terminated. Used by ForkJoinWorkerThread. 1973 */ 1974 final boolean isAtLeastTerminating() { 1975 return (ctl & STOP_BIT) != 0L; 1976 } 1977 1978 /** 1979 * Returns {@code true} if this pool has been shut down. 1980 * 1981 * @return {@code true} if this pool has been shut down 1982 */ 1983 public boolean isShutdown() { 1984 return shutdown; 1985 } 1986 1987 /** 1988 * Blocks until all tasks have completed execution after a shutdown 1989 * request, or the timeout occurs, or the current thread is 1990 * interrupted, whichever happens first. 
1991 * 1992 * @param timeout the maximum time to wait 1993 * @param unit the time unit of the timeout argument 1994 * @return {@code true} if this executor terminated and 1995 * {@code false} if the timeout elapsed before termination 1996 * @throws InterruptedException if interrupted while waiting 1997 */ 1998 public boolean awaitTermination(long timeout, TimeUnit unit) 1999 throws InterruptedException { 2000 long nanos = unit.toNanos(timeout); 2001 final ReentrantLock lock = this.submissionLock; 2002 lock.lock(); 2003 try { 2004 for (;;) { 2005 if (isTerminated()) 2006 return true; 2007 if (nanos <= 0) 2008 return false; 2009 nanos = termination.awaitNanos(nanos); 2010 } 2011 } finally { 2012 lock.unlock(); 2013 } 2014 } 2015 2016 /** 2017 * Interface for extending managed parallelism for tasks running 2018 * in {@link ForkJoinPool}s. 2019 * 2020 * <p>A {@code ManagedBlocker} provides two methods. Method 2021 * {@code isReleasable} must return {@code true} if blocking is 2022 * not necessary. Method {@code block} blocks the current thread 2023 * if necessary (perhaps internally invoking {@code isReleasable} 2024 * before actually blocking). These actions are performed by any 2025 * thread invoking {@link ForkJoinPool#managedBlock}. The 2026 * unusual methods in this API accommodate synchronizers that may, 2027 * but don't usually, block for long periods. Similarly, they 2028 * allow more efficient internal handling of cases in which 2029 * additional workers may be, but usually are not, needed to 2030 * ensure sufficient parallelism. Toward this end, 2031 * implementations of method {@code isReleasable} must be amenable 2032 * to repeated invocation. 
2033 * 2034 * <p>For example, here is a ManagedBlocker based on a 2035 * ReentrantLock: 2036 * <pre> {@code 2037 * class ManagedLocker implements ManagedBlocker { 2038 * final ReentrantLock lock; 2039 * boolean hasLock = false; 2040 * ManagedLocker(ReentrantLock lock) { this.lock = lock; } 2041 * public boolean block() { 2042 * if (!hasLock) 2043 * lock.lock(); 2044 * return true; 2045 * } 2046 * public boolean isReleasable() { 2047 * return hasLock || (hasLock = lock.tryLock()); 2048 * } 2049 * }}</pre> 2050 * 2051 * <p>Here is a class that possibly blocks waiting for an 2052 * item on a given queue: 2053 * <pre> {@code 2054 * class QueueTaker<E> implements ManagedBlocker { 2055 * final BlockingQueue<E> queue; 2056 * volatile E item = null; 2057 * QueueTaker(BlockingQueue<E> q) { this.queue = q; } 2058 * public boolean block() throws InterruptedException { 2059 * if (item == null) 2060 * item = queue.take(); 2061 * return true; 2062 * } 2063 * public boolean isReleasable() { 2064 * return item != null || (item = queue.poll()) != null; 2065 * } 2066 * public E getItem() { // call after pool.managedBlock completes 2067 * return item; 2068 * } 2069 * }}</pre> 2070 */ 2071 public static interface ManagedBlocker { 2072 /** 2073 * Possibly blocks the current thread, for example waiting for 2074 * a lock or condition. 2075 * 2076 * @return {@code true} if no additional blocking is necessary 2077 * (i.e., if isReleasable would return true) 2078 * @throws InterruptedException if interrupted while waiting 2079 * (the method is not required to do so, but is allowed to) 2080 */ 2081 boolean block() throws InterruptedException; 2082 2083 /** 2084 * Returns {@code true} if blocking is unnecessary. 2085 */ 2086 boolean isReleasable(); 2087 } 2088 2089 /** 2090 * Blocks in accord with the given blocker. 
If the current thread 2091 * is a {@link ForkJoinWorkerThread}, this method possibly 2092 * arranges for a spare thread to be activated if necessary to 2093 * ensure sufficient parallelism while the current thread is blocked. 2094 * 2095 * <p>If the caller is not a {@link ForkJoinTask}, this method is 2096 * behaviorally equivalent to 2097 * <pre> {@code 2098 * while (!blocker.isReleasable()) 2099 * if (blocker.block()) 2100 * return; 2101 * }</pre> 2102 * 2103 * If the caller is a {@code ForkJoinTask}, then the pool may 2104 * first be expanded to ensure parallelism, and later adjusted. 2105 * 2106 * @param blocker the blocker 2107 * @throws InterruptedException if blocker.block did so 2108 */ 2109 public static void managedBlock(ManagedBlocker blocker) 2110 throws InterruptedException { 2111 Thread t = Thread.currentThread(); 2112 if (t instanceof ForkJoinWorkerThread) { 2113 ForkJoinWorkerThread w = (ForkJoinWorkerThread) t; 2114 w.pool.awaitBlocker(blocker); 2115 } 2116 else { 2117 do {} while (!blocker.isReleasable() && !blocker.block()); 2118 } 2119 } 2120 2121 // AbstractExecutorService overrides. These rely on undocumented 2122 // fact that ForkJoinTask.adapt returns ForkJoinTasks that also 2123 // implement RunnableFuture. 
2124 2125 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { 2126 return (RunnableFuture<T>) ForkJoinTask.adapt(runnable, value); 2127 } 2128 2129 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 2130 return (RunnableFuture<T>) ForkJoinTask.adapt(callable); 2131 } 2132 2133 // Unsafe mechanics 2134 private static final sun.misc.Unsafe UNSAFE; 2135 private static final long ctlOffset; 2136 private static final long stealCountOffset; 2137 private static final long blockedCountOffset; 2138 private static final long quiescerCountOffset; 2139 private static final long scanGuardOffset; 2140 private static final long nextWorkerNumberOffset; 2141 private static final long ABASE; 2142 private static final int ASHIFT; 2143 2144 static { 2145 poolNumberGenerator = new AtomicInteger(); 2146 workerSeedGenerator = new Random(); 2147 modifyThreadPermission = new RuntimePermission("modifyThread"); 2148 defaultForkJoinWorkerThreadFactory = 2149 new DefaultForkJoinWorkerThreadFactory(); 2150 int s; 2151 try { 2152 UNSAFE = sun.misc.Unsafe.getUnsafe(); 2153 Class k = ForkJoinPool.class; 2154 ctlOffset = UNSAFE.objectFieldOffset 2155 (k.getDeclaredField("ctl")); 2156 stealCountOffset = UNSAFE.objectFieldOffset 2157 (k.getDeclaredField("stealCount")); 2158 blockedCountOffset = UNSAFE.objectFieldOffset 2159 (k.getDeclaredField("blockedCount")); 2160 quiescerCountOffset = UNSAFE.objectFieldOffset 2161 (k.getDeclaredField("quiescerCount")); 2162 scanGuardOffset = UNSAFE.objectFieldOffset 2163 (k.getDeclaredField("scanGuard")); 2164 nextWorkerNumberOffset = UNSAFE.objectFieldOffset 2165 (k.getDeclaredField("nextWorkerNumber")); 2166 Class a = ForkJoinTask[].class; 2167 ABASE = UNSAFE.arrayBaseOffset(a); 2168 s = UNSAFE.arrayIndexScale(a); 2169 } catch (Exception e) { 2170 throw new Error(e); 2171 } 2172 if ((s & (s-1)) != 0) 2173 throw new Error("data type scale not a power of two"); 2174 ASHIFT = 31 - Integer.numberOfLeadingZeros(s); 2175 } 2176 
2177 }