1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/licenses/publicdomain 34 */ 35 36 package java.util.concurrent; 37 38 import java.util.ArrayList; 39 import java.util.Arrays; 40 import java.util.Collection; 41 import java.util.Collections; 42 import java.util.List; 43 import java.util.concurrent.AbstractExecutorService; 44 import java.util.concurrent.Callable; 45 import java.util.concurrent.ExecutorService; 46 import java.util.concurrent.Future; 47 import java.util.concurrent.RejectedExecutionException; 48 import java.util.concurrent.RunnableFuture; 49 import java.util.concurrent.TimeUnit; 50 import java.util.concurrent.TimeoutException; 51 import java.util.concurrent.atomic.AtomicInteger; 52 import java.util.concurrent.locks.LockSupport; 53 import java.util.concurrent.locks.ReentrantLock; 54 55 /** 56 * An {@link ExecutorService} for running {@link ForkJoinTask}s. 57 * A {@code ForkJoinPool} provides the entry point for submissions 58 * from non-{@code ForkJoinTask} clients, as well as management and 59 * monitoring operations. 60 * 61 * <p>A {@code ForkJoinPool} differs from other kinds of {@link 62 * ExecutorService} mainly by virtue of employing 63 * <em>work-stealing</em>: all threads in the pool attempt to find and 64 * execute subtasks created by other active tasks (eventually blocking 65 * waiting for work if none exist). This enables efficient processing 66 * when most tasks spawn other subtasks (as do most {@code 67 * ForkJoinTask}s). When setting <em>asyncMode</em> to true in 68 * constructors, {@code ForkJoinPool}s may also be appropriate for use 69 * with event-style tasks that are never joined. 70 * 71 * <p>A {@code ForkJoinPool} is constructed with a given target 72 * parallelism level; by default, equal to the number of available 73 * processors. The pool attempts to maintain enough active (or 74 * available) threads by dynamically adding, suspending, or resuming 75 * internal worker threads, even if some tasks are stalled waiting to 76 * join others. However, no such adjustments are guaranteed in the 77 * face of blocked IO or other unmanaged synchronization. The nested 78 * {@link ManagedBlocker} interface enables extension of the kinds of 79 * synchronization accommodated. 80 * 81 * <p>In addition to execution and lifecycle control methods, this 82 * class provides status check methods (for example 83 * {@link #getStealCount}) that are intended to aid in developing, 84 * tuning, and monitoring fork/join applications. Also, method 85 * {@link #toString} returns indications of pool state in a 86 * convenient form for informal monitoring. 87 * 88 * <p> As is the case with other ExecutorServices, there are three 89 * main task execution methods summarized in the following 90 * table. These are designed to be used by clients not already engaged 91 * in fork/join computations in the current pool. The main forms of 92 * these methods accept instances of {@code ForkJoinTask}, but 93 * overloaded forms also allow mixed execution of plain {@code 94 * Runnable}- or {@code Callable}- based activities as well. However, 95 * tasks that are already executing in a pool should normally 96 * <em>NOT</em> use these pool execution methods, but instead use the 97 * within-computation forms listed in the table. 98 * 99 * <table BORDER CELLPADDING=3 CELLSPACING=1> 100 * <tr> 101 * <td></td> 102 * <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td> 103 * <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td> 104 * </tr> 105 * <tr> 106 * <td> <b>Arrange async execution</td> 107 * <td> {@link #execute(ForkJoinTask)}</td> 108 * <td> {@link ForkJoinTask#fork}</td> 109 * </tr> 110 * <tr> 111 * <td> <b>Await and obtain result</td> 112 * <td> {@link #invoke(ForkJoinTask)}</td> 113 * <td> {@link ForkJoinTask#invoke}</td> 114 * </tr> 115 * <tr> 116 * <td> <b>Arrange exec and obtain Future</td> 117 * <td> {@link #submit(ForkJoinTask)}</td> 118 * <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td> 119 * </tr> 120 * </table> 121 * 122 * <p><b>Sample Usage.</b> Normally a single {@code ForkJoinPool} is 123 * used for all parallel task execution in a program or subsystem. 124 * Otherwise, use would not usually outweigh the construction and 125 * bookkeeping overhead of creating a large set of threads. For 126 * example, a common pool could be used for the {@code SortTasks} 127 * illustrated in {@link RecursiveAction}. Because {@code 128 * ForkJoinPool} uses threads in {@linkplain java.lang.Thread#isDaemon 129 * daemon} mode, there is typically no need to explicitly {@link 130 * #shutdown} such a pool upon program exit. 131 * 132 * <pre> 133 * static final ForkJoinPool mainPool = new ForkJoinPool(); 134 * ... 135 * public void sort(long[] array) { 136 * mainPool.invoke(new SortTask(array, 0, array.length)); 137 * } 138 * </pre> 139 * 140 * <p><b>Implementation notes</b>: This implementation restricts the 141 * maximum number of running threads to 32767. Attempts to create 142 * pools with greater than the maximum number result in 143 * {@code IllegalArgumentException}. 144 * 145 * <p>This implementation rejects submitted tasks (that is, by throwing 146 * {@link RejectedExecutionException}) only when the pool is shut down 147 * or internal resources have been exhausted. 148 * 149 * @since 1.7 150 * @author Doug Lea 151 */ 152 public class ForkJoinPool extends AbstractExecutorService { 153 154 /* 155 * Implementation Overview 156 * 157 * This class provides the central bookkeeping and control for a 158 * set of worker threads: Submissions from non-FJ threads enter 159 * into a submission queue. Workers take these tasks and typically 160 * split them into subtasks that may be stolen by other workers. 161 * The main work-stealing mechanics implemented in class 162 * ForkJoinWorkerThread give first priority to processing tasks 163 * from their own queues (LIFO or FIFO, depending on mode), then 164 * to randomized FIFO steals of tasks in other worker queues, and 165 * lastly to new submissions. These mechanics do not consider 166 * affinities, loads, cache localities, etc, so rarely provide the 167 * best possible performance on a given machine, but portably 168 * provide good throughput by averaging over these factors. 169 * (Further, even if we did try to use such information, we do not 170 * usually have a basis for exploiting it. For example, some sets 171 * of tasks profit from cache affinities, but others are harmed by 172 * cache pollution effects.) 173 * 174 * Beyond work-stealing support and essential bookkeeping, the 175 * main responsibility of this framework is to take actions when 176 * one worker is waiting to join a task stolen (or always held by) 177 * another. Because we are multiplexing many tasks on to a pool 178 * of workers, we can't just let them block (as in Thread.join). 179 * We also cannot just reassign the joiner's run-time stack with 180 * another and replace it later, which would be a form of 181 * "continuation", that even if possible is not necessarily a good 182 * idea. Given that the creation costs of most threads on most 183 * systems mainly surrounds setting up runtime stacks, thread 184 * creation and switching is usually not much more expensive than 185 * stack creation and switching, and is more flexible). Instead we 186 * combine two tactics: 187 * 188 * Helping: Arranging for the joiner to execute some task that it 189 * would be running if the steal had not occurred. Method 190 * ForkJoinWorkerThread.helpJoinTask tracks joining->stealing 191 * links to try to find such a task. 192 * 193 * Compensating: Unless there are already enough live threads, 194 * method helpMaintainParallelism() may create or 195 * re-activate a spare thread to compensate for blocked 196 * joiners until they unblock. 197 * 198 * It is impossible to keep exactly the target (parallelism) 199 * number of threads running at any given time. Determining 200 * existence of conservatively safe helping targets, the 201 * availability of already-created spares, and the apparent need 202 * to create new spares are all racy and require heuristic 203 * guidance, so we rely on multiple retries of each. Compensation 204 * occurs in slow-motion. It is triggered only upon timeouts of 205 * Object.wait used for joins. This reduces poor decisions that 206 * would otherwise be made when threads are waiting for others 207 * that are stalled because of unrelated activities such as 208 * garbage collection. 209 * 210 * The ManagedBlocker extension API can't use helping so relies 211 * only on compensation in method awaitBlocker. 212 * 213 * The main throughput advantages of work-stealing stem from 214 * decentralized control -- workers mostly steal tasks from each 215 * other. We do not want to negate this by creating bottlenecks 216 * implementing other management responsibilities. So we use a 217 * collection of techniques that avoid, reduce, or cope well with 218 * contention. These entail several instances of bit-packing into 219 * CASable fields to maintain only the minimally required 220 * atomicity. To enable such packing, we restrict maximum 221 * parallelism to (1<<15)-1 (enabling twice this (to accommodate 222 * unbalanced increments and decrements) to fit into a 16 bit 223 * field, which is far in excess of normal operating range. Even 224 * though updates to some of these bookkeeping fields do sometimes 225 * contend with each other, they don't normally cache-contend with 226 * updates to others enough to warrant memory padding or 227 * isolation. So they are all held as fields of ForkJoinPool 228 * objects. The main capabilities are as follows: 229 * 230 * 1. Creating and removing workers. Workers are recorded in the 231 * "workers" array. This is an array as opposed to some other data 232 * structure to support index-based random steals by workers. 233 * Updates to the array recording new workers and unrecording 234 * terminated ones are protected from each other by a lock 235 * (workerLock) but the array is otherwise concurrently readable, 236 * and accessed directly by workers. To simplify index-based 237 * operations, the array size is always a power of two, and all 238 * readers must tolerate null slots. Currently, all worker thread 239 * creation is on-demand, triggered by task submissions, 240 * replacement of terminated workers, and/or compensation for 241 * blocked workers. However, all other support code is set up to 242 * work with other policies. 243 * 244 * To ensure that we do not hold on to worker references that 245 * would prevent GC, ALL accesses to workers are via indices into 246 * the workers array (which is one source of some of the unusual 247 * code constructions here). In essence, the workers array serves 248 * as a WeakReference mechanism. Thus for example the event queue 249 * stores worker indices, not worker references. Access to the 250 * workers in associated methods (for example releaseEventWaiters) 251 * must both index-check and null-check the IDs. All such accesses 252 * ignore bad IDs by returning out early from what they are doing, 253 * since this can only be associated with shutdown, in which case 254 * it is OK to give up. On termination, we just clobber these 255 * data structures without trying to use them. 256 * 257 * 2. Bookkeeping for dynamically adding and removing workers. We 258 * aim to approximately maintain the given level of parallelism. 259 * When some workers are known to be blocked (on joins or via 260 * ManagedBlocker), we may create or resume others to take their 261 * place until they unblock (see below). Implementing this 262 * requires counts of the number of "running" threads (i.e., those 263 * that are neither blocked nor artificially suspended) as well as 264 * the total number. These two values are packed into one field, 265 * "workerCounts" because we need accurate snapshots when deciding 266 * to create, resume or suspend. Note however that the 267 * correspondence of these counts to reality is not guaranteed. In 268 * particular updates for unblocked threads may lag until they 269 * actually wake up. 270 * 271 * 3. Maintaining global run state. The run state of the pool 272 * consists of a runLevel (SHUTDOWN, TERMINATING, etc) similar to 273 * those in other Executor implementations, as well as a count of 274 * "active" workers -- those that are, or soon will be, or 275 * recently were executing tasks. The runLevel and active count 276 * are packed together in order to correctly trigger shutdown and 277 * termination. Without care, active counts can be subject to very 278 * high contention. We substantially reduce this contention by 279 * relaxing update rules. A worker must claim active status 280 * prospectively, by activating if it sees that a submitted or 281 * stealable task exists (it may find after activating that the 282 * task no longer exists). It stays active while processing this 283 * task (if it exists) and any other local subtasks it produces, 284 * until it cannot find any other tasks. It then tries 285 * inactivating (see method preStep), but upon update contention 286 * instead scans for more tasks, later retrying inactivation if it 287 * doesn't find any. 288 * 289 * 4. Managing idle workers waiting for tasks. We cannot let 290 * workers spin indefinitely scanning for tasks when none are 291 * available. On the other hand, we must quickly prod them into 292 * action when new tasks are submitted or generated. We 293 * park/unpark these idle workers using an event-count scheme. 294 * Field eventCount is incremented upon events that may enable 295 * workers that previously could not find a task to now find one: 296 * Submission of a new task to the pool, or another worker pushing 297 * a task onto a previously empty queue. (We also use this 298 * mechanism for configuration and termination actions that 299 * require wakeups of idle workers). Each worker maintains its 300 * last known event count, and blocks when a scan for work did not 301 * find a task AND its lastEventCount matches the current 302 * eventCount. Waiting idle workers are recorded in a variant of 303 * Treiber stack headed by field eventWaiters which, when nonzero, 304 * encodes the thread index and count awaited for by the worker 305 * thread most recently calling eventSync. This thread in turn has 306 * a record (field nextEventWaiter) for the next waiting worker. 307 * In addition to allowing simpler decisions about need for 308 * wakeup, the event count bits in eventWaiters serve the role of 309 * tags to avoid ABA errors in Treiber stacks. Upon any wakeup, 310 * released threads also try to release at most two others. The 311 * net effect is a tree-like diffusion of signals, where released 312 * threads (and possibly others) help with unparks. To further 313 * reduce contention effects a bit, failed CASes to increment 314 * field eventCount are tolerated without retries in signalWork. 315 * Conceptually they are merged into the same event, which is OK 316 * when their only purpose is to enable workers to scan for work. 317 * 318 * 5. Managing suspension of extra workers. When a worker notices 319 * (usually upon timeout of a wait()) that there are too few 320 * running threads, we may create a new thread to maintain 321 * parallelism level, or at least avoid starvation. Usually, extra 322 * threads are needed for only very short periods, yet join 323 * dependencies are such that we sometimes need them in 324 * bursts. Rather than create new threads each time this happens, 325 * we suspend no-longer-needed extra ones as "spares". For most 326 * purposes, we don't distinguish "extra" spare threads from 327 * normal "core" threads: On each call to preStep (the only point 328 * at which we can do this) a worker checks to see if there are 329 * now too many running workers, and if so, suspends itself. 330 * Method helpMaintainParallelism looks for suspended threads to 331 * resume before considering creating a new replacement. The 332 * spares themselves are encoded on another variant of a Treiber 333 * Stack, headed at field "spareWaiters". Note that the use of 334 * spares is intrinsically racy. One thread may become a spare at 335 * about the same time as another is needlessly being created. We 336 * counteract this and related slop in part by requiring resumed 337 * spares to immediately recheck (in preStep) to see whether they 338 * should re-suspend. 339 * 340 * 6. Killing off unneeded workers. A timeout mechanism is used to 341 * shed unused workers: The oldest (first) event queue waiter uses 342 * a timed rather than hard wait. When this wait times out without 343 * a normal wakeup, it tries to shutdown any one (for convenience 344 * the newest) other spare or event waiter via 345 * tryShutdownUnusedWorker. This eventually reduces the number of 346 * worker threads to a minimum of one after a long enough period 347 * without use. 348 * 349 * 7. Deciding when to create new workers. The main dynamic 350 * control in this class is deciding when to create extra threads 351 * in method helpMaintainParallelism. We would like to keep 352 * exactly #parallelism threads running, which is an impossible 353 * task. We always need to create one when the number of running 354 * threads would become zero and all workers are busy. Beyond 355 * this, we must rely on heuristics that work well in the 356 * presence of transient phenomena such as GC stalls, dynamic 357 * compilation, and wake-up lags. These transients are extremely 358 * common -- we are normally trying to fully saturate the CPUs on 359 * a machine, so almost any activity other than running tasks 360 * impedes accuracy. Our main defense is to allow parallelism to 361 * lapse for a while during joins, and use a timeout to see if, 362 * after the resulting settling, there is still a need for 363 * additional workers. This also better copes with the fact that 364 * some of the methods in this class tend to never become compiled 365 * (but are interpreted), so some components of the entire set of 366 * controls might execute 100 times faster than others. And 367 * similarly for cases where the apparent lack of work is just due 368 * to GC stalls and other transient system activity. 369 * 370 * Beware that there is a lot of representation-level coupling 371 * among classes ForkJoinPool, ForkJoinWorkerThread, and 372 * ForkJoinTask. For example, direct access to "workers" array by 373 * workers, and direct access to ForkJoinTask.status by both 374 * ForkJoinPool and ForkJoinWorkerThread. There is little point 375 * trying to reduce this, since any associated future changes in 376 * representations will need to be accompanied by algorithmic 377 * changes anyway. 378 * 379 * Style notes: There are lots of inline assignments (of form 380 * "while ((local = field) != 0)") which are usually the simplest 381 * way to ensure the required read orderings (which are sometimes 382 * critical). Also several occurrences of the unusual "do {} 383 * while (!cas...)" which is the simplest way to force an update of 384 * a CAS'ed variable. There are also other coding oddities that 385 * help some methods perform reasonably even when interpreted (not 386 * compiled), at the expense of some messy constructions that 387 * reduce byte code counts. 388 * 389 * The order of declarations in this file is: (1) statics (2) 390 * fields (along with constants used when unpacking some of them) 391 * (3) internal control methods (4) callbacks and other support 392 * for ForkJoinTask and ForkJoinWorkerThread classes, (5) exported 393 * methods (plus a few little helpers). 394 */ 395 396 /** 397 * Factory for creating new {@link ForkJoinWorkerThread}s. 398 * A {@code ForkJoinWorkerThreadFactory} must be defined and used 399 * for {@code ForkJoinWorkerThread} subclasses that extend base 400 * functionality or initialize threads with different contexts. 401 */ 402 public static interface ForkJoinWorkerThreadFactory { 403 /** 404 * Returns a new worker thread operating in the given pool. 405 * 406 * @param pool the pool this thread works in 407 * @throws NullPointerException if the pool is null 408 */ 409 public ForkJoinWorkerThread newThread(ForkJoinPool pool); 410 } 411 412 /** 413 * Default ForkJoinWorkerThreadFactory implementation; creates a 414 * new ForkJoinWorkerThread. 415 */ 416 static class DefaultForkJoinWorkerThreadFactory 417 implements ForkJoinWorkerThreadFactory { 418 public ForkJoinWorkerThread newThread(ForkJoinPool pool) { 419 return new ForkJoinWorkerThread(pool); 420 } 421 } 422 423 /** 424 * Creates a new ForkJoinWorkerThread. This factory is used unless 425 * overridden in ForkJoinPool constructors. 426 */ 427 public static final ForkJoinWorkerThreadFactory 428 defaultForkJoinWorkerThreadFactory = 429 new DefaultForkJoinWorkerThreadFactory(); 430 431 /** 432 * Permission required for callers of methods that may start or 433 * kill threads. 434 */ 435 private static final RuntimePermission modifyThreadPermission = 436 new RuntimePermission("modifyThread"); 437 438 /** 439 * If there is a security manager, makes sure caller has 440 * permission to modify threads. 441 */ 442 private static void checkPermission() { 443 SecurityManager security = System.getSecurityManager(); 444 if (security != null) 445 security.checkPermission(modifyThreadPermission); 446 } 447 448 /** 449 * Generator for assigning sequence numbers as pool names. 450 */ 451 private static final AtomicInteger poolNumberGenerator = 452 new AtomicInteger(); 453 454 /** 455 * The time to block in a join (see awaitJoin) before checking if 456 * a new worker should be (re)started to maintain parallelism 457 * level. The value should be short enough to maintain global 458 * responsiveness and progress but long enough to avoid 459 * counterproductive firings during GC stalls or unrelated system 460 * activity, and to not bog down systems with continual re-firings 461 * on GCs or legitimately long waits. 462 */ 463 private static final long JOIN_TIMEOUT_MILLIS = 250L; // 4 per second 464 465 /** 466 * The wakeup interval (in nanoseconds) for the oldest worker 467 * waiting for an event to invoke tryShutdownUnusedWorker to 468 * shrink the number of workers. The exact value does not matter 469 * too much. It must be short enough to release resources during 470 * sustained periods of idleness, but not so short that threads 471 * are continually re-created. 472 */ 473 private static final long SHRINK_RATE_NANOS = 474 30L * 1000L * 1000L * 1000L; // 2 per minute 475 476 /** 477 * Absolute bound for parallelism level. Twice this number plus 478 * one (i.e., 0xfff) must fit into a 16bit field to enable 479 * word-packing for some counts and indices. 480 */ 481 private static final int MAX_WORKERS = 0x7fff; 482 483 /** 484 * Array holding all worker threads in the pool. Array size must 485 * be a power of two. Updates and replacements are protected by 486 * workerLock, but the array is always kept in a consistent enough 487 * state to be randomly accessed without locking by workers 488 * performing work-stealing, as well as other traversal-based 489 * methods in this class. All readers must tolerate that some 490 * array slots may be null. 491 */ 492 volatile ForkJoinWorkerThread[] workers; 493 494 /** 495 * Queue for external submissions. 496 */ 497 private final LinkedTransferQueue<ForkJoinTask<?>> submissionQueue; 498 499 /** 500 * Lock protecting updates to workers array. 501 */ 502 private final ReentrantLock workerLock; 503 504 /** 505 * Latch released upon termination. 506 */ 507 private final Phaser termination; 508 509 /** 510 * Creation factory for worker threads. 511 */ 512 private final ForkJoinWorkerThreadFactory factory; 513 514 /** 515 * Sum of per-thread steal counts, updated only when threads are 516 * idle or terminating. 517 */ 518 private volatile long stealCount; 519 520 /** 521 * Encoded record of top of Treiber stack of threads waiting for 522 * events. The top 32 bits contain the count being waited for. The 523 * bottom 16 bits contains one plus the pool index of waiting 524 * worker thread. (Bits 16-31 are unused.) 525 */ 526 private volatile long eventWaiters; 527 528 private static final int EVENT_COUNT_SHIFT = 32; 529 private static final long WAITER_ID_MASK = (1L << 16) - 1L; 530 531 /** 532 * A counter for events that may wake up worker threads: 533 * - Submission of a new task to the pool 534 * - A worker pushing a task on an empty queue 535 * - termination 536 */ 537 private volatile int eventCount; 538 539 /** 540 * Encoded record of top of Treiber stack of spare threads waiting 541 * for resumption. The top 16 bits contain an arbitrary count to 542 * avoid ABA effects. The bottom 16bits contains one plus the pool 543 * index of waiting worker thread. 544 */ 545 private volatile int spareWaiters; 546 547 private static final int SPARE_COUNT_SHIFT = 16; 548 private static final int SPARE_ID_MASK = (1 << 16) - 1; 549 550 /** 551 * Lifecycle control. The low word contains the number of workers 552 * that are (probably) executing tasks. This value is atomically 553 * incremented before a worker gets a task to run, and decremented 554 * when a worker has no tasks and cannot find any. Bits 16-18 555 * contain runLevel value. When all are zero, the pool is 556 * running. Level transitions are monotonic (running -> shutdown 557 * -> terminating -> terminated) so each transition adds a bit. 558 * These are bundled together to ensure consistent read for 559 * termination checks (i.e., that runLevel is at least SHUTDOWN 560 * and active threads is zero). 561 * 562 * Notes: Most direct CASes are dependent on these bitfield 563 * positions. Also, this field is non-private to enable direct 564 * performance-sensitive CASes in ForkJoinWorkerThread. 565 */ 566 volatile int runState; 567 568 // Note: The order among run level values matters. 569 private static final int RUNLEVEL_SHIFT = 16; 570 private static final int SHUTDOWN = 1 << RUNLEVEL_SHIFT; 571 private static final int TERMINATING = 1 << (RUNLEVEL_SHIFT + 1); 572 private static final int TERMINATED = 1 << (RUNLEVEL_SHIFT + 2); 573 private static final int ACTIVE_COUNT_MASK = (1 << RUNLEVEL_SHIFT) - 1; 574 575 /** 576 * Holds number of total (i.e., created and not yet terminated) 577 * and running (i.e., not blocked on joins or other managed sync) 578 * threads, packed together to ensure consistent snapshot when 579 * making decisions about creating and suspending spare 580 * threads. Updated only by CAS. Note that adding a new worker 581 * requires incrementing both counts, since workers start off in 582 * running state. 583 */ 584 private volatile int workerCounts; 585 586 private static final int TOTAL_COUNT_SHIFT = 16; 587 private static final int RUNNING_COUNT_MASK = (1 << TOTAL_COUNT_SHIFT) - 1; 588 private static final int ONE_RUNNING = 1; 589 private static final int ONE_TOTAL = 1 << TOTAL_COUNT_SHIFT; 590 591 /** 592 * The target parallelism level. 593 * Accessed directly by ForkJoinWorkerThreads. 594 */ 595 final int parallelism; 596 597 /** 598 * True if use local fifo, not default lifo, for local polling 599 * Read by, and replicated by ForkJoinWorkerThreads 600 */ 601 final boolean locallyFifo; 602 603 /** 604 * The uncaught exception handler used when any worker abruptly 605 * terminates. 606 */ 607 private final Thread.UncaughtExceptionHandler ueh; 608 609 /** 610 * Pool number, just for assigning useful names to worker threads 611 */ 612 private final int poolNumber; 613 614 // Utilities for CASing fields. Note that most of these 615 // are usually manually inlined by callers 616 617 /** 618 * Increments running count part of workerCounts 619 */ 620 final void incrementRunningCount() { 621 int c; 622 do {} while (!UNSAFE.compareAndSwapInt(this, workerCountsOffset, 623 c = workerCounts, 624 c + ONE_RUNNING)); 625 } 626 627 /** 628 * Tries to decrement running count unless already zero 629 */ 630 final boolean tryDecrementRunningCount() { 631 int wc = workerCounts; 632 if ((wc & RUNNING_COUNT_MASK) == 0) 633 return false; 634 return UNSAFE.compareAndSwapInt(this, workerCountsOffset, 635 wc, wc - ONE_RUNNING); 636 } 637 638 /** 639 * Forces decrement of encoded workerCounts, awaiting nonzero if 640 * (rarely) necessary when other count updates lag. 641 * 642 * @param dr -- either zero or ONE_RUNNING 643 * @param dt -- either zero or ONE_TOTAL 644 */ 645 private void decrementWorkerCounts(int dr, int dt) { 646 for (;;) { 647 int wc = workerCounts; 648 if ((wc & RUNNING_COUNT_MASK) - dr < 0 || 649 (wc >>> TOTAL_COUNT_SHIFT) - dt < 0) { 650 if ((runState & TERMINATED) != 0) 651 return; // lagging termination on a backout 652 Thread.yield(); 653 } 654 if (UNSAFE.compareAndSwapInt(this, workerCountsOffset, 655 wc, wc - (dr + dt))) 656 return; 657 } 658 } 659 660 /** 661 * Tries decrementing active count; fails on contention. 662 * Called when workers cannot find tasks to run. 663 */ 664 final boolean tryDecrementActiveCount() { 665 int c; 666 return UNSAFE.compareAndSwapInt(this, runStateOffset, 667 c = runState, c - 1); 668 } 669 670 /** 671 * Advances to at least the given level. Returns true if not 672 * already in at least the given level. 673 */ 674 private boolean advanceRunLevel(int level) { 675 for (;;) { 676 int s = runState; 677 if ((s & level) != 0) 678 return false; 679 if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, s | level)) 680 return true; 681 } 682 } 683 684 // workers array maintenance 685 686 /** 687 * Records and returns a workers array index for new worker. 688 */ 689 private int recordWorker(ForkJoinWorkerThread w) { 690 // Try using slot totalCount-1. If not available, scan and/or resize 691 int k = (workerCounts >>> TOTAL_COUNT_SHIFT) - 1; 692 final ReentrantLock lock = this.workerLock; 693 lock.lock(); 694 try { 695 ForkJoinWorkerThread[] ws = workers; 696 int n = ws.length; 697 if (k < 0 || k >= n || ws[k] != null) { 698 for (k = 0; k < n && ws[k] != null; ++k) 699 ; 700 if (k == n) 701 ws = Arrays.copyOf(ws, n << 1); 702 } 703 ws[k] = w; 704 workers = ws; // volatile array write ensures slot visibility 705 } finally { 706 lock.unlock(); 707 } 708 return k; 709 } 710 711 /** 712 * Nulls out record of worker in workers array. 713 */ 714 private void forgetWorker(ForkJoinWorkerThread w) { 715 int idx = w.poolIndex; 716 // Locking helps method recordWorker avoid unnecessary expansion 717 final ReentrantLock lock = this.workerLock; 718 lock.lock(); 719 try { 720 ForkJoinWorkerThread[] ws = workers; 721 if (idx >= 0 && idx < ws.length && ws[idx] == w) // verify 722 ws[idx] = null; 723 } finally { 724 lock.unlock(); 725 } 726 } 727 728 /** 729 * Final callback from terminating worker. Removes record of 730 * worker from array, and adjusts counts. If pool is shutting 731 * down, tries to complete termination. 732 * 733 * @param w the worker 734 */ 735 final void workerTerminated(ForkJoinWorkerThread w) { 736 forgetWorker(w); 737 decrementWorkerCounts(w.isTrimmed()? 0 : ONE_RUNNING, ONE_TOTAL); 738 while (w.stealCount != 0) // collect final count 739 tryAccumulateStealCount(w); 740 tryTerminate(false); 741 } 742 743 // Waiting for and signalling events 744 745 /** 746 * Releases workers blocked on a count not equal to current count. 747 * Normally called after precheck that eventWaiters isn't zero to 748 * avoid wasted array checks. Gives up upon a change in count or 749 * upon releasing two workers, letting others take over. 750 */ 751 private void releaseEventWaiters() { 752 ForkJoinWorkerThread[] ws = workers; 753 int n = ws.length; 754 long h = eventWaiters; 755 int ec = eventCount; 756 boolean releasedOne = false; 757 ForkJoinWorkerThread w; int id; 758 while ((id = ((int)(h & WAITER_ID_MASK)) - 1) >= 0 && 759 (int)(h >>> EVENT_COUNT_SHIFT) != ec && 760 id < n && (w = ws[id]) != null) { 761 if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset, 762 h, w.nextWaiter)) { 763 LockSupport.unpark(w); 764 if (releasedOne) // exit on second release 765 break; 766 releasedOne = true; 767 } 768 if (eventCount != ec) 769 break; 770 h = eventWaiters; 771 } 772 } 773 774 /** 775 * Tries to advance eventCount and releases waiters. Called only 776 * from workers. 777 */ 778 final void signalWork() { 779 int c; // try to increment event count -- CAS failure OK 780 UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1); 781 if (eventWaiters != 0L) 782 releaseEventWaiters(); 783 } 784 785 /** 786 * Adds the given worker to event queue and blocks until 787 * terminating or event count advances from the given value 788 * 789 * @param w the calling worker thread 790 * @param ec the count 791 */ 792 private void eventSync(ForkJoinWorkerThread w, int ec) { 793 long nh = (((long)ec) << EVENT_COUNT_SHIFT) | ((long)(w.poolIndex+1)); 794 long h; 795 while ((runState < SHUTDOWN || !tryTerminate(false)) && 796 (((int)((h = eventWaiters) & WAITER_ID_MASK)) == 0 || 797 (int)(h >>> EVENT_COUNT_SHIFT) == ec) && 798 eventCount == ec) { 799 if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset, 800 w.nextWaiter = h, nh)) { 801 awaitEvent(w, ec); 802 break; 803 } 804 } 805 } 806 807 /** 808 * Blocks the given worker (that has already been entered as an 809 * event waiter) until terminating or event count advances from 810 * the given value. The oldest (first) waiter uses a timed wait to 811 * occasionally one-by-one shrink the number of workers (to a 812 * minimum of one) if the pool has not been used for extended 813 * periods. 814 * 815 * @param w the calling worker thread 816 * @param ec the count 817 */ 818 private void awaitEvent(ForkJoinWorkerThread w, int ec) { 819 while (eventCount == ec) { 820 if (tryAccumulateStealCount(w)) { // transfer while idle 821 boolean untimed = (w.nextWaiter != 0L || 822 (workerCounts & RUNNING_COUNT_MASK) <= 1); 823 long startTime = untimed? 0 : System.nanoTime(); 824 Thread.interrupted(); // clear/ignore interrupt 825 if (eventCount != ec || w.isTerminating()) 826 break; // recheck after clear 827 if (untimed) 828 LockSupport.park(w); 829 else { 830 LockSupport.parkNanos(w, SHRINK_RATE_NANOS); 831 if (eventCount != ec || w.isTerminating()) 832 break; 833 if (System.nanoTime() - startTime >= SHRINK_RATE_NANOS) 834 tryShutdownUnusedWorker(ec); 835 } 836 } 837 } 838 } 839 840 // Maintaining parallelism 841 842 /** 843 * Pushes worker onto the spare stack. 844 */ 845 final void pushSpare(ForkJoinWorkerThread w) { 846 int ns = (++w.spareCount << SPARE_COUNT_SHIFT) | (w.poolIndex + 1); 847 do {} while (!UNSAFE.compareAndSwapInt(this, spareWaitersOffset, 848 w.nextSpare = spareWaiters,ns)); 849 } 850 851 /** 852 * Tries (once) to resume a spare if the number of running 853 * threads is less than target. 854 */ 855 private void tryResumeSpare() { 856 int sw, id; 857 ForkJoinWorkerThread[] ws = workers; 858 int n = ws.length; 859 ForkJoinWorkerThread w; 860 if ((sw = spareWaiters) != 0 && 861 (id = (sw & SPARE_ID_MASK) - 1) >= 0 && 862 id < n && (w = ws[id]) != null && 863 (workerCounts & RUNNING_COUNT_MASK) < parallelism && 864 spareWaiters == sw && 865 UNSAFE.compareAndSwapInt(this, spareWaitersOffset, 866 sw, w.nextSpare)) { 867 int c; // increment running count before resume 868 do {} while (!UNSAFE.compareAndSwapInt 869 (this, workerCountsOffset, 870 c = workerCounts, c + ONE_RUNNING)); 871 if (w.tryUnsuspend()) 872 LockSupport.unpark(w); 873 else // back out if w was shutdown 874 decrementWorkerCounts(ONE_RUNNING, 0); 875 } 876 } 877 878 /** 879 * Tries to increase the number of running workers if below target 880 * parallelism: If a spare exists tries to resume it via 881 * tryResumeSpare. Otherwise, if not enough total workers or all 882 * existing workers are busy, adds a new worker. In all cases also 883 * helps wake up releasable workers waiting for work. 884 */ 885 private void helpMaintainParallelism() { 886 int pc = parallelism; 887 int wc, rs, tc; 888 while (((wc = workerCounts) & RUNNING_COUNT_MASK) < pc && 889 (rs = runState) < TERMINATING) { 890 if (spareWaiters != 0) 891 tryResumeSpare(); 892 else if ((tc = wc >>> TOTAL_COUNT_SHIFT) >= MAX_WORKERS || 893 (tc >= pc && (rs & ACTIVE_COUNT_MASK) != tc)) 894 break; // enough total 895 else if (runState == rs && workerCounts == wc && 896 UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc, 897 wc + (ONE_RUNNING|ONE_TOTAL))) { 898 ForkJoinWorkerThread w = null; 899 Throwable fail = null; 900 try { 901 w = factory.newThread(this); 902 } catch (Throwable ex) { 903 fail = ex; 904 } 905 if (w == null) { // null or exceptional factory return 906 decrementWorkerCounts(ONE_RUNNING, ONE_TOTAL); 907 tryTerminate(false); // handle failure during shutdown 908 // If originating from an external caller, 909 // propagate exception, else ignore 910 if (fail != null && runState < TERMINATING && 911 !(Thread.currentThread() instanceof 912 ForkJoinWorkerThread)) 913 UNSAFE.throwException(fail); 914 break; 915 } 916 w.start(recordWorker(w), ueh); 917 if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc) { 918 int c; // advance event count 919 UNSAFE.compareAndSwapInt(this, eventCountOffset, 920 c = eventCount, c+1); 921 break; // add at most one unless total below target 922 } 923 } 924 } 925 if (eventWaiters != 0L) 926 releaseEventWaiters(); 927 } 928 929 /** 930 * Callback from the oldest waiter in awaitEvent waking up after a 931 * period of non-use. If all workers are idle, tries (once) to 932 * shutdown an event waiter or a spare, if one exists. Note that 933 * we don't need CAS or locks here because the method is called 934 * only from one thread occasionally waking (and even misfires are 935 * OK). Note that until the shutdown worker fully terminates, 936 * workerCounts will overestimate total count, which is tolerable. 937 * 938 * @param ec the event count waited on by caller (to abort 939 * attempt if count has since changed). 940 */ 941 private void tryShutdownUnusedWorker(int ec) { 942 if (runState == 0 && eventCount == ec) { // only trigger if all idle 943 ForkJoinWorkerThread[] ws = workers; 944 int n = ws.length; 945 ForkJoinWorkerThread w = null; 946 boolean shutdown = false; 947 int sw; 948 long h; 949 if ((sw = spareWaiters) != 0) { // prefer killing spares 950 int id = (sw & SPARE_ID_MASK) - 1; 951 if (id >= 0 && id < n && (w = ws[id]) != null && 952 UNSAFE.compareAndSwapInt(this, spareWaitersOffset, 953 sw, w.nextSpare)) 954 shutdown = true; 955 } 956 else if ((h = eventWaiters) != 0L) { 957 long nh; 958 int id = ((int)(h & WAITER_ID_MASK)) - 1; 959 if (id >= 0 && id < n && (w = ws[id]) != null && 960 (nh = w.nextWaiter) != 0L && // keep at least one worker 961 UNSAFE.compareAndSwapLong(this, eventWaitersOffset, h, nh)) 962 shutdown = true; 963 } 964 if (w != null && shutdown) { 965 w.shutdown(); 966 LockSupport.unpark(w); 967 } 968 } 969 releaseEventWaiters(); // in case of interference 970 } 971 972 /** 973 * Callback from workers invoked upon each top-level action (i.e., 974 * stealing a task or taking a submission and running it). 975 * Performs one or more of the following: 976 * 977 * 1. If the worker is active and either did not run a task 978 * or there are too many workers, try to set its active status 979 * to inactive and update activeCount. On contention, we may 980 * try again in this or a subsequent call. 981 * 982 * 2. If not enough total workers, help create some. 983 * 984 * 3. If there are too many running workers, suspend this worker 985 * (first forcing inactive if necessary). If it is not needed, 986 * it may be shutdown while suspended (via 987 * tryShutdownUnusedWorker). Otherwise, upon resume it 988 * rechecks running thread count and need for event sync. 989 * 990 * 4. If worker did not run a task, await the next task event via 991 * eventSync if necessary (first forcing inactivation), upon 992 * which the worker may be shutdown via 993 * tryShutdownUnusedWorker. Otherwise, help release any 994 * existing event waiters that are now releasable, 995 * 996 * @param w the worker 997 * @param ran true if worker ran a task since last call to this method 998 */ 999 final void preStep(ForkJoinWorkerThread w, boolean ran) { 1000 int wec = w.lastEventCount; 1001 boolean active = w.active; 1002 boolean inactivate = false; 1003 int pc = parallelism; 1004 while (w.runState == 0) { 1005 int rs = runState; 1006 if (rs >= TERMINATING) { // propagate shutdown 1007 w.shutdown(); 1008 break; 1009 } 1010 if ((inactivate || (active && (rs & ACTIVE_COUNT_MASK) >= pc)) && 1011 UNSAFE.compareAndSwapInt(this, runStateOffset, rs, rs - 1)) 1012 inactivate = active = w.active = false; 1013 int wc = workerCounts; 1014 if ((wc & RUNNING_COUNT_MASK) > pc) { 1015 if (!(inactivate |= active) && // must inactivate to suspend 1016 workerCounts == wc && // try to suspend as spare 1017 UNSAFE.compareAndSwapInt(this, workerCountsOffset, 1018 wc, wc - ONE_RUNNING)) 1019 w.suspendAsSpare(); 1020 } 1021 else if ((wc >>> TOTAL_COUNT_SHIFT) < pc) 1022 helpMaintainParallelism(); // not enough workers 1023 else if (!ran) { 1024 long h = eventWaiters; 1025 int ec = eventCount; 1026 if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != ec) 1027 releaseEventWaiters(); // release others before waiting 1028 else if (ec != wec) { 1029 w.lastEventCount = ec; // no need to wait 1030 break; 1031 } 1032 else if (!(inactivate |= active)) 1033 eventSync(w, wec); // must inactivate before sync 1034 } 1035 else 1036 break; 1037 } 1038 } 1039 1040 /** 1041 * Helps and/or blocks awaiting join of the given task. 1042 * See above for explanation. 1043 * 1044 * @param joinMe the task to join 1045 * @param worker the current worker thread 1046 */ 1047 final void awaitJoin(ForkJoinTask<?> joinMe, ForkJoinWorkerThread worker) { 1048 int retries = 2 + (parallelism >> 2); // #helpJoins before blocking 1049 while (joinMe.status >= 0) { 1050 int wc; 1051 worker.helpJoinTask(joinMe); 1052 if (joinMe.status < 0) 1053 break; 1054 else if (retries > 0) 1055 --retries; 1056 else if (((wc = workerCounts) & RUNNING_COUNT_MASK) != 0 && 1057 UNSAFE.compareAndSwapInt(this, workerCountsOffset, 1058 wc, wc - ONE_RUNNING)) { 1059 int stat, c; long h; 1060 while ((stat = joinMe.status) >= 0 && 1061 (h = eventWaiters) != 0L && // help release others 1062 (int)(h >>> EVENT_COUNT_SHIFT) != eventCount) 1063 releaseEventWaiters(); 1064 if (stat >= 0 && 1065 ((workerCounts & RUNNING_COUNT_MASK) == 0 || 1066 (stat = 1067 joinMe.internalAwaitDone(JOIN_TIMEOUT_MILLIS)) >= 0)) 1068 helpMaintainParallelism(); // timeout or no running workers 1069 do {} while (!UNSAFE.compareAndSwapInt 1070 (this, workerCountsOffset, 1071 c = workerCounts, c + ONE_RUNNING)); 1072 if (stat < 0) 1073 break; // else restart 1074 } 1075 } 1076 } 1077 1078 /** 1079 * Same idea as awaitJoin, but no helping, retries, or timeouts. 1080 */ 1081 final void awaitBlocker(ManagedBlocker blocker) 1082 throws InterruptedException { 1083 while (!blocker.isReleasable()) { 1084 int wc = workerCounts; 1085 if ((wc & RUNNING_COUNT_MASK) != 0 && 1086 UNSAFE.compareAndSwapInt(this, workerCountsOffset, 1087 wc, wc - ONE_RUNNING)) { 1088 try { 1089 while (!blocker.isReleasable()) { 1090 long h = eventWaiters; 1091 if (h != 0L && 1092 (int)(h >>> EVENT_COUNT_SHIFT) != eventCount) 1093 releaseEventWaiters(); 1094 else if ((workerCounts & RUNNING_COUNT_MASK) == 0 && 1095 runState < TERMINATING) 1096 helpMaintainParallelism(); 1097 else if (blocker.block()) 1098 break; 1099 } 1100 } finally { 1101 int c; 1102 do {} while (!UNSAFE.compareAndSwapInt 1103 (this, workerCountsOffset, 1104 c = workerCounts, c + ONE_RUNNING)); 1105 } 1106 break; 1107 } 1108 } 1109 } 1110 1111 /** 1112 * Possibly initiates and/or completes termination. 1113 * 1114 * @param now if true, unconditionally terminate, else only 1115 * if shutdown and empty queue and no active workers 1116 * @return true if now terminating or terminated 1117 */ 1118 private boolean tryTerminate(boolean now) { 1119 if (now) 1120 advanceRunLevel(SHUTDOWN); // ensure at least SHUTDOWN 1121 else if (runState < SHUTDOWN || 1122 !submissionQueue.isEmpty() || 1123 (runState & ACTIVE_COUNT_MASK) != 0) 1124 return false; 1125 1126 if (advanceRunLevel(TERMINATING)) 1127 startTerminating(); 1128 1129 // Finish now if all threads terminated; else in some subsequent call 1130 if ((workerCounts >>> TOTAL_COUNT_SHIFT) == 0) { 1131 advanceRunLevel(TERMINATED); 1132 termination.arrive(); 1133 } 1134 return true; 1135 } 1136 1137 1138 /** 1139 * Actions on transition to TERMINATING 1140 * 1141 * Runs up to four passes through workers: (0) shutting down each 1142 * (without waking up if parked) to quickly spread notifications 1143 * without unnecessary bouncing around event queues etc (1) wake 1144 * up and help cancel tasks (2) interrupt (3) mop up races with 1145 * interrupted workers 1146 */ 1147 private void startTerminating() { 1148 cancelSubmissions(); 1149 for (int passes = 0; passes < 4 && workerCounts != 0; ++passes) { 1150 int c; // advance event count 1151 UNSAFE.compareAndSwapInt(this, eventCountOffset, 1152 c = eventCount, c+1); 1153 eventWaiters = 0L; // clobber lists 1154 spareWaiters = 0; 1155 for (ForkJoinWorkerThread w : workers) { 1156 if (w != null) { 1157 w.shutdown(); 1158 if (passes > 0 && !w.isTerminated()) { 1159 w.cancelTasks(); 1160 LockSupport.unpark(w); 1161 if (passes > 1 && !w.isInterrupted()) { 1162 try { 1163 w.interrupt(); 1164 } catch (SecurityException ignore) { 1165 } 1166 } 1167 } 1168 } 1169 } 1170 } 1171 } 1172 1173 /** 1174 * Clears out and cancels submissions, ignoring exceptions. 1175 */ 1176 private void cancelSubmissions() { 1177 ForkJoinTask<?> task; 1178 while ((task = submissionQueue.poll()) != null) { 1179 try { 1180 task.cancel(false); 1181 } catch (Throwable ignore) { 1182 } 1183 } 1184 } 1185 1186 // misc support for ForkJoinWorkerThread 1187 1188 /** 1189 * Returns pool number. 1190 */ 1191 final int getPoolNumber() { 1192 return poolNumber; 1193 } 1194 1195 /** 1196 * Tries to accumulate steal count from a worker, clearing 1197 * the worker's value if successful. 1198 * 1199 * @return true if worker steal count now zero 1200 */ 1201 final boolean tryAccumulateStealCount(ForkJoinWorkerThread w) { 1202 int sc = w.stealCount; 1203 long c = stealCount; 1204 // CAS even if zero, for fence effects 1205 if (UNSAFE.compareAndSwapLong(this, stealCountOffset, c, c + sc)) { 1206 if (sc != 0) 1207 w.stealCount = 0; 1208 return true; 1209 } 1210 return sc == 0; 1211 } 1212 1213 /** 1214 * Returns the approximate (non-atomic) number of idle threads per 1215 * active thread. 1216 */ 1217 final int idlePerActive() { 1218 int pc = parallelism; // use parallelism, not rc 1219 int ac = runState; // no mask -- artificially boosts during shutdown 1220 // Use exact results for small values, saturate past 4 1221 return ((pc <= ac) ? 0 : 1222 (pc >>> 1 <= ac) ? 1 : 1223 (pc >>> 2 <= ac) ? 3 : 1224 pc >>> 3); 1225 } 1226 1227 // Public and protected methods 1228 1229 // Constructors 1230 1231 /** 1232 * Creates a {@code ForkJoinPool} with parallelism equal to {@link 1233 * java.lang.Runtime#availableProcessors}, using the {@linkplain 1234 * #defaultForkJoinWorkerThreadFactory default thread factory}, 1235 * no UncaughtExceptionHandler, and non-async LIFO processing mode. 1236 * 1237 * @throws SecurityException if a security manager exists and 1238 * the caller is not permitted to modify threads 1239 * because it does not hold {@link 1240 * java.lang.RuntimePermission}{@code ("modifyThread")} 1241 */ 1242 public ForkJoinPool() { 1243 this(Runtime.getRuntime().availableProcessors(), 1244 defaultForkJoinWorkerThreadFactory, null, false); 1245 } 1246 1247 /** 1248 * Creates a {@code ForkJoinPool} with the indicated parallelism 1249 * level, the {@linkplain 1250 * #defaultForkJoinWorkerThreadFactory default thread factory}, 1251 * no UncaughtExceptionHandler, and non-async LIFO processing mode. 1252 * 1253 * @param parallelism the parallelism level 1254 * @throws IllegalArgumentException if parallelism less than or 1255 * equal to zero, or greater than implementation limit 1256 * @throws SecurityException if a security manager exists and 1257 * the caller is not permitted to modify threads 1258 * because it does not hold {@link 1259 * java.lang.RuntimePermission}{@code ("modifyThread")} 1260 */ 1261 public ForkJoinPool(int parallelism) { 1262 this(parallelism, defaultForkJoinWorkerThreadFactory, null, false); 1263 } 1264 1265 /** 1266 * Creates a {@code ForkJoinPool} with the given parameters. 1267 * 1268 * @param parallelism the parallelism level. For default value, 1269 * use {@link java.lang.Runtime#availableProcessors}. 1270 * @param factory the factory for creating new threads. For default value, 1271 * use {@link #defaultForkJoinWorkerThreadFactory}. 1272 * @param handler the handler for internal worker threads that 1273 * terminate due to unrecoverable errors encountered while executing 1274 * tasks. For default value, use {@code null}. 1275 * @param asyncMode if true, 1276 * establishes local first-in-first-out scheduling mode for forked 1277 * tasks that are never joined. This mode may be more appropriate 1278 * than default locally stack-based mode in applications in which 1279 * worker threads only process event-style asynchronous tasks. 1280 * For default value, use {@code false}. 1281 * @throws IllegalArgumentException if parallelism less than or 1282 * equal to zero, or greater than implementation limit 1283 * @throws NullPointerException if the factory is null 1284 * @throws SecurityException if a security manager exists and 1285 * the caller is not permitted to modify threads 1286 * because it does not hold {@link 1287 * java.lang.RuntimePermission}{@code ("modifyThread")} 1288 */ 1289 public ForkJoinPool(int parallelism, 1290 ForkJoinWorkerThreadFactory factory, 1291 Thread.UncaughtExceptionHandler handler, 1292 boolean asyncMode) { 1293 checkPermission(); 1294 if (factory == null) 1295 throw new NullPointerException(); 1296 if (parallelism <= 0 || parallelism > MAX_WORKERS) 1297 throw new IllegalArgumentException(); 1298 this.parallelism = parallelism; 1299 this.factory = factory; 1300 this.ueh = handler; 1301 this.locallyFifo = asyncMode; 1302 int arraySize = initialArraySizeFor(parallelism); 1303 this.workers = new ForkJoinWorkerThread[arraySize]; 1304 this.submissionQueue = new LinkedTransferQueue<ForkJoinTask<?>>(); 1305 this.workerLock = new ReentrantLock(); 1306 this.termination = new Phaser(1); 1307 this.poolNumber = poolNumberGenerator.incrementAndGet(); 1308 } 1309 1310 /** 1311 * Returns initial power of two size for workers array. 1312 * @param pc the initial parallelism level 1313 */ 1314 private static int initialArraySizeFor(int pc) { 1315 // If possible, initially allocate enough space for one spare 1316 int size = pc < MAX_WORKERS ? pc + 1 : MAX_WORKERS; 1317 // See Hackers Delight, sec 3.2. We know MAX_WORKERS < (1 >>> 16) 1318 size |= size >>> 1; 1319 size |= size >>> 2; 1320 size |= size >>> 4; 1321 size |= size >>> 8; 1322 return size + 1; 1323 } 1324 1325 // Execution methods 1326 1327 /** 1328 * Common code for execute, invoke and submit 1329 */ 1330 private <T> void doSubmit(ForkJoinTask<T> task) { 1331 if (task == null) 1332 throw new NullPointerException(); 1333 if (runState >= SHUTDOWN) 1334 throw new RejectedExecutionException(); 1335 submissionQueue.offer(task); 1336 int c; // try to increment event count -- CAS failure OK 1337 UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1); 1338 helpMaintainParallelism(); // create, start, or resume some workers 1339 } 1340 1341 /** 1342 * Performs the given task, returning its result upon completion. 1343 * 1344 * @param task the task 1345 * @return the task's result 1346 * @throws NullPointerException if the task is null 1347 * @throws RejectedExecutionException if the task cannot be 1348 * scheduled for execution 1349 */ 1350 public <T> T invoke(ForkJoinTask<T> task) { 1351 doSubmit(task); 1352 return task.join(); 1353 } 1354 1355 /** 1356 * Arranges for (asynchronous) execution of the given task. 1357 * 1358 * @param task the task 1359 * @throws NullPointerException if the task is null 1360 * @throws RejectedExecutionException if the task cannot be 1361 * scheduled for execution 1362 */ 1363 public void execute(ForkJoinTask<?> task) { 1364 doSubmit(task); 1365 } 1366 1367 // AbstractExecutorService methods 1368 1369 /** 1370 * @throws NullPointerException if the task is null 1371 * @throws RejectedExecutionException if the task cannot be 1372 * scheduled for execution 1373 */ 1374 public void execute(Runnable task) { 1375 ForkJoinTask<?> job; 1376 if (task instanceof ForkJoinTask<?>) // avoid re-wrap 1377 job = (ForkJoinTask<?>) task; 1378 else 1379 job = ForkJoinTask.adapt(task, null); 1380 doSubmit(job); 1381 } 1382 1383 /** 1384 * Submits a ForkJoinTask for execution. 1385 * 1386 * @param task the task to submit 1387 * @return the task 1388 * @throws NullPointerException if the task is null 1389 * @throws RejectedExecutionException if the task cannot be 1390 * scheduled for execution 1391 */ 1392 public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) { 1393 doSubmit(task); 1394 return task; 1395 } 1396 1397 /** 1398 * @throws NullPointerException if the task is null 1399 * @throws RejectedExecutionException if the task cannot be 1400 * scheduled for execution 1401 */ 1402 public <T> ForkJoinTask<T> submit(Callable<T> task) { 1403 ForkJoinTask<T> job = ForkJoinTask.adapt(task); 1404 doSubmit(job); 1405 return job; 1406 } 1407 1408 /** 1409 * @throws NullPointerException if the task is null 1410 * @throws RejectedExecutionException if the task cannot be 1411 * scheduled for execution 1412 */ 1413 public <T> ForkJoinTask<T> submit(Runnable task, T result) { 1414 ForkJoinTask<T> job = ForkJoinTask.adapt(task, result); 1415 doSubmit(job); 1416 return job; 1417 } 1418 1419 /** 1420 * @throws NullPointerException if the task is null 1421 * @throws RejectedExecutionException if the task cannot be 1422 * scheduled for execution 1423 */ 1424 public ForkJoinTask<?> submit(Runnable task) { 1425 ForkJoinTask<?> job; 1426 if (task instanceof ForkJoinTask<?>) // avoid re-wrap 1427 job = (ForkJoinTask<?>) task; 1428 else 1429 job = ForkJoinTask.adapt(task, null); 1430 doSubmit(job); 1431 return job; 1432 } 1433 1434 /** 1435 * @throws NullPointerException {@inheritDoc} 1436 * @throws RejectedExecutionException {@inheritDoc} 1437 */ 1438 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) { 1439 ArrayList<ForkJoinTask<T>> forkJoinTasks = 1440 new ArrayList<ForkJoinTask<T>>(tasks.size()); 1441 for (Callable<T> task : tasks) 1442 forkJoinTasks.add(ForkJoinTask.adapt(task)); 1443 invoke(new InvokeAll<T>(forkJoinTasks)); 1444 1445 @SuppressWarnings({"unchecked", "rawtypes"}) 1446 List<Future<T>> futures = (List<Future<T>>) (List) forkJoinTasks; 1447 return futures; 1448 } 1449 1450 static final class InvokeAll<T> extends RecursiveAction { 1451 final ArrayList<ForkJoinTask<T>> tasks; 1452 InvokeAll(ArrayList<ForkJoinTask<T>> tasks) { this.tasks = tasks; } 1453 public void compute() { 1454 try { invokeAll(tasks); } 1455 catch (Exception ignore) {} 1456 } 1457 private static final long serialVersionUID = -7914297376763021607L; 1458 } 1459 1460 /** 1461 * Returns the factory used for constructing new workers. 1462 * 1463 * @return the factory used for constructing new workers 1464 */ 1465 public ForkJoinWorkerThreadFactory getFactory() { 1466 return factory; 1467 } 1468 1469 /** 1470 * Returns the handler for internal worker threads that terminate 1471 * due to unrecoverable errors encountered while executing tasks. 1472 * 1473 * @return the handler, or {@code null} if none 1474 */ 1475 public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() { 1476 return ueh; 1477 } 1478 1479 /** 1480 * Returns the targeted parallelism level of this pool. 1481 * 1482 * @return the targeted parallelism level of this pool 1483 */ 1484 public int getParallelism() { 1485 return parallelism; 1486 } 1487 1488 /** 1489 * Returns the number of worker threads that have started but not 1490 * yet terminated. The result returned by this method may differ 1491 * from {@link #getParallelism} when threads are created to 1492 * maintain parallelism when others are cooperatively blocked. 1493 * 1494 * @return the number of worker threads 1495 */ 1496 public int getPoolSize() { 1497 return workerCounts >>> TOTAL_COUNT_SHIFT; 1498 } 1499 1500 /** 1501 * Returns {@code true} if this pool uses local first-in-first-out 1502 * scheduling mode for forked tasks that are never joined. 1503 * 1504 * @return {@code true} if this pool uses async mode 1505 */ 1506 public boolean getAsyncMode() { 1507 return locallyFifo; 1508 } 1509 1510 /** 1511 * Returns an estimate of the number of worker threads that are 1512 * not blocked waiting to join tasks or for other managed 1513 * synchronization. This method may overestimate the 1514 * number of running threads. 1515 * 1516 * @return the number of worker threads 1517 */ 1518 public int getRunningThreadCount() { 1519 return workerCounts & RUNNING_COUNT_MASK; 1520 } 1521 1522 /** 1523 * Returns an estimate of the number of threads that are currently 1524 * stealing or executing tasks. This method may overestimate the 1525 * number of active threads. 1526 * 1527 * @return the number of active threads 1528 */ 1529 public int getActiveThreadCount() { 1530 return runState & ACTIVE_COUNT_MASK; 1531 } 1532 1533 /** 1534 * Returns {@code true} if all worker threads are currently idle. 1535 * An idle worker is one that cannot obtain a task to execute 1536 * because none are available to steal from other threads, and 1537 * there are no pending submissions to the pool. This method is 1538 * conservative; it might not return {@code true} immediately upon 1539 * idleness of all threads, but will eventually become true if 1540 * threads remain inactive. 1541 * 1542 * @return {@code true} if all threads are currently idle 1543 */ 1544 public boolean isQuiescent() { 1545 return (runState & ACTIVE_COUNT_MASK) == 0; 1546 } 1547 1548 /** 1549 * Returns an estimate of the total number of tasks stolen from 1550 * one thread's work queue by another. The reported value 1551 * underestimates the actual total number of steals when the pool 1552 * is not quiescent. This value may be useful for monitoring and 1553 * tuning fork/join programs: in general, steal counts should be 1554 * high enough to keep threads busy, but low enough to avoid 1555 * overhead and contention across threads. 1556 * 1557 * @return the number of steals 1558 */ 1559 public long getStealCount() { 1560 return stealCount; 1561 } 1562 1563 /** 1564 * Returns an estimate of the total number of tasks currently held 1565 * in queues by worker threads (but not including tasks submitted 1566 * to the pool that have not begun executing). This value is only 1567 * an approximation, obtained by iterating across all threads in 1568 * the pool. This method may be useful for tuning task 1569 * granularities. 1570 * 1571 * @return the number of queued tasks 1572 */ 1573 public long getQueuedTaskCount() { 1574 long count = 0; 1575 for (ForkJoinWorkerThread w : workers) 1576 if (w != null) 1577 count += w.getQueueSize(); 1578 return count; 1579 } 1580 1581 /** 1582 * Returns an estimate of the number of tasks submitted to this 1583 * pool that have not yet begun executing. This method takes time 1584 * proportional to the number of submissions. 1585 * 1586 * @return the number of queued submissions 1587 */ 1588 public int getQueuedSubmissionCount() { 1589 return submissionQueue.size(); 1590 } 1591 1592 /** 1593 * Returns {@code true} if there are any tasks submitted to this 1594 * pool that have not yet begun executing. 1595 * 1596 * @return {@code true} if there are any queued submissions 1597 */ 1598 public boolean hasQueuedSubmissions() { 1599 return !submissionQueue.isEmpty(); 1600 } 1601 1602 /** 1603 * Removes and returns the next unexecuted submission if one is 1604 * available. This method may be useful in extensions to this 1605 * class that re-assign work in systems with multiple pools. 1606 * 1607 * @return the next submission, or {@code null} if none 1608 */ 1609 protected ForkJoinTask<?> pollSubmission() { 1610 return submissionQueue.poll(); 1611 } 1612 1613 /** 1614 * Removes all available unexecuted submitted and forked tasks 1615 * from scheduling queues and adds them to the given collection, 1616 * without altering their execution status. These may include 1617 * artificially generated or wrapped tasks. This method is 1618 * designed to be invoked only when the pool is known to be 1619 * quiescent. Invocations at other times may not remove all 1620 * tasks. A failure encountered while attempting to add elements 1621 * to collection {@code c} may result in elements being in 1622 * neither, either or both collections when the associated 1623 * exception is thrown. The behavior of this operation is 1624 * undefined if the specified collection is modified while the 1625 * operation is in progress. 1626 * 1627 * @param c the collection to transfer elements into 1628 * @return the number of elements transferred 1629 */ 1630 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) { 1631 int count = submissionQueue.drainTo(c); 1632 for (ForkJoinWorkerThread w : workers) 1633 if (w != null) 1634 count += w.drainTasksTo(c); 1635 return count; 1636 } 1637 1638 /** 1639 * Returns a string identifying this pool, as well as its state, 1640 * including indications of run state, parallelism level, and 1641 * worker and task counts. 1642 * 1643 * @return a string identifying this pool, as well as its state 1644 */ 1645 public String toString() { 1646 long st = getStealCount(); 1647 long qt = getQueuedTaskCount(); 1648 long qs = getQueuedSubmissionCount(); 1649 int wc = workerCounts; 1650 int tc = wc >>> TOTAL_COUNT_SHIFT; 1651 int rc = wc & RUNNING_COUNT_MASK; 1652 int pc = parallelism; 1653 int rs = runState; 1654 int ac = rs & ACTIVE_COUNT_MASK; 1655 return super.toString() + 1656 "[" + runLevelToString(rs) + 1657 ", parallelism = " + pc + 1658 ", size = " + tc + 1659 ", active = " + ac + 1660 ", running = " + rc + 1661 ", steals = " + st + 1662 ", tasks = " + qt + 1663 ", submissions = " + qs + 1664 "]"; 1665 } 1666 1667 private static String runLevelToString(int s) { 1668 return ((s & TERMINATED) != 0 ? "Terminated" : 1669 ((s & TERMINATING) != 0 ? "Terminating" : 1670 ((s & SHUTDOWN) != 0 ? "Shutting down" : 1671 "Running"))); 1672 } 1673 1674 /** 1675 * Initiates an orderly shutdown in which previously submitted 1676 * tasks are executed, but no new tasks will be accepted. 1677 * Invocation has no additional effect if already shut down. 1678 * Tasks that are in the process of being submitted concurrently 1679 * during the course of this method may or may not be rejected. 1680 * 1681 * @throws SecurityException if a security manager exists and 1682 * the caller is not permitted to modify threads 1683 * because it does not hold {@link 1684 * java.lang.RuntimePermission}{@code ("modifyThread")} 1685 */ 1686 public void shutdown() { 1687 checkPermission(); 1688 advanceRunLevel(SHUTDOWN); 1689 tryTerminate(false); 1690 } 1691 1692 /** 1693 * Attempts to cancel and/or stop all tasks, and reject all 1694 * subsequently submitted tasks. Tasks that are in the process of 1695 * being submitted or executed concurrently during the course of 1696 * this method may or may not be rejected. This method cancels 1697 * both existing and unexecuted tasks, in order to permit 1698 * termination in the presence of task dependencies. So the method 1699 * always returns an empty list (unlike the case for some other 1700 * Executors). 1701 * 1702 * @return an empty list 1703 * @throws SecurityException if a security manager exists and 1704 * the caller is not permitted to modify threads 1705 * because it does not hold {@link 1706 * java.lang.RuntimePermission}{@code ("modifyThread")} 1707 */ 1708 public List<Runnable> shutdownNow() { 1709 checkPermission(); 1710 tryTerminate(true); 1711 return Collections.emptyList(); 1712 } 1713 1714 /** 1715 * Returns {@code true} if all tasks have completed following shut down. 1716 * 1717 * @return {@code true} if all tasks have completed following shut down 1718 */ 1719 public boolean isTerminated() { 1720 return runState >= TERMINATED; 1721 } 1722 1723 /** 1724 * Returns {@code true} if the process of termination has 1725 * commenced but not yet completed. This method may be useful for 1726 * debugging. A return of {@code true} reported a sufficient 1727 * period after shutdown may indicate that submitted tasks have 1728 * ignored or suppressed interruption, causing this executor not 1729 * to properly terminate. 1730 * 1731 * @return {@code true} if terminating but not yet terminated 1732 */ 1733 public boolean isTerminating() { 1734 return (runState & (TERMINATING|TERMINATED)) == TERMINATING; 1735 } 1736 1737 /** 1738 * Returns true if terminating or terminated. Used by ForkJoinWorkerThread. 1739 */ 1740 final boolean isAtLeastTerminating() { 1741 return runState >= TERMINATING; 1742 } 1743 1744 /** 1745 * Returns {@code true} if this pool has been shut down. 1746 * 1747 * @return {@code true} if this pool has been shut down 1748 */ 1749 public boolean isShutdown() { 1750 return runState >= SHUTDOWN; 1751 } 1752 1753 /** 1754 * Blocks until all tasks have completed execution after a shutdown 1755 * request, or the timeout occurs, or the current thread is 1756 * interrupted, whichever happens first. 1757 * 1758 * @param timeout the maximum time to wait 1759 * @param unit the time unit of the timeout argument 1760 * @return {@code true} if this executor terminated and 1761 * {@code false} if the timeout elapsed before termination 1762 * @throws InterruptedException if interrupted while waiting 1763 */ 1764 public boolean awaitTermination(long timeout, TimeUnit unit) 1765 throws InterruptedException { 1766 try { 1767 return termination.awaitAdvanceInterruptibly(0, timeout, unit) > 0; 1768 } catch (TimeoutException ex) { 1769 return false; 1770 } 1771 } 1772 1773 /** 1774 * Interface for extending managed parallelism for tasks running 1775 * in {@link ForkJoinPool}s. 1776 * 1777 * <p>A {@code ManagedBlocker} provides two methods. Method 1778 * {@code isReleasable} must return {@code true} if blocking is 1779 * not necessary. Method {@code block} blocks the current thread 1780 * if necessary (perhaps internally invoking {@code isReleasable} 1781 * before actually blocking). The unusual methods in this API 1782 * accommodate synchronizers that may, but don't usually, block 1783 * for long periods. Similarly, they allow more efficient internal 1784 * handling of cases in which additional workers may be, but 1785 * usually are not, needed to ensure sufficient parallelism. 1786 * Toward this end, implementations of method {@code isReleasable} 1787 * must be amenable to repeated invocation. 1788 * 1789 * <p>For example, here is a ManagedBlocker based on a 1790 * ReentrantLock: 1791 * <pre> {@code 1792 * class ManagedLocker implements ManagedBlocker { 1793 * final ReentrantLock lock; 1794 * boolean hasLock = false; 1795 * ManagedLocker(ReentrantLock lock) { this.lock = lock; } 1796 * public boolean block() { 1797 * if (!hasLock) 1798 * lock.lock(); 1799 * return true; 1800 * } 1801 * public boolean isReleasable() { 1802 * return hasLock || (hasLock = lock.tryLock()); 1803 * } 1804 * }}</pre> 1805 * 1806 * <p>Here is a class that possibly blocks waiting for an 1807 * item on a given queue: 1808 * <pre> {@code 1809 * class QueueTaker<E> implements ManagedBlocker { 1810 * final BlockingQueue<E> queue; 1811 * volatile E item = null; 1812 * QueueTaker(BlockingQueue<E> q) { this.queue = q; } 1813 * public boolean block() throws InterruptedException { 1814 * if (item == null) 1815 * item = queue.take(); 1816 * return true; 1817 * } 1818 * public boolean isReleasable() { 1819 * return item != null || (item = queue.poll()) != null; 1820 * } 1821 * public E getItem() { // call after pool.managedBlock completes 1822 * return item; 1823 * } 1824 * }}</pre> 1825 */ 1826 public static interface ManagedBlocker { 1827 /** 1828 * Possibly blocks the current thread, for example waiting for 1829 * a lock or condition. 1830 * 1831 * @return {@code true} if no additional blocking is necessary 1832 * (i.e., if isReleasable would return true) 1833 * @throws InterruptedException if interrupted while waiting 1834 * (the method is not required to do so, but is allowed to) 1835 */ 1836 boolean block() throws InterruptedException; 1837 1838 /** 1839 * Returns {@code true} if blocking is unnecessary. 1840 */ 1841 boolean isReleasable(); 1842 } 1843 1844 /** 1845 * Blocks in accord with the given blocker. If the current thread 1846 * is a {@link ForkJoinWorkerThread}, this method possibly 1847 * arranges for a spare thread to be activated if necessary to 1848 * ensure sufficient parallelism while the current thread is blocked. 1849 * 1850 * <p>If the caller is not a {@link ForkJoinTask}, this method is 1851 * behaviorally equivalent to 1852 * <pre> {@code 1853 * while (!blocker.isReleasable()) 1854 * if (blocker.block()) 1855 * return; 1856 * }</pre> 1857 * 1858 * If the caller is a {@code ForkJoinTask}, then the pool may 1859 * first be expanded to ensure parallelism, and later adjusted. 1860 * 1861 * @param blocker the blocker 1862 * @throws InterruptedException if blocker.block did so 1863 */ 1864 public static void managedBlock(ManagedBlocker blocker) 1865 throws InterruptedException { 1866 Thread t = Thread.currentThread(); 1867 if (t instanceof ForkJoinWorkerThread) { 1868 ForkJoinWorkerThread w = (ForkJoinWorkerThread) t; 1869 w.pool.awaitBlocker(blocker); 1870 } 1871 else { 1872 do {} while (!blocker.isReleasable() && !blocker.block()); 1873 } 1874 } 1875 1876 // AbstractExecutorService overrides. These rely on undocumented 1877 // fact that ForkJoinTask.adapt returns ForkJoinTasks that also 1878 // implement RunnableFuture. 1879 1880 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { 1881 return (RunnableFuture<T>) ForkJoinTask.adapt(runnable, value); 1882 } 1883 1884 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 1885 return (RunnableFuture<T>) ForkJoinTask.adapt(callable); 1886 } 1887 1888 // Unsafe mechanics 1889 1890 private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe(); 1891 private static final long workerCountsOffset = 1892 objectFieldOffset("workerCounts", ForkJoinPool.class); 1893 private static final long runStateOffset = 1894 objectFieldOffset("runState", ForkJoinPool.class); 1895 private static final long eventCountOffset = 1896 objectFieldOffset("eventCount", ForkJoinPool.class); 1897 private static final long eventWaitersOffset = 1898 objectFieldOffset("eventWaiters", ForkJoinPool.class); 1899 private static final long stealCountOffset = 1900 objectFieldOffset("stealCount", ForkJoinPool.class); 1901 private static final long spareWaitersOffset = 1902 objectFieldOffset("spareWaiters", ForkJoinPool.class); 1903 1904 private static long objectFieldOffset(String field, Class<?> klazz) { 1905 try { 1906 return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field)); 1907 } catch (NoSuchFieldException e) { 1908 // Convert Exception to corresponding Error 1909 NoSuchFieldError error = new NoSuchFieldError(field); 1910 error.initCause(e); 1911 throw error; 1912 } 1913 } 1914 }