Print this page
Split |
Close |
Expand all |
Collapse all |
--- old/src/share/classes/java/util/concurrent/ForkJoinPool.java
+++ new/src/share/classes/java/util/concurrent/ForkJoinPool.java
1 1 /*
2 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 3 *
4 4 * This code is free software; you can redistribute it and/or modify it
5 5 * under the terms of the GNU General Public License version 2 only, as
6 6 * published by the Free Software Foundation. Oracle designates this
7 7 * particular file as subject to the "Classpath" exception as provided
8 8 * by Oracle in the LICENSE file that accompanied this code.
9 9 *
10 10 * This code is distributed in the hope that it will be useful, but WITHOUT
11 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 13 * version 2 for more details (a copy is included in the LICENSE file that
14 14 * accompanied this code).
15 15 *
16 16 * You should have received a copy of the GNU General Public License version
17 17 * 2 along with this work; if not, write to the Free Software Foundation,
18 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 19 *
20 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 21 * or visit www.oracle.com if you need additional information or have any
22 22 * questions.
23 23 */
24 24
25 25 /*
26 26 * This file is available under and governed by the GNU General Public
27 27 * License version 2 only, as published by the Free Software Foundation.
28 28 * However, the following notice accompanied the original version of this
29 29 * file:
30 30 *
31 31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 32 * Expert Group and released to the public domain, as explained at
33 33 * http://creativecommons.org/licenses/publicdomain
34 34 */
35 35
36 36 package java.util.concurrent;
37 37
38 38 import java.util.ArrayList;
39 39 import java.util.Arrays;
40 40 import java.util.Collection;
41 41 import java.util.Collections;
42 42 import java.util.List;
43 43 import java.util.concurrent.AbstractExecutorService;
44 44 import java.util.concurrent.Callable;
45 45 import java.util.concurrent.ExecutorService;
46 46 import java.util.concurrent.Future;
47 47 import java.util.concurrent.RejectedExecutionException;
48 48 import java.util.concurrent.RunnableFuture;
49 49 import java.util.concurrent.TimeUnit;
50 50 import java.util.concurrent.TimeoutException;
51 51 import java.util.concurrent.atomic.AtomicInteger;
52 52 import java.util.concurrent.locks.LockSupport;
53 53 import java.util.concurrent.locks.ReentrantLock;
54 54
55 55 /**
56 56 * An {@link ExecutorService} for running {@link ForkJoinTask}s.
57 57 * A {@code ForkJoinPool} provides the entry point for submissions
58 58 * from non-{@code ForkJoinTask} clients, as well as management and
59 59 * monitoring operations.
60 60 *
61 61 * <p>A {@code ForkJoinPool} differs from other kinds of {@link
62 62 * ExecutorService} mainly by virtue of employing
63 63 * <em>work-stealing</em>: all threads in the pool attempt to find and
64 64 * execute subtasks created by other active tasks (eventually blocking
65 65 * waiting for work if none exist). This enables efficient processing
66 66 * when most tasks spawn other subtasks (as do most {@code
67 67 * ForkJoinTask}s). When setting <em>asyncMode</em> to true in
68 68 * constructors, {@code ForkJoinPool}s may also be appropriate for use
69 69 * with event-style tasks that are never joined.
70 70 *
71 71 * <p>A {@code ForkJoinPool} is constructed with a given target
72 72 * parallelism level; by default, equal to the number of available
73 73 * processors. The pool attempts to maintain enough active (or
74 74 * available) threads by dynamically adding, suspending, or resuming
75 75 * internal worker threads, even if some tasks are stalled waiting to
76 76 * join others. However, no such adjustments are guaranteed in the
77 77 * face of blocked IO or other unmanaged synchronization. The nested
78 78 * {@link ManagedBlocker} interface enables extension of the kinds of
79 79 * synchronization accommodated.
80 80 *
81 81 * <p>In addition to execution and lifecycle control methods, this
82 82 * class provides status check methods (for example
83 83 * {@link #getStealCount}) that are intended to aid in developing,
84 84 * tuning, and monitoring fork/join applications. Also, method
85 85 * {@link #toString} returns indications of pool state in a
86 86 * convenient form for informal monitoring.
87 87 *
88 88 * <p> As is the case with other ExecutorServices, there are three
89 89 * main task execution methods summarized in the following
90 90 * table. These are designed to be used by clients not already engaged
91 91 * in fork/join computations in the current pool. The main forms of
92 92 * these methods accept instances of {@code ForkJoinTask}, but
93 93 * overloaded forms also allow mixed execution of plain {@code
94 94 * Runnable}- or {@code Callable}- based activities as well. However,
95 95 * tasks that are already executing in a pool should normally
96 96 * <em>NOT</em> use these pool execution methods, but instead use the
97 97 * within-computation forms listed in the table.
98 98 *
99 99 * <table BORDER CELLPADDING=3 CELLSPACING=1>
100 100 * <tr>
101 101 * <td></td>
102 102 * <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
103 103 * <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
104 104 * </tr>
105 105 * <tr>
106 106 * <td> <b>Arrange async execution</td>
107 107 * <td> {@link #execute(ForkJoinTask)}</td>
108 108 * <td> {@link ForkJoinTask#fork}</td>
109 109 * </tr>
110 110 * <tr>
111 111 * <td> <b>Await and obtain result</td>
112 112 * <td> {@link #invoke(ForkJoinTask)}</td>
113 113 * <td> {@link ForkJoinTask#invoke}</td>
114 114 * </tr>
115 115 * <tr>
116 116 * <td> <b>Arrange exec and obtain Future</td>
117 117 * <td> {@link #submit(ForkJoinTask)}</td>
118 118 * <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
119 119 * </tr>
120 120 * </table>
121 121 *
122 122 * <p><b>Sample Usage.</b> Normally a single {@code ForkJoinPool} is
123 123 * used for all parallel task execution in a program or subsystem.
124 124 * Otherwise, use would not usually outweigh the construction and
125 125 * bookkeeping overhead of creating a large set of threads. For
126 126 * example, a common pool could be used for the {@code SortTasks}
127 127 * illustrated in {@link RecursiveAction}. Because {@code
128 128 * ForkJoinPool} uses threads in {@linkplain java.lang.Thread#isDaemon
129 129 * daemon} mode, there is typically no need to explicitly {@link
130 130 * #shutdown} such a pool upon program exit.
131 131 *
132 132 * <pre>
133 133 * static final ForkJoinPool mainPool = new ForkJoinPool();
134 134 * ...
135 135 * public void sort(long[] array) {
136 136 * mainPool.invoke(new SortTask(array, 0, array.length));
137 137 * }
138 138 * </pre>
139 139 *
140 140 * <p><b>Implementation notes</b>: This implementation restricts the
141 141 * maximum number of running threads to 32767. Attempts to create
142 142 * pools with greater than the maximum number result in
143 143 * {@code IllegalArgumentException}.
144 144 *
145 145 * <p>This implementation rejects submitted tasks (that is, by throwing
146 146 * {@link RejectedExecutionException}) only when the pool is shut down
147 147 * or internal resources have been exhausted.
148 148 *
149 149 * @since 1.7
150 150 * @author Doug Lea
151 151 */
152 152 public class ForkJoinPool extends AbstractExecutorService {
153 153
154 154 /*
155 155 * Implementation Overview
156 156 *
157 157 * This class provides the central bookkeeping and control for a
158 158 * set of worker threads: Submissions from non-FJ threads enter
159 159 * into a submission queue. Workers take these tasks and typically
160 160 * split them into subtasks that may be stolen by other workers.
161 161 * The main work-stealing mechanics implemented in class
162 162 * ForkJoinWorkerThread give first priority to processing tasks
163 163 * from their own queues (LIFO or FIFO, depending on mode), then
164 164 * to randomized FIFO steals of tasks in other worker queues, and
165 165 * lastly to new submissions. These mechanics do not consider
166 166 * affinities, loads, cache localities, etc, so rarely provide the
167 167 * best possible performance on a given machine, but portably
168 168 * provide good throughput by averaging over these factors.
169 169 * (Further, even if we did try to use such information, we do not
170 170 * usually have a basis for exploiting it. For example, some sets
171 171 * of tasks profit from cache affinities, but others are harmed by
172 172 * cache pollution effects.)
173 173 *
174 174 * Beyond work-stealing support and essential bookkeeping, the
175 175 * main responsibility of this framework is to take actions when
176 176 * one worker is waiting to join a task stolen (or always held by)
177 177 * another. Because we are multiplexing many tasks on to a pool
178 178 * of workers, we can't just let them block (as in Thread.join).
179 179 * We also cannot just reassign the joiner's run-time stack with
180 180 * another and replace it later, which would be a form of
181 181 * "continuation", that even if possible is not necessarily a good
182 182 * idea. Given that the creation costs of most threads on most
183 183 * systems mainly surrounds setting up runtime stacks, thread
184 184 * creation and switching is usually not much more expensive than
 185  185      * stack creation and switching, and is more flexible. Instead we
186 186 * combine two tactics:
187 187 *
188 188 * Helping: Arranging for the joiner to execute some task that it
189 189 * would be running if the steal had not occurred. Method
190 190 * ForkJoinWorkerThread.helpJoinTask tracks joining->stealing
191 191 * links to try to find such a task.
192 192 *
193 193 * Compensating: Unless there are already enough live threads,
194 194 * method helpMaintainParallelism() may create or
195 195 * re-activate a spare thread to compensate for blocked
196 196 * joiners until they unblock.
197 197 *
198 198 * It is impossible to keep exactly the target (parallelism)
199 199 * number of threads running at any given time. Determining
200 200 * existence of conservatively safe helping targets, the
201 201 * availability of already-created spares, and the apparent need
202 202 * to create new spares are all racy and require heuristic
203 203 * guidance, so we rely on multiple retries of each. Compensation
204 204 * occurs in slow-motion. It is triggered only upon timeouts of
205 205 * Object.wait used for joins. This reduces poor decisions that
206 206 * would otherwise be made when threads are waiting for others
207 207 * that are stalled because of unrelated activities such as
208 208 * garbage collection.
209 209 *
210 210 * The ManagedBlocker extension API can't use helping so relies
211 211 * only on compensation in method awaitBlocker.
212 212 *
213 213 * The main throughput advantages of work-stealing stem from
214 214 * decentralized control -- workers mostly steal tasks from each
215 215 * other. We do not want to negate this by creating bottlenecks
216 216 * implementing other management responsibilities. So we use a
217 217 * collection of techniques that avoid, reduce, or cope well with
218 218 * contention. These entail several instances of bit-packing into
219 219 * CASable fields to maintain only the minimally required
220 220 * atomicity. To enable such packing, we restrict maximum
221 221 * parallelism to (1<<15)-1 (enabling twice this (to accommodate
222 222 * unbalanced increments and decrements) to fit into a 16 bit
223 223 * field, which is far in excess of normal operating range. Even
224 224 * though updates to some of these bookkeeping fields do sometimes
225 225 * contend with each other, they don't normally cache-contend with
226 226 * updates to others enough to warrant memory padding or
227 227 * isolation. So they are all held as fields of ForkJoinPool
228 228 * objects. The main capabilities are as follows:
229 229 *
230 230 * 1. Creating and removing workers. Workers are recorded in the
231 231 * "workers" array. This is an array as opposed to some other data
232 232 * structure to support index-based random steals by workers.
233 233 * Updates to the array recording new workers and unrecording
234 234 * terminated ones are protected from each other by a lock
235 235 * (workerLock) but the array is otherwise concurrently readable,
236 236 * and accessed directly by workers. To simplify index-based
237 237 * operations, the array size is always a power of two, and all
238 238 * readers must tolerate null slots. Currently, all worker thread
239 239 * creation is on-demand, triggered by task submissions,
240 240 * replacement of terminated workers, and/or compensation for
241 241 * blocked workers. However, all other support code is set up to
242 242 * work with other policies.
243 243 *
244 244 * To ensure that we do not hold on to worker references that
245 245 * would prevent GC, ALL accesses to workers are via indices into
246 246 * the workers array (which is one source of some of the unusual
247 247 * code constructions here). In essence, the workers array serves
248 248 * as a WeakReference mechanism. Thus for example the event queue
249 249 * stores worker indices, not worker references. Access to the
250 250 * workers in associated methods (for example releaseEventWaiters)
251 251 * must both index-check and null-check the IDs. All such accesses
252 252 * ignore bad IDs by returning out early from what they are doing,
253 253 * since this can only be associated with shutdown, in which case
254 254 * it is OK to give up. On termination, we just clobber these
255 255 * data structures without trying to use them.
256 256 *
257 257 * 2. Bookkeeping for dynamically adding and removing workers. We
258 258 * aim to approximately maintain the given level of parallelism.
259 259 * When some workers are known to be blocked (on joins or via
260 260 * ManagedBlocker), we may create or resume others to take their
261 261 * place until they unblock (see below). Implementing this
262 262 * requires counts of the number of "running" threads (i.e., those
263 263 * that are neither blocked nor artificially suspended) as well as
264 264 * the total number. These two values are packed into one field,
265 265 * "workerCounts" because we need accurate snapshots when deciding
266 266 * to create, resume or suspend. Note however that the
267 267 * correspondence of these counts to reality is not guaranteed. In
268 268 * particular updates for unblocked threads may lag until they
269 269 * actually wake up.
270 270 *
271 271 * 3. Maintaining global run state. The run state of the pool
272 272 * consists of a runLevel (SHUTDOWN, TERMINATING, etc) similar to
273 273 * those in other Executor implementations, as well as a count of
274 274 * "active" workers -- those that are, or soon will be, or
275 275 * recently were executing tasks. The runLevel and active count
276 276 * are packed together in order to correctly trigger shutdown and
277 277 * termination. Without care, active counts can be subject to very
278 278 * high contention. We substantially reduce this contention by
279 279 * relaxing update rules. A worker must claim active status
280 280 * prospectively, by activating if it sees that a submitted or
281 281 * stealable task exists (it may find after activating that the
282 282 * task no longer exists). It stays active while processing this
283 283 * task (if it exists) and any other local subtasks it produces,
284 284 * until it cannot find any other tasks. It then tries
285 285 * inactivating (see method preStep), but upon update contention
286 286 * instead scans for more tasks, later retrying inactivation if it
287 287 * doesn't find any.
288 288 *
289 289 * 4. Managing idle workers waiting for tasks. We cannot let
290 290 * workers spin indefinitely scanning for tasks when none are
291 291 * available. On the other hand, we must quickly prod them into
292 292 * action when new tasks are submitted or generated. We
293 293 * park/unpark these idle workers using an event-count scheme.
294 294 * Field eventCount is incremented upon events that may enable
295 295 * workers that previously could not find a task to now find one:
296 296 * Submission of a new task to the pool, or another worker pushing
297 297 * a task onto a previously empty queue. (We also use this
298 298 * mechanism for configuration and termination actions that
299 299 * require wakeups of idle workers). Each worker maintains its
300 300 * last known event count, and blocks when a scan for work did not
301 301 * find a task AND its lastEventCount matches the current
302 302 * eventCount. Waiting idle workers are recorded in a variant of
303 303 * Treiber stack headed by field eventWaiters which, when nonzero,
304 304 * encodes the thread index and count awaited for by the worker
305 305 * thread most recently calling eventSync. This thread in turn has
306 306 * a record (field nextEventWaiter) for the next waiting worker.
307 307 * In addition to allowing simpler decisions about need for
308 308 * wakeup, the event count bits in eventWaiters serve the role of
309 309 * tags to avoid ABA errors in Treiber stacks. Upon any wakeup,
310 310 * released threads also try to release at most two others. The
311 311 * net effect is a tree-like diffusion of signals, where released
312 312 * threads (and possibly others) help with unparks. To further
313 313 * reduce contention effects a bit, failed CASes to increment
314 314 * field eventCount are tolerated without retries in signalWork.
315 315 * Conceptually they are merged into the same event, which is OK
316 316 * when their only purpose is to enable workers to scan for work.
317 317 *
318 318 * 5. Managing suspension of extra workers. When a worker notices
319 319 * (usually upon timeout of a wait()) that there are too few
320 320 * running threads, we may create a new thread to maintain
321 321 * parallelism level, or at least avoid starvation. Usually, extra
322 322 * threads are needed for only very short periods, yet join
323 323 * dependencies are such that we sometimes need them in
324 324 * bursts. Rather than create new threads each time this happens,
325 325 * we suspend no-longer-needed extra ones as "spares". For most
326 326 * purposes, we don't distinguish "extra" spare threads from
327 327 * normal "core" threads: On each call to preStep (the only point
328 328 * at which we can do this) a worker checks to see if there are
329 329 * now too many running workers, and if so, suspends itself.
330 330 * Method helpMaintainParallelism looks for suspended threads to
331 331 * resume before considering creating a new replacement. The
332 332 * spares themselves are encoded on another variant of a Treiber
333 333 * Stack, headed at field "spareWaiters". Note that the use of
334 334 * spares is intrinsically racy. One thread may become a spare at
335 335 * about the same time as another is needlessly being created. We
336 336 * counteract this and related slop in part by requiring resumed
337 337 * spares to immediately recheck (in preStep) to see whether they
338 338 * should re-suspend.
339 339 *
340 340 * 6. Killing off unneeded workers. A timeout mechanism is used to
341 341 * shed unused workers: The oldest (first) event queue waiter uses
342 342 * a timed rather than hard wait. When this wait times out without
 343  343      * a normal wakeup, it tries to shut down any one (for convenience
344 344 * the newest) other spare or event waiter via
345 345 * tryShutdownUnusedWorker. This eventually reduces the number of
346 346 * worker threads to a minimum of one after a long enough period
347 347 * without use.
348 348 *
349 349 * 7. Deciding when to create new workers. The main dynamic
350 350 * control in this class is deciding when to create extra threads
351 351 * in method helpMaintainParallelism. We would like to keep
352 352 * exactly #parallelism threads running, which is an impossible
353 353 * task. We always need to create one when the number of running
354 354 * threads would become zero and all workers are busy. Beyond
355 355 * this, we must rely on heuristics that work well in the
356 356 * presence of transient phenomena such as GC stalls, dynamic
357 357 * compilation, and wake-up lags. These transients are extremely
358 358 * common -- we are normally trying to fully saturate the CPUs on
359 359 * a machine, so almost any activity other than running tasks
360 360 * impedes accuracy. Our main defense is to allow parallelism to
361 361 * lapse for a while during joins, and use a timeout to see if,
362 362 * after the resulting settling, there is still a need for
363 363 * additional workers. This also better copes with the fact that
364 364 * some of the methods in this class tend to never become compiled
365 365 * (but are interpreted), so some components of the entire set of
366 366 * controls might execute 100 times faster than others. And
367 367 * similarly for cases where the apparent lack of work is just due
368 368 * to GC stalls and other transient system activity.
369 369 *
370 370 * Beware that there is a lot of representation-level coupling
371 371 * among classes ForkJoinPool, ForkJoinWorkerThread, and
372 372 * ForkJoinTask. For example, direct access to "workers" array by
373 373 * workers, and direct access to ForkJoinTask.status by both
374 374 * ForkJoinPool and ForkJoinWorkerThread. There is little point
375 375 * trying to reduce this, since any associated future changes in
376 376 * representations will need to be accompanied by algorithmic
377 377 * changes anyway.
378 378 *
379 379 * Style notes: There are lots of inline assignments (of form
380 380 * "while ((local = field) != 0)") which are usually the simplest
381 381 * way to ensure the required read orderings (which are sometimes
382 382 * critical). Also several occurrences of the unusual "do {}
383 383 * while (!cas...)" which is the simplest way to force an update of
384 384 * a CAS'ed variable. There are also other coding oddities that
385 385 * help some methods perform reasonably even when interpreted (not
386 386 * compiled), at the expense of some messy constructions that
387 387 * reduce byte code counts.
388 388 *
389 389 * The order of declarations in this file is: (1) statics (2)
390 390 * fields (along with constants used when unpacking some of them)
391 391 * (3) internal control methods (4) callbacks and other support
392 392 * for ForkJoinTask and ForkJoinWorkerThread classes, (5) exported
393 393 * methods (plus a few little helpers).
394 394 */
395 395
396 396 /**
397 397 * Factory for creating new {@link ForkJoinWorkerThread}s.
398 398 * A {@code ForkJoinWorkerThreadFactory} must be defined and used
399 399 * for {@code ForkJoinWorkerThread} subclasses that extend base
400 400 * functionality or initialize threads with different contexts.
401 401 */
402 402 public static interface ForkJoinWorkerThreadFactory {
403 403 /**
404 404 * Returns a new worker thread operating in the given pool.
405 405 *
406 406 * @param pool the pool this thread works in
407 407 * @throws NullPointerException if the pool is null
408 408 */
409 409 public ForkJoinWorkerThread newThread(ForkJoinPool pool);
410 410 }
411 411
412 412 /**
413 413 * Default ForkJoinWorkerThreadFactory implementation; creates a
414 414 * new ForkJoinWorkerThread.
415 415 */
416 416 static class DefaultForkJoinWorkerThreadFactory
417 417 implements ForkJoinWorkerThreadFactory {
418 418 public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
419 419 return new ForkJoinWorkerThread(pool);
420 420 }
421 421 }
422 422
423 423 /**
424 424 * Creates a new ForkJoinWorkerThread. This factory is used unless
425 425 * overridden in ForkJoinPool constructors.
426 426 */
427 427 public static final ForkJoinWorkerThreadFactory
428 428 defaultForkJoinWorkerThreadFactory =
429 429 new DefaultForkJoinWorkerThreadFactory();
430 430
431 431 /**
432 432 * Permission required for callers of methods that may start or
433 433 * kill threads.
434 434 */
435 435 private static final RuntimePermission modifyThreadPermission =
436 436 new RuntimePermission("modifyThread");
437 437
438 438 /**
439 439 * If there is a security manager, makes sure caller has
440 440 * permission to modify threads.
441 441 */
442 442 private static void checkPermission() {
443 443 SecurityManager security = System.getSecurityManager();
444 444 if (security != null)
445 445 security.checkPermission(modifyThreadPermission);
446 446 }
447 447
448 448 /**
449 449 * Generator for assigning sequence numbers as pool names.
450 450 */
451 451 private static final AtomicInteger poolNumberGenerator =
452 452 new AtomicInteger();
453 453
454 454 /**
455 455 * The time to block in a join (see awaitJoin) before checking if
456 456 * a new worker should be (re)started to maintain parallelism
457 457 * level. The value should be short enough to maintain global
458 458 * responsiveness and progress but long enough to avoid
459 459 * counterproductive firings during GC stalls or unrelated system
460 460 * activity, and to not bog down systems with continual re-firings
461 461 * on GCs or legitimately long waits.
462 462 */
463 463 private static final long JOIN_TIMEOUT_MILLIS = 250L; // 4 per second
464 464
465 465 /**
466 466 * The wakeup interval (in nanoseconds) for the oldest worker
467 467 * waiting for an event to invoke tryShutdownUnusedWorker to
468 468 * shrink the number of workers. The exact value does not matter
469 469 * too much. It must be short enough to release resources during
470 470 * sustained periods of idleness, but not so short that threads
471 471 * are continually re-created.
472 472 */
473 473 private static final long SHRINK_RATE_NANOS =
474 474 30L * 1000L * 1000L * 1000L; // 2 per minute
475 475
476 476 /**
477 477 * Absolute bound for parallelism level. Twice this number plus
 478  478      * one (i.e., 0xffff) must fit into a 16bit field to enable
479 479 * word-packing for some counts and indices.
480 480 */
481 481 private static final int MAX_WORKERS = 0x7fff;
482 482
483 483 /**
484 484 * Array holding all worker threads in the pool. Array size must
485 485 * be a power of two. Updates and replacements are protected by
486 486 * workerLock, but the array is always kept in a consistent enough
487 487 * state to be randomly accessed without locking by workers
488 488 * performing work-stealing, as well as other traversal-based
489 489 * methods in this class. All readers must tolerate that some
490 490 * array slots may be null.
491 491 */
492 492 volatile ForkJoinWorkerThread[] workers;
493 493
494 494 /**
495 495 * Queue for external submissions.
496 496 */
497 497 private final LinkedTransferQueue<ForkJoinTask<?>> submissionQueue;
498 498
499 499 /**
500 500 * Lock protecting updates to workers array.
501 501 */
502 502 private final ReentrantLock workerLock;
503 503
504 504 /**
505 505 * Latch released upon termination.
506 506 */
507 507 private final Phaser termination;
508 508
509 509 /**
510 510 * Creation factory for worker threads.
511 511 */
512 512 private final ForkJoinWorkerThreadFactory factory;
513 513
514 514 /**
515 515 * Sum of per-thread steal counts, updated only when threads are
516 516 * idle or terminating.
517 517 */
↓ open down ↓ |
517 lines elided |
↑ open up ↑ |
518 518 private volatile long stealCount;
519 519
520 520 /**
521 521 * Encoded record of top of Treiber stack of threads waiting for
522 522 * events. The top 32 bits contain the count being waited for. The
 523  523      * bottom 16 bits contain one plus the pool index of waiting
524 524 * worker thread. (Bits 16-31 are unused.)
525 525 */
526 526 private volatile long eventWaiters;
527 527
528 - private static final int EVENT_COUNT_SHIFT = 32;
529 - private static final long WAITER_ID_MASK = (1L << 16) - 1L;
528 + private static final int EVENT_COUNT_SHIFT = 32;
529 + private static final int WAITER_ID_MASK = (1 << 16) - 1;
530 530
531 531 /**
532 532 * A counter for events that may wake up worker threads:
533 533 * - Submission of a new task to the pool
534 534 * - A worker pushing a task on an empty queue
535 535 * - termination
536 536 */
537 537 private volatile int eventCount;
538 538
539 539 /**
540 540 * Encoded record of top of Treiber stack of spare threads waiting
541 541 * for resumption. The top 16 bits contain an arbitrary count to
 542  542      * avoid ABA effects. The bottom 16 bits contain one plus the pool
543 543 * index of waiting worker thread.
544 544 */
545 545 private volatile int spareWaiters;
546 546
547 547 private static final int SPARE_COUNT_SHIFT = 16;
548 548 private static final int SPARE_ID_MASK = (1 << 16) - 1;
549 549
550 550 /**
551 551 * Lifecycle control. The low word contains the number of workers
552 552 * that are (probably) executing tasks. This value is atomically
553 553 * incremented before a worker gets a task to run, and decremented
554 554 * when a worker has no tasks and cannot find any. Bits 16-18
555 555 * contain runLevel value. When all are zero, the pool is
556 556 * running. Level transitions are monotonic (running -> shutdown
557 557 * -> terminating -> terminated) so each transition adds a bit.
558 558 * These are bundled together to ensure consistent read for
559 559 * termination checks (i.e., that runLevel is at least SHUTDOWN
560 560 * and active threads is zero).
561 561 *
562 562 * Notes: Most direct CASes are dependent on these bitfield
563 563 * positions. Also, this field is non-private to enable direct
564 564 * performance-sensitive CASes in ForkJoinWorkerThread.
565 565 */
566 566 volatile int runState;
567 567
568 568 // Note: The order among run level values matters.
569 569 private static final int RUNLEVEL_SHIFT = 16;
570 570 private static final int SHUTDOWN = 1 << RUNLEVEL_SHIFT;
571 571 private static final int TERMINATING = 1 << (RUNLEVEL_SHIFT + 1);
572 572 private static final int TERMINATED = 1 << (RUNLEVEL_SHIFT + 2);
573 573 private static final int ACTIVE_COUNT_MASK = (1 << RUNLEVEL_SHIFT) - 1;
574 574
575 575 /**
576 576 * Holds number of total (i.e., created and not yet terminated)
577 577 * and running (i.e., not blocked on joins or other managed sync)
578 578 * threads, packed together to ensure consistent snapshot when
579 579 * making decisions about creating and suspending spare
580 580 * threads. Updated only by CAS. Note that adding a new worker
581 581 * requires incrementing both counts, since workers start off in
582 582 * running state.
583 583 */
584 584 private volatile int workerCounts;
585 585
586 586 private static final int TOTAL_COUNT_SHIFT = 16;
587 587 private static final int RUNNING_COUNT_MASK = (1 << TOTAL_COUNT_SHIFT) - 1;
588 588 private static final int ONE_RUNNING = 1;
589 589 private static final int ONE_TOTAL = 1 << TOTAL_COUNT_SHIFT;
590 590
591 591 /**
592 592 * The target parallelism level.
593 593 * Accessed directly by ForkJoinWorkerThreads.
594 594 */
595 595 final int parallelism;
596 596
597 597 /**
598 598 * True if use local fifo, not default lifo, for local polling
599 599 * Read by, and replicated by ForkJoinWorkerThreads
600 600 */
601 601 final boolean locallyFifo;
602 602
603 603 /**
604 604 * The uncaught exception handler used when any worker abruptly
605 605 * terminates.
606 606 */
607 607 private final Thread.UncaughtExceptionHandler ueh;
↓ open down ↓ |
68 lines elided |
↑ open up ↑ |
608 608
609 609 /**
610 610 * Pool number, just for assigning useful names to worker threads
611 611 */
612 612 private final int poolNumber;
613 613
614 614 // Utilities for CASing fields. Note that most of these
615 615 // are usually manually inlined by callers
616 616
617 617 /**
618 - * Increments running count part of workerCounts
618 + * Increments running count part of workerCounts.
619 619 */
620 620 final void incrementRunningCount() {
621 621 int c;
622 622 do {} while (!UNSAFE.compareAndSwapInt(this, workerCountsOffset,
623 623 c = workerCounts,
624 624 c + ONE_RUNNING));
625 625 }
626 626
627 627 /**
628 - * Tries to decrement running count unless already zero
628 + * Tries to increment running count part of workerCounts.
629 629 */
630 + final boolean tryIncrementRunningCount() {
631 + int c;
632 + return UNSAFE.compareAndSwapInt(this, workerCountsOffset,
633 + c = workerCounts,
634 + c + ONE_RUNNING);
635 + }
636 +
637 + /**
638 + * Tries to decrement running count unless already zero.
639 + */
630 640 final boolean tryDecrementRunningCount() {
631 641 int wc = workerCounts;
632 642 if ((wc & RUNNING_COUNT_MASK) == 0)
633 643 return false;
634 644 return UNSAFE.compareAndSwapInt(this, workerCountsOffset,
635 645 wc, wc - ONE_RUNNING);
636 646 }
637 647
638 648 /**
639 649 * Forces decrement of encoded workerCounts, awaiting nonzero if
640 650 * (rarely) necessary when other count updates lag.
641 651 *
642 652 * @param dr -- either zero or ONE_RUNNING
643 653 * @param dt -- either zero or ONE_TOTAL
644 654 */
645 655 private void decrementWorkerCounts(int dr, int dt) {
646 656 for (;;) {
647 657 int wc = workerCounts;
648 658 if ((wc & RUNNING_COUNT_MASK) - dr < 0 ||
649 659 (wc >>> TOTAL_COUNT_SHIFT) - dt < 0) {
650 660 if ((runState & TERMINATED) != 0)
651 661 return; // lagging termination on a backout
652 662 Thread.yield();
653 663 }
654 664 if (UNSAFE.compareAndSwapInt(this, workerCountsOffset,
655 665 wc, wc - (dr + dt)))
656 666 return;
657 667 }
658 668 }
659 669
660 670 /**
661 671 * Tries decrementing active count; fails on contention.
662 672 * Called when workers cannot find tasks to run.
663 673 */
664 674 final boolean tryDecrementActiveCount() {
665 675 int c;
666 676 return UNSAFE.compareAndSwapInt(this, runStateOffset,
667 677 c = runState, c - 1);
668 678 }
669 679
670 680 /**
671 681 * Advances to at least the given level. Returns true if not
672 682 * already in at least the given level.
673 683 */
674 684 private boolean advanceRunLevel(int level) {
675 685 for (;;) {
676 686 int s = runState;
677 687 if ((s & level) != 0)
678 688 return false;
679 689 if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, s | level))
680 690 return true;
681 691 }
682 692 }
683 693
684 694 // workers array maintenance
685 695
686 696 /**
687 697 * Records and returns a workers array index for new worker.
688 698 */
689 699 private int recordWorker(ForkJoinWorkerThread w) {
690 700 // Try using slot totalCount-1. If not available, scan and/or resize
↓ open down ↓ |
51 lines elided |
↑ open up ↑ |
691 701 int k = (workerCounts >>> TOTAL_COUNT_SHIFT) - 1;
692 702 final ReentrantLock lock = this.workerLock;
693 703 lock.lock();
694 704 try {
695 705 ForkJoinWorkerThread[] ws = workers;
696 706 int n = ws.length;
697 707 if (k < 0 || k >= n || ws[k] != null) {
698 708 for (k = 0; k < n && ws[k] != null; ++k)
699 709 ;
700 710 if (k == n)
701 - ws = Arrays.copyOf(ws, n << 1);
711 + ws = workers = Arrays.copyOf(ws, n << 1);
702 712 }
703 713 ws[k] = w;
704 - workers = ws; // volatile array write ensures slot visibility
714 + int c = eventCount; // advance event count to ensure visibility
715 + UNSAFE.compareAndSwapInt(this, eventCountOffset, c, c+1);
705 716 } finally {
706 717 lock.unlock();
707 718 }
708 719 return k;
709 720 }
710 721
711 722 /**
712 723 * Nulls out record of worker in workers array.
713 724 */
714 725 private void forgetWorker(ForkJoinWorkerThread w) {
715 726 int idx = w.poolIndex;
716 727 // Locking helps method recordWorker avoid unnecessary expansion
717 728 final ReentrantLock lock = this.workerLock;
718 729 lock.lock();
719 730 try {
720 731 ForkJoinWorkerThread[] ws = workers;
721 732 if (idx >= 0 && idx < ws.length && ws[idx] == w) // verify
722 733 ws[idx] = null;
723 734 } finally {
724 735 lock.unlock();
725 736 }
726 737 }
↓ open down ↓ |
12 lines elided |
↑ open up ↑ |
727 738
728 739 /**
729 740 * Final callback from terminating worker. Removes record of
730 741 * worker from array, and adjusts counts. If pool is shutting
731 742 * down, tries to complete termination.
732 743 *
733 744 * @param w the worker
734 745 */
735 746 final void workerTerminated(ForkJoinWorkerThread w) {
736 747 forgetWorker(w);
737 - decrementWorkerCounts(w.isTrimmed()? 0 : ONE_RUNNING, ONE_TOTAL);
748 + decrementWorkerCounts(w.isTrimmed() ? 0 : ONE_RUNNING, ONE_TOTAL);
738 749 while (w.stealCount != 0) // collect final count
739 750 tryAccumulateStealCount(w);
740 751 tryTerminate(false);
741 752 }
742 753
743 754 // Waiting for and signalling events
744 755
745 756 /**
746 757 * Releases workers blocked on a count not equal to current count.
747 758 * Normally called after precheck that eventWaiters isn't zero to
748 759 * avoid wasted array checks. Gives up upon a change in count or
749 - * upon releasing two workers, letting others take over.
760 + * upon releasing four workers, letting others take over.
750 761 */
751 762 private void releaseEventWaiters() {
752 763 ForkJoinWorkerThread[] ws = workers;
753 764 int n = ws.length;
754 765 long h = eventWaiters;
755 766 int ec = eventCount;
756 - boolean releasedOne = false;
767 + int releases = 4;
757 768 ForkJoinWorkerThread w; int id;
758 - while ((id = ((int)(h & WAITER_ID_MASK)) - 1) >= 0 &&
769 + while ((id = (((int)h) & WAITER_ID_MASK) - 1) >= 0 &&
759 770 (int)(h >>> EVENT_COUNT_SHIFT) != ec &&
760 771 id < n && (w = ws[id]) != null) {
761 772 if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
762 773 h, w.nextWaiter)) {
763 774 LockSupport.unpark(w);
764 - if (releasedOne) // exit on second release
775 + if (--releases == 0)
765 776 break;
766 - releasedOne = true;
767 777 }
768 778 if (eventCount != ec)
769 779 break;
770 780 h = eventWaiters;
771 781 }
772 782 }
773 783
774 784 /**
775 785 * Tries to advance eventCount and releases waiters. Called only
776 786 * from workers.
777 787 */
778 788 final void signalWork() {
779 789 int c; // try to increment event count -- CAS failure OK
780 790 UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1);
781 791 if (eventWaiters != 0L)
782 792 releaseEventWaiters();
783 793 }
784 794
785 795 /**
↓ open down ↓ |
9 lines elided |
↑ open up ↑ |
786 796      * Adds the given worker to the event queue and blocks until
787 797      * terminating or the event count advances from the given value.
788 798 *
789 799 * @param w the calling worker thread
790 800 * @param ec the count
791 801 */
792 802 private void eventSync(ForkJoinWorkerThread w, int ec) {
793 803 long nh = (((long)ec) << EVENT_COUNT_SHIFT) | ((long)(w.poolIndex+1));
794 804 long h;
795 805 while ((runState < SHUTDOWN || !tryTerminate(false)) &&
796 - (((int)((h = eventWaiters) & WAITER_ID_MASK)) == 0 ||
806 + (((int)(h = eventWaiters) & WAITER_ID_MASK) == 0 ||
797 807 (int)(h >>> EVENT_COUNT_SHIFT) == ec) &&
798 808 eventCount == ec) {
799 809 if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
800 810 w.nextWaiter = h, nh)) {
801 811 awaitEvent(w, ec);
802 812 break;
803 813 }
804 814 }
805 815 }
806 816
807 817 /**
808 818 * Blocks the given worker (that has already been entered as an
809 819 * event waiter) until terminating or event count advances from
810 820 * the given value. The oldest (first) waiter uses a timed wait to
811 821 * occasionally one-by-one shrink the number of workers (to a
812 822 * minimum of one) if the pool has not been used for extended
↓ open down ↓ |
6 lines elided |
↑ open up ↑ |
813 823 * periods.
814 824 *
815 825 * @param w the calling worker thread
816 826 * @param ec the count
817 827 */
818 828 private void awaitEvent(ForkJoinWorkerThread w, int ec) {
819 829 while (eventCount == ec) {
820 830 if (tryAccumulateStealCount(w)) { // transfer while idle
821 831 boolean untimed = (w.nextWaiter != 0L ||
822 832 (workerCounts & RUNNING_COUNT_MASK) <= 1);
823 - long startTime = untimed? 0 : System.nanoTime();
833 + long startTime = untimed ? 0 : System.nanoTime();
824 834 Thread.interrupted(); // clear/ignore interrupt
825 - if (eventCount != ec || w.isTerminating())
835 + if (w.isTerminating() || eventCount != ec)
826 836 break; // recheck after clear
827 837 if (untimed)
828 838 LockSupport.park(w);
829 839 else {
830 840 LockSupport.parkNanos(w, SHRINK_RATE_NANOS);
831 841 if (eventCount != ec || w.isTerminating())
832 842 break;
833 843 if (System.nanoTime() - startTime >= SHRINK_RATE_NANOS)
834 844 tryShutdownUnusedWorker(ec);
835 845 }
836 846 }
837 847 }
838 848 }
839 849
840 850 // Maintaining parallelism
841 851
842 852 /**
843 853 * Pushes worker onto the spare stack.
844 854 */
845 855 final void pushSpare(ForkJoinWorkerThread w) {
846 856 int ns = (++w.spareCount << SPARE_COUNT_SHIFT) | (w.poolIndex + 1);
847 857 do {} while (!UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
848 858 w.nextSpare = spareWaiters,ns));
849 859 }
850 860
851 861 /**
852 862 * Tries (once) to resume a spare if the number of running
↓ open down ↓ |
17 lines elided |
↑ open up ↑ |
853 863 * threads is less than target.
854 864 */
855 865 private void tryResumeSpare() {
856 866 int sw, id;
857 867 ForkJoinWorkerThread[] ws = workers;
858 868 int n = ws.length;
859 869 ForkJoinWorkerThread w;
860 870 if ((sw = spareWaiters) != 0 &&
861 871 (id = (sw & SPARE_ID_MASK) - 1) >= 0 &&
862 872 id < n && (w = ws[id]) != null &&
863 - (workerCounts & RUNNING_COUNT_MASK) < parallelism &&
873 + (runState >= TERMINATING ||
874 + (workerCounts & RUNNING_COUNT_MASK) < parallelism) &&
864 875 spareWaiters == sw &&
865 876 UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
866 877 sw, w.nextSpare)) {
867 878 int c; // increment running count before resume
868 879 do {} while (!UNSAFE.compareAndSwapInt
869 880 (this, workerCountsOffset,
870 881 c = workerCounts, c + ONE_RUNNING));
871 882 if (w.tryUnsuspend())
872 883 LockSupport.unpark(w);
873 884 else // back out if w was shutdown
874 885 decrementWorkerCounts(ONE_RUNNING, 0);
875 886 }
876 887 }
877 888
878 889 /**
879 890 * Tries to increase the number of running workers if below target
880 891 * parallelism: If a spare exists tries to resume it via
881 892 * tryResumeSpare. Otherwise, if not enough total workers or all
882 893 * existing workers are busy, adds a new worker. In all cases also
883 894 * helps wake up releasable workers waiting for work.
884 895 */
885 896 private void helpMaintainParallelism() {
886 897 int pc = parallelism;
887 898 int wc, rs, tc;
888 899 while (((wc = workerCounts) & RUNNING_COUNT_MASK) < pc &&
889 900 (rs = runState) < TERMINATING) {
890 901 if (spareWaiters != 0)
891 902 tryResumeSpare();
892 903 else if ((tc = wc >>> TOTAL_COUNT_SHIFT) >= MAX_WORKERS ||
893 904 (tc >= pc && (rs & ACTIVE_COUNT_MASK) != tc))
894 905 break; // enough total
895 906 else if (runState == rs && workerCounts == wc &&
896 907 UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
897 908 wc + (ONE_RUNNING|ONE_TOTAL))) {
898 909 ForkJoinWorkerThread w = null;
899 910 Throwable fail = null;
900 911 try {
901 912 w = factory.newThread(this);
902 913 } catch (Throwable ex) {
903 914 fail = ex;
904 915 }
905 916 if (w == null) { // null or exceptional factory return
906 917 decrementWorkerCounts(ONE_RUNNING, ONE_TOTAL);
↓ open down ↓ |
33 lines elided |
↑ open up ↑ |
907 918 tryTerminate(false); // handle failure during shutdown
908 919 // If originating from an external caller,
909 920 // propagate exception, else ignore
910 921 if (fail != null && runState < TERMINATING &&
911 922 !(Thread.currentThread() instanceof
912 923 ForkJoinWorkerThread))
913 924 UNSAFE.throwException(fail);
914 925 break;
915 926 }
916 927 w.start(recordWorker(w), ueh);
917 - if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc) {
918 - int c; // advance event count
919 - UNSAFE.compareAndSwapInt(this, eventCountOffset,
920 - c = eventCount, c+1);
928 + if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc)
921 929 break; // add at most one unless total below target
922 - }
923 930 }
924 931 }
925 932 if (eventWaiters != 0L)
926 933 releaseEventWaiters();
927 934 }
928 935
929 936 /**
930 937 * Callback from the oldest waiter in awaitEvent waking up after a
931 938 * period of non-use. If all workers are idle, tries (once) to
932 939 * shutdown an event waiter or a spare, if one exists. Note that
933 940 * we don't need CAS or locks here because the method is called
934 941 * only from one thread occasionally waking (and even misfires are
935 942 * OK). Note that until the shutdown worker fully terminates,
936 943 * workerCounts will overestimate total count, which is tolerable.
937 944 *
938 945 * @param ec the event count waited on by caller (to abort
939 946 * attempt if count has since changed).
940 947 */
941 948 private void tryShutdownUnusedWorker(int ec) {
942 949 if (runState == 0 && eventCount == ec) { // only trigger if all idle
943 950 ForkJoinWorkerThread[] ws = workers;
944 951 int n = ws.length;
945 952 ForkJoinWorkerThread w = null;
946 953 boolean shutdown = false;
947 954 int sw;
↓ open down ↓ |
15 lines elided |
↑ open up ↑ |
948 955 long h;
949 956 if ((sw = spareWaiters) != 0) { // prefer killing spares
950 957 int id = (sw & SPARE_ID_MASK) - 1;
951 958 if (id >= 0 && id < n && (w = ws[id]) != null &&
952 959 UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
953 960 sw, w.nextSpare))
954 961 shutdown = true;
955 962 }
956 963 else if ((h = eventWaiters) != 0L) {
957 964 long nh;
958 - int id = ((int)(h & WAITER_ID_MASK)) - 1;
965 + int id = (((int)h) & WAITER_ID_MASK) - 1;
959 966 if (id >= 0 && id < n && (w = ws[id]) != null &&
960 967 (nh = w.nextWaiter) != 0L && // keep at least one worker
961 968 UNSAFE.compareAndSwapLong(this, eventWaitersOffset, h, nh))
962 969 shutdown = true;
963 970 }
964 971 if (w != null && shutdown) {
965 972 w.shutdown();
966 973 LockSupport.unpark(w);
967 974 }
968 975 }
969 976 releaseEventWaiters(); // in case of interference
970 977 }
971 978
972 979 /**
973 980 * Callback from workers invoked upon each top-level action (i.e.,
974 981 * stealing a task or taking a submission and running it).
975 982 * Performs one or more of the following:
976 983 *
977 984 * 1. If the worker is active and either did not run a task
978 985 * or there are too many workers, try to set its active status
979 986 * to inactive and update activeCount. On contention, we may
980 987 * try again in this or a subsequent call.
981 988 *
982 989 * 2. If not enough total workers, help create some.
983 990 *
984 991 * 3. If there are too many running workers, suspend this worker
985 992 * (first forcing inactive if necessary). If it is not needed,
986 993 * it may be shutdown while suspended (via
987 994 * tryShutdownUnusedWorker). Otherwise, upon resume it
988 995 * rechecks running thread count and need for event sync.
989 996 *
990 997 * 4. If worker did not run a task, await the next task event via
991 998 * eventSync if necessary (first forcing inactivation), upon
992 999 * which the worker may be shutdown via
993 1000 * tryShutdownUnusedWorker. Otherwise, help release any
994 1001 * existing event waiters that are now releasable,
995 1002 *
↓ open down ↓ |
27 lines elided |
↑ open up ↑ |
996 1003 * @param w the worker
997 1004 * @param ran true if worker ran a task since last call to this method
998 1005 */
999 1006 final void preStep(ForkJoinWorkerThread w, boolean ran) {
1000 1007 int wec = w.lastEventCount;
1001 1008 boolean active = w.active;
1002 1009 boolean inactivate = false;
1003 1010 int pc = parallelism;
1004 1011 while (w.runState == 0) {
1005 1012 int rs = runState;
1006 - if (rs >= TERMINATING) { // propagate shutdown
1013 + if (rs >= TERMINATING) { // propagate shutdown
1007 1014 w.shutdown();
1008 1015 break;
1009 1016 }
1010 1017 if ((inactivate || (active && (rs & ACTIVE_COUNT_MASK) >= pc)) &&
1011 - UNSAFE.compareAndSwapInt(this, runStateOffset, rs, rs - 1))
1018 + UNSAFE.compareAndSwapInt(this, runStateOffset, rs, --rs)) {
1012 1019 inactivate = active = w.active = false;
1013 - int wc = workerCounts;
1020 + if (rs == SHUTDOWN) { // all inactive and shut down
1021 + tryTerminate(false);
1022 + continue;
1023 + }
1024 + }
1025 + int wc = workerCounts; // try to suspend as spare
1014 1026 if ((wc & RUNNING_COUNT_MASK) > pc) {
1015 1027 if (!(inactivate |= active) && // must inactivate to suspend
1016 - workerCounts == wc && // try to suspend as spare
1028 + workerCounts == wc &&
1017 1029 UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1018 1030 wc, wc - ONE_RUNNING))
1019 1031 w.suspendAsSpare();
1020 1032 }
1021 1033 else if ((wc >>> TOTAL_COUNT_SHIFT) < pc)
1022 1034 helpMaintainParallelism(); // not enough workers
1023 - else if (!ran) {
1035 + else if (ran)
1036 + break;
1037 + else {
1024 1038 long h = eventWaiters;
1025 1039 int ec = eventCount;
1026 1040 if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != ec)
1027 1041 releaseEventWaiters(); // release others before waiting
1028 1042 else if (ec != wec) {
1029 1043 w.lastEventCount = ec; // no need to wait
1030 1044 break;
1031 1045 }
1032 1046 else if (!(inactivate |= active))
1033 1047 eventSync(w, wec); // must inactivate before sync
1034 1048 }
1035 - else
1036 - break;
1037 1049 }
1038 1050 }
1039 1051
1040 1052 /**
1041 1053 * Helps and/or blocks awaiting join of the given task.
1042 1054 * See above for explanation.
1043 1055 *
1044 1056 * @param joinMe the task to join
1045 1057 * @param worker the current worker thread
1058 + * @param timed true if wait should time out
1059 + * @param nanos timeout value if timed
1046 1060 */
1047 - final void awaitJoin(ForkJoinTask<?> joinMe, ForkJoinWorkerThread worker) {
1061 + final void awaitJoin(ForkJoinTask<?> joinMe, ForkJoinWorkerThread worker,
1062 + boolean timed, long nanos) {
1063 + long startTime = timed ? System.nanoTime() : 0L;
1048 1064 int retries = 2 + (parallelism >> 2); // #helpJoins before blocking
1065 + boolean running = true; // false when count decremented
1049 1066 while (joinMe.status >= 0) {
1050 - int wc;
1051 - worker.helpJoinTask(joinMe);
1067 + if (runState >= TERMINATING) {
1068 + joinMe.cancelIgnoringExceptions();
1069 + break;
1070 + }
1071 + running = worker.helpJoinTask(joinMe, running);
1052 1072 if (joinMe.status < 0)
1053 1073 break;
1054 - else if (retries > 0)
1074 + if (retries > 0) {
1055 1075 --retries;
1056 - else if (((wc = workerCounts) & RUNNING_COUNT_MASK) != 0 &&
1057 - UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1058 - wc, wc - ONE_RUNNING)) {
1059 - int stat, c; long h;
1060 - while ((stat = joinMe.status) >= 0 &&
1061 - (h = eventWaiters) != 0L && // help release others
1062 - (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
1076 + continue;
1077 + }
1078 + int wc = workerCounts;
1079 + if ((wc & RUNNING_COUNT_MASK) != 0) {
1080 + if (running) {
1081 + if (!UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1082 + wc, wc - ONE_RUNNING))
1083 + continue;
1084 + running = false;
1085 + }
1086 + long h = eventWaiters;
1087 + if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
1063 1088 releaseEventWaiters();
1064 - if (stat >= 0 &&
1065 - ((workerCounts & RUNNING_COUNT_MASK) == 0 ||
1066 - (stat =
1067 - joinMe.internalAwaitDone(JOIN_TIMEOUT_MILLIS)) >= 0))
1068 - helpMaintainParallelism(); // timeout or no running workers
1069 - do {} while (!UNSAFE.compareAndSwapInt
1070 - (this, workerCountsOffset,
1071 - c = workerCounts, c + ONE_RUNNING));
1072 - if (stat < 0)
1073 - break; // else restart
1089 + if ((workerCounts & RUNNING_COUNT_MASK) != 0) {
1090 + long ms; int ns;
1091 + if (!timed) {
1092 + ms = JOIN_TIMEOUT_MILLIS;
1093 + ns = 0;
1094 + }
1095 + else { // at most JOIN_TIMEOUT_MILLIS per wait
1096 + long nt = nanos - (System.nanoTime() - startTime);
1097 + if (nt <= 0L)
1098 + break;
1099 + ms = nt / 1000000;
1100 + if (ms > JOIN_TIMEOUT_MILLIS) {
1101 + ms = JOIN_TIMEOUT_MILLIS;
1102 + ns = 0;
1103 + }
1104 + else
1105 + ns = (int) (nt % 1000000);
1106 + }
1107 + joinMe.internalAwaitDone(ms, ns);
1108 + }
1109 + if (joinMe.status < 0)
1110 + break;
1074 1111 }
1112 + helpMaintainParallelism();
1075 1113 }
1114 + if (!running) {
1115 + int c;
1116 + do {} while (!UNSAFE.compareAndSwapInt
1117 + (this, workerCountsOffset,
1118 + c = workerCounts, c + ONE_RUNNING));
1119 + }
1076 1120 }
1077 1121
1078 1122 /**
1079 1123 * Same idea as awaitJoin, but no helping, retries, or timeouts.
1080 1124 */
1081 1125 final void awaitBlocker(ManagedBlocker blocker)
1082 1126 throws InterruptedException {
1083 1127 while (!blocker.isReleasable()) {
1084 1128 int wc = workerCounts;
1085 - if ((wc & RUNNING_COUNT_MASK) != 0 &&
1086 - UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1087 - wc, wc - ONE_RUNNING)) {
1129 + if ((wc & RUNNING_COUNT_MASK) == 0)
1130 + helpMaintainParallelism();
1131 + else if (UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1132 + wc, wc - ONE_RUNNING)) {
1088 1133 try {
1089 1134 while (!blocker.isReleasable()) {
1090 1135 long h = eventWaiters;
1091 1136 if (h != 0L &&
1092 1137 (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
1093 1138 releaseEventWaiters();
1094 1139 else if ((workerCounts & RUNNING_COUNT_MASK) == 0 &&
1095 1140 runState < TERMINATING)
1096 1141 helpMaintainParallelism();
1097 1142 else if (blocker.block())
1098 1143 break;
1099 1144 }
1100 1145 } finally {
1101 1146 int c;
1102 1147 do {} while (!UNSAFE.compareAndSwapInt
1103 1148 (this, workerCountsOffset,
1104 1149 c = workerCounts, c + ONE_RUNNING));
1105 1150 }
1106 1151 break;
1107 1152 }
1108 1153 }
1109 1154 }
1110 1155
1111 1156 /**
1112 1157 * Possibly initiates and/or completes termination.
1113 1158 *
1114 1159 * @param now if true, unconditionally terminate, else only
1115 1160 * if shutdown and empty queue and no active workers
1116 1161 * @return true if now terminating or terminated
1117 1162 */
1118 1163 private boolean tryTerminate(boolean now) {
1119 1164 if (now)
1120 1165 advanceRunLevel(SHUTDOWN); // ensure at least SHUTDOWN
1121 1166 else if (runState < SHUTDOWN ||
↓ open down ↓ |
24 lines elided |
↑ open up ↑ |
1122 1167 !submissionQueue.isEmpty() ||
1123 1168 (runState & ACTIVE_COUNT_MASK) != 0)
1124 1169 return false;
1125 1170
1126 1171 if (advanceRunLevel(TERMINATING))
1127 1172 startTerminating();
1128 1173
1129 1174 // Finish now if all threads terminated; else in some subsequent call
1130 1175 if ((workerCounts >>> TOTAL_COUNT_SHIFT) == 0) {
1131 1176 advanceRunLevel(TERMINATED);
1132 - termination.arrive();
1177 + termination.forceTermination();
1133 1178 }
1134 1179 return true;
1135 1180 }
1136 1181
1137 -
1138 1182 /**
1139 1183 * Actions on transition to TERMINATING
1140 1184 *
1141 1185 * Runs up to four passes through workers: (0) shutting down each
1142 1186 * (without waking up if parked) to quickly spread notifications
1143 1187 * without unnecessary bouncing around event queues etc (1) wake
1144 1188 * up and help cancel tasks (2) interrupt (3) mop up races with
1145 1189 * interrupted workers
1146 1190 */
1147 1191 private void startTerminating() {
1148 1192 cancelSubmissions();
1149 1193 for (int passes = 0; passes < 4 && workerCounts != 0; ++passes) {
1150 1194 int c; // advance event count
1151 1195 UNSAFE.compareAndSwapInt(this, eventCountOffset,
1152 1196 c = eventCount, c+1);
1153 1197 eventWaiters = 0L; // clobber lists
1154 1198 spareWaiters = 0;
1155 1199 for (ForkJoinWorkerThread w : workers) {
1156 1200 if (w != null) {
1157 1201 w.shutdown();
1158 1202 if (passes > 0 && !w.isTerminated()) {
1159 1203 w.cancelTasks();
1160 1204 LockSupport.unpark(w);
1161 1205 if (passes > 1 && !w.isInterrupted()) {
1162 1206 try {
1163 1207 w.interrupt();
1164 1208 } catch (SecurityException ignore) {
1165 1209 }
1166 1210 }
1167 1211 }
1168 1212 }
1169 1213 }
1170 1214 }
1171 1215 }
1172 1216
1173 1217 /**
1174 1218 * Clears out and cancels submissions, ignoring exceptions.
1175 1219 */
1176 1220 private void cancelSubmissions() {
1177 1221 ForkJoinTask<?> task;
1178 1222 while ((task = submissionQueue.poll()) != null) {
1179 1223 try {
1180 1224 task.cancel(false);
1181 1225 } catch (Throwable ignore) {
1182 1226 }
1183 1227 }
1184 1228 }
1185 1229
1186 1230 // misc support for ForkJoinWorkerThread
1187 1231
1188 1232 /**
1189 1233 * Returns pool number.
1190 1234 */
1191 1235 final int getPoolNumber() {
1192 1236 return poolNumber;
1193 1237 }
1194 1238
1195 1239 /**
1196 1240 * Tries to accumulate steal count from a worker, clearing
1197 1241 * the worker's value if successful.
1198 1242 *
1199 1243 * @return true if worker steal count now zero
1200 1244 */
1201 1245 final boolean tryAccumulateStealCount(ForkJoinWorkerThread w) {
1202 1246 int sc = w.stealCount;
1203 1247 long c = stealCount;
1204 1248 // CAS even if zero, for fence effects
1205 1249 if (UNSAFE.compareAndSwapLong(this, stealCountOffset, c, c + sc)) {
1206 1250 if (sc != 0)
1207 1251 w.stealCount = 0;
1208 1252 return true;
1209 1253 }
1210 1254 return sc == 0;
1211 1255 }
1212 1256
1213 1257 /**
1214 1258 * Returns the approximate (non-atomic) number of idle threads per
1215 1259 * active thread.
1216 1260 */
1217 1261 final int idlePerActive() {
1218 1262 int pc = parallelism; // use parallelism, not rc
1219 1263 int ac = runState; // no mask -- artificially boosts during shutdown
1220 1264 // Use exact results for small values, saturate past 4
1221 1265 return ((pc <= ac) ? 0 :
1222 1266 (pc >>> 1 <= ac) ? 1 :
1223 1267 (pc >>> 2 <= ac) ? 3 :
1224 1268 pc >>> 3);
1225 1269 }
1226 1270
1227 1271 // Public and protected methods
1228 1272
1229 1273 // Constructors
1230 1274
1231 1275 /**
1232 1276 * Creates a {@code ForkJoinPool} with parallelism equal to {@link
1233 1277 * java.lang.Runtime#availableProcessors}, using the {@linkplain
1234 1278 * #defaultForkJoinWorkerThreadFactory default thread factory},
1235 1279 * no UncaughtExceptionHandler, and non-async LIFO processing mode.
1236 1280 *
1237 1281 * @throws SecurityException if a security manager exists and
1238 1282 * the caller is not permitted to modify threads
1239 1283 * because it does not hold {@link
1240 1284 * java.lang.RuntimePermission}{@code ("modifyThread")}
1241 1285 */
1242 1286 public ForkJoinPool() {
1243 1287 this(Runtime.getRuntime().availableProcessors(),
1244 1288 defaultForkJoinWorkerThreadFactory, null, false);
1245 1289 }
1246 1290
1247 1291 /**
1248 1292 * Creates a {@code ForkJoinPool} with the indicated parallelism
1249 1293 * level, the {@linkplain
1250 1294 * #defaultForkJoinWorkerThreadFactory default thread factory},
1251 1295 * no UncaughtExceptionHandler, and non-async LIFO processing mode.
1252 1296 *
1253 1297 * @param parallelism the parallelism level
1254 1298 * @throws IllegalArgumentException if parallelism less than or
1255 1299 * equal to zero, or greater than implementation limit
1256 1300 * @throws SecurityException if a security manager exists and
1257 1301 * the caller is not permitted to modify threads
1258 1302 * because it does not hold {@link
1259 1303 * java.lang.RuntimePermission}{@code ("modifyThread")}
1260 1304 */
1261 1305 public ForkJoinPool(int parallelism) {
1262 1306 this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
1263 1307 }
1264 1308
1265 1309 /**
1266 1310 * Creates a {@code ForkJoinPool} with the given parameters.
1267 1311 *
1268 1312 * @param parallelism the parallelism level. For default value,
1269 1313 * use {@link java.lang.Runtime#availableProcessors}.
1270 1314 * @param factory the factory for creating new threads. For default value,
1271 1315 * use {@link #defaultForkJoinWorkerThreadFactory}.
1272 1316 * @param handler the handler for internal worker threads that
1273 1317 * terminate due to unrecoverable errors encountered while executing
1274 1318 * tasks. For default value, use {@code null}.
1275 1319 * @param asyncMode if true,
1276 1320 * establishes local first-in-first-out scheduling mode for forked
1277 1321 * tasks that are never joined. This mode may be more appropriate
1278 1322 * than default locally stack-based mode in applications in which
1279 1323 * worker threads only process event-style asynchronous tasks.
1280 1324 * For default value, use {@code false}.
1281 1325 * @throws IllegalArgumentException if parallelism less than or
1282 1326 * equal to zero, or greater than implementation limit
1283 1327 * @throws NullPointerException if the factory is null
1284 1328 * @throws SecurityException if a security manager exists and
1285 1329 * the caller is not permitted to modify threads
1286 1330 * because it does not hold {@link
1287 1331 * java.lang.RuntimePermission}{@code ("modifyThread")}
1288 1332 */
1289 1333 public ForkJoinPool(int parallelism,
1290 1334 ForkJoinWorkerThreadFactory factory,
1291 1335 Thread.UncaughtExceptionHandler handler,
1292 1336 boolean asyncMode) {
1293 1337 checkPermission();
1294 1338 if (factory == null)
1295 1339 throw new NullPointerException();
1296 1340 if (parallelism <= 0 || parallelism > MAX_WORKERS)
1297 1341 throw new IllegalArgumentException();
1298 1342 this.parallelism = parallelism;
1299 1343 this.factory = factory;
1300 1344 this.ueh = handler;
1301 1345 this.locallyFifo = asyncMode;
1302 1346 int arraySize = initialArraySizeFor(parallelism);
1303 1347 this.workers = new ForkJoinWorkerThread[arraySize];
1304 1348 this.submissionQueue = new LinkedTransferQueue<ForkJoinTask<?>>();
1305 1349 this.workerLock = new ReentrantLock();
1306 1350 this.termination = new Phaser(1);
1307 1351 this.poolNumber = poolNumberGenerator.incrementAndGet();
1308 1352 }
1309 1353
1310 1354 /**
1311 1355 * Returns initial power of two size for workers array.
1312 1356 * @param pc the initial parallelism level
1313 1357 */
1314 1358 private static int initialArraySizeFor(int pc) {
1315 1359 // If possible, initially allocate enough space for one spare
1316 1360 int size = pc < MAX_WORKERS ? pc + 1 : MAX_WORKERS;
1317 1361         // See Hacker's Delight, sec 3.2. We know MAX_WORKERS < (1 << 16)
↓ open down ↓ |
170 lines elided |
↑ open up ↑ |
1318 1362 size |= size >>> 1;
1319 1363 size |= size >>> 2;
1320 1364 size |= size >>> 4;
1321 1365 size |= size >>> 8;
1322 1366 return size + 1;
1323 1367 }
1324 1368
1325 1369 // Execution methods
1326 1370
1327 1371 /**
1328 - * Common code for execute, invoke and submit
1372 + * Submits task and creates, starts, or resumes some workers if necessary
1329 1373 */
1330 1374 private <T> void doSubmit(ForkJoinTask<T> task) {
1331 - if (task == null)
1332 - throw new NullPointerException();
1333 - if (runState >= SHUTDOWN)
1334 - throw new RejectedExecutionException();
1335 1375 submissionQueue.offer(task);
1336 1376 int c; // try to increment event count -- CAS failure OK
1337 1377 UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1);
1338 - helpMaintainParallelism(); // create, start, or resume some workers
1378 + helpMaintainParallelism();
1339 1379 }
1340 1380
1341 1381 /**
1342 1382 * Performs the given task, returning its result upon completion.
1343 1383 *
1344 1384 * @param task the task
1345 1385 * @return the task's result
1346 1386 * @throws NullPointerException if the task is null
1347 1387 * @throws RejectedExecutionException if the task cannot be
1348 1388 * scheduled for execution
1349 1389 */
1350 1390 public <T> T invoke(ForkJoinTask<T> task) {
1351 - doSubmit(task);
1352 - return task.join();
1391 + if (task == null)
1392 + throw new NullPointerException();
1393 + if (runState >= SHUTDOWN)
1394 + throw new RejectedExecutionException();
1395 + Thread t = Thread.currentThread();
1396 + if ((t instanceof ForkJoinWorkerThread) &&
1397 + ((ForkJoinWorkerThread)t).pool == this)
1398 + return task.invoke(); // bypass submit if in same pool
1399 + else {
1400 + doSubmit(task);
1401 + return task.join();
1402 + }
1353 1403 }
1354 1404
1355 1405 /**
1406 + * Unless terminating, forks task if within an ongoing FJ
1407 + * computation in the current pool, else submits as external task.
1408 + */
1409 + private <T> void forkOrSubmit(ForkJoinTask<T> task) {
1410 + if (runState >= SHUTDOWN)
1411 + throw new RejectedExecutionException();
1412 + Thread t = Thread.currentThread();
1413 + if ((t instanceof ForkJoinWorkerThread) &&
1414 + ((ForkJoinWorkerThread)t).pool == this)
1415 + task.fork();
1416 + else
1417 + doSubmit(task);
1418 + }
1419 +
1420 + /**
1356 1421 * Arranges for (asynchronous) execution of the given task.
1357 1422 *
1358 1423 * @param task the task
1359 1424 * @throws NullPointerException if the task is null
1360 1425 * @throws RejectedExecutionException if the task cannot be
1361 1426 * scheduled for execution
1362 1427 */
1363 1428 public void execute(ForkJoinTask<?> task) {
1364 - doSubmit(task);
1429 + if (task == null)
1430 + throw new NullPointerException();
1431 + forkOrSubmit(task);
1365 1432 }
1366 1433
1367 1434 // AbstractExecutorService methods
1368 1435
1369 1436 /**
1370 1437 * @throws NullPointerException if the task is null
1371 1438 * @throws RejectedExecutionException if the task cannot be
1372 1439 * scheduled for execution
1373 1440 */
1374 1441 public void execute(Runnable task) {
1442 + if (task == null)
1443 + throw new NullPointerException();
1375 1444 ForkJoinTask<?> job;
1376 1445 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
1377 1446 job = (ForkJoinTask<?>) task;
1378 1447 else
1379 1448 job = ForkJoinTask.adapt(task, null);
1380 - doSubmit(job);
1449 + forkOrSubmit(job);
1381 1450 }
1382 1451
1383 1452 /**
1384 1453 * Submits a ForkJoinTask for execution.
1385 1454 *
1386 1455 * @param task the task to submit
1387 1456 * @return the task
1388 1457 * @throws NullPointerException if the task is null
1389 1458 * @throws RejectedExecutionException if the task cannot be
1390 1459 * scheduled for execution
1391 1460 */
1392 1461 public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
1393 - doSubmit(task);
1462 + if (task == null)
1463 + throw new NullPointerException();
1464 + forkOrSubmit(task);
1394 1465 return task;
1395 1466 }
1396 1467
1397 1468 /**
1398 1469 * @throws NullPointerException if the task is null
1399 1470 * @throws RejectedExecutionException if the task cannot be
1400 1471 * scheduled for execution
1401 1472 */
1402 1473 public <T> ForkJoinTask<T> submit(Callable<T> task) {
1474 + if (task == null)
1475 + throw new NullPointerException();
1403 1476 ForkJoinTask<T> job = ForkJoinTask.adapt(task);
1404 - doSubmit(job);
1477 + forkOrSubmit(job);
1405 1478 return job;
1406 1479 }
1407 1480
1408 1481 /**
1409 1482 * @throws NullPointerException if the task is null
1410 1483 * @throws RejectedExecutionException if the task cannot be
1411 1484 * scheduled for execution
1412 1485 */
1413 1486 public <T> ForkJoinTask<T> submit(Runnable task, T result) {
1487 + if (task == null)
1488 + throw new NullPointerException();
1414 1489 ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
1415 - doSubmit(job);
1490 + forkOrSubmit(job);
1416 1491 return job;
1417 1492 }
1418 1493
1419 1494 /**
1420 1495 * @throws NullPointerException if the task is null
1421 1496 * @throws RejectedExecutionException if the task cannot be
1422 1497 * scheduled for execution
1423 1498 */
1424 1499 public ForkJoinTask<?> submit(Runnable task) {
1500 + if (task == null)
1501 + throw new NullPointerException();
1425 1502 ForkJoinTask<?> job;
1426 1503 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
1427 1504 job = (ForkJoinTask<?>) task;
1428 1505 else
1429 1506 job = ForkJoinTask.adapt(task, null);
1430 - doSubmit(job);
1507 + forkOrSubmit(job);
1431 1508 return job;
1432 1509 }
1433 1510
1434 1511 /**
1435 1512 * @throws NullPointerException {@inheritDoc}
1436 1513 * @throws RejectedExecutionException {@inheritDoc}
1437 1514 */
1438 1515 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
1439 1516 ArrayList<ForkJoinTask<T>> forkJoinTasks =
1440 1517 new ArrayList<ForkJoinTask<T>>(tasks.size());
1441 1518 for (Callable<T> task : tasks)
1442 1519 forkJoinTasks.add(ForkJoinTask.adapt(task));
1443 1520 invoke(new InvokeAll<T>(forkJoinTasks));
1444 1521
1445 1522 @SuppressWarnings({"unchecked", "rawtypes"})
1446 1523 List<Future<T>> futures = (List<Future<T>>) (List) forkJoinTasks;
1447 1524 return futures;
1448 1525 }
1449 1526
    /**
     * Helper action used by {@link #invokeAll(Collection)} to run a
     * batch of adapted tasks inside the pool.  Exceptions from
     * invokeAll are deliberately swallowed here — presumably because
     * each task's own completion status is what callers observe
     * through the returned futures (TODO: confirm against
     * ForkJoinTask semantics).
     */
    static final class InvokeAll<T> extends RecursiveAction {
        final ArrayList<ForkJoinTask<T>> tasks;
        InvokeAll(ArrayList<ForkJoinTask<T>> tasks) { this.tasks = tasks; }
        public void compute() {
            try { invokeAll(tasks); }
            catch (Exception ignore) {}
        }
        private static final long serialVersionUID = -7914297376763021607L;
    }
1459 1536
    /**
     * Returns the factory used for constructing new workers.
     *
     * @return the factory used for constructing new workers
     */
    public ForkJoinWorkerThreadFactory getFactory() {
        return factory;
    }
1468 1545
    /**
     * Returns the handler for internal worker threads that terminate
     * due to unrecoverable errors encountered while executing tasks.
     *
     * @return the handler, or {@code null} if none
     */
    public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
        return ueh;
    }
1478 1555
    /**
     * Returns the targeted parallelism level of this pool.
     *
     * @return the targeted parallelism level of this pool
     */
    public int getParallelism() {
        return parallelism;
    }
1487 1564
    /**
     * Returns the number of worker threads that have started but not
     * yet terminated. The result returned by this method may differ
     * from {@link #getParallelism} when threads are created to
     * maintain parallelism when others are cooperatively blocked.
     *
     * @return the number of worker threads
     */
    public int getPoolSize() {
        // Total worker count is packed into the high bits of workerCounts.
        return workerCounts >>> TOTAL_COUNT_SHIFT;
    }
1499 1576
    /**
     * Returns {@code true} if this pool uses local first-in-first-out
     * scheduling mode for forked tasks that are never joined.
     *
     * @return {@code true} if this pool uses async mode
     */
    public boolean getAsyncMode() {
        return locallyFifo;
    }
1509 1586
    /**
     * Returns an estimate of the number of worker threads that are
     * not blocked waiting to join tasks or for other managed
     * synchronization. This method may overestimate the
     * number of running threads.
     *
     * @return the number of worker threads
     */
    public int getRunningThreadCount() {
        // Running count is packed into the low bits of workerCounts.
        return workerCounts & RUNNING_COUNT_MASK;
    }
1521 1598
    /**
     * Returns an estimate of the number of threads that are currently
     * stealing or executing tasks. This method may overestimate the
     * number of active threads.
     *
     * @return the number of active threads
     */
    public int getActiveThreadCount() {
        // Active count is packed into the low bits of runState.
        return runState & ACTIVE_COUNT_MASK;
    }
1532 1609
    /**
     * Returns {@code true} if all worker threads are currently idle.
     * An idle worker is one that cannot obtain a task to execute
     * because none are available to steal from other threads, and
     * there are no pending submissions to the pool. This method is
     * conservative; it might not return {@code true} immediately upon
     * idleness of all threads, but will eventually become true if
     * threads remain inactive.
     *
     * @return {@code true} if all threads are currently idle
     */
    public boolean isQuiescent() {
        return (runState & ACTIVE_COUNT_MASK) == 0;
    }
1547 1624
    /**
     * Returns an estimate of the total number of tasks stolen from
     * one thread's work queue by another. The reported value
     * underestimates the actual total number of steals when the pool
     * is not quiescent. This value may be useful for monitoring and
     * tuning fork/join programs: in general, steal counts should be
     * high enough to keep threads busy, but low enough to avoid
     * overhead and contention across threads.
     *
     * @return the number of steals
     */
    public long getStealCount() {
        return stealCount;
    }
1562 1639
1563 1640 /**
1564 1641 * Returns an estimate of the total number of tasks currently held
1565 1642 * in queues by worker threads (but not including tasks submitted
1566 1643 * to the pool that have not begun executing). This value is only
1567 1644 * an approximation, obtained by iterating across all threads in
1568 1645 * the pool. This method may be useful for tuning task
1569 1646 * granularities.
1570 1647 *
1571 1648 * @return the number of queued tasks
1572 1649 */
1573 1650 public long getQueuedTaskCount() {
1574 1651 long count = 0;
1575 1652 for (ForkJoinWorkerThread w : workers)
1576 1653 if (w != null)
1577 1654 count += w.getQueueSize();
1578 1655 return count;
1579 1656 }
1580 1657
    /**
     * Returns an estimate of the number of tasks submitted to this
     * pool that have not yet begun executing. This method takes time
     * proportional to the number of submissions.
     *
     * @return the number of queued submissions
     */
    public int getQueuedSubmissionCount() {
        return submissionQueue.size();
    }
1591 1668
    /**
     * Returns {@code true} if there are any tasks submitted to this
     * pool that have not yet begun executing.
     *
     * @return {@code true} if there are any queued submissions
     */
    public boolean hasQueuedSubmissions() {
        return !submissionQueue.isEmpty();
    }
1601 1678
    /**
     * Removes and returns the next unexecuted submission if one is
     * available. This method may be useful in extensions to this
     * class that re-assign work in systems with multiple pools.
     *
     * @return the next submission, or {@code null} if none
     */
    protected ForkJoinTask<?> pollSubmission() {
        return submissionQueue.poll();
    }
1612 1689
1613 1690 /**
1614 1691 * Removes all available unexecuted submitted and forked tasks
1615 1692 * from scheduling queues and adds them to the given collection,
1616 1693 * without altering their execution status. These may include
1617 1694 * artificially generated or wrapped tasks. This method is
1618 1695 * designed to be invoked only when the pool is known to be
1619 1696 * quiescent. Invocations at other times may not remove all
1620 1697 * tasks. A failure encountered while attempting to add elements
1621 1698 * to collection {@code c} may result in elements being in
1622 1699 * neither, either or both collections when the associated
1623 1700 * exception is thrown. The behavior of this operation is
1624 1701 * undefined if the specified collection is modified while the
1625 1702 * operation is in progress.
1626 1703 *
1627 1704 * @param c the collection to transfer elements into
1628 1705 * @return the number of elements transferred
1629 1706 */
1630 1707 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
1631 1708 int count = submissionQueue.drainTo(c);
1632 1709 for (ForkJoinWorkerThread w : workers)
1633 1710 if (w != null)
1634 1711 count += w.drainTasksTo(c);
1635 1712 return count;
1636 1713 }
1637 1714
1638 1715 /**
1639 1716 * Returns a string identifying this pool, as well as its state,
1640 1717 * including indications of run state, parallelism level, and
1641 1718 * worker and task counts.
1642 1719 *
1643 1720 * @return a string identifying this pool, as well as its state
1644 1721 */
1645 1722 public String toString() {
1646 1723 long st = getStealCount();
1647 1724 long qt = getQueuedTaskCount();
1648 1725 long qs = getQueuedSubmissionCount();
1649 1726 int wc = workerCounts;
1650 1727 int tc = wc >>> TOTAL_COUNT_SHIFT;
1651 1728 int rc = wc & RUNNING_COUNT_MASK;
1652 1729 int pc = parallelism;
1653 1730 int rs = runState;
1654 1731 int ac = rs & ACTIVE_COUNT_MASK;
1655 1732 return super.toString() +
1656 1733 "[" + runLevelToString(rs) +
1657 1734 ", parallelism = " + pc +
1658 1735 ", size = " + tc +
1659 1736 ", active = " + ac +
1660 1737 ", running = " + rc +
1661 1738 ", steals = " + st +
1662 1739 ", tasks = " + qt +
1663 1740 ", submissions = " + qs +
1664 1741 "]";
1665 1742 }
1666 1743
1667 1744 private static String runLevelToString(int s) {
1668 1745 return ((s & TERMINATED) != 0 ? "Terminated" :
1669 1746 ((s & TERMINATING) != 0 ? "Terminating" :
1670 1747 ((s & SHUTDOWN) != 0 ? "Shutting down" :
1671 1748 "Running")));
1672 1749 }
1673 1750
    /**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be accepted.
     * Invocation has no additional effect if already shut down.
     * Tasks that are in the process of being submitted concurrently
     * during the course of this method may or may not be rejected.
     *
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public void shutdown() {
        checkPermission();
        advanceRunLevel(SHUTDOWN); // reject new submissions from now on
        tryTerminate(false);       // false: do not cancel queued tasks
    }
1691 1768
    /**
     * Attempts to cancel and/or stop all tasks, and reject all
     * subsequently submitted tasks. Tasks that are in the process of
     * being submitted or executed concurrently during the course of
     * this method may or may not be rejected. This method cancels
     * both existing and unexecuted tasks, in order to permit
     * termination in the presence of task dependencies. So the method
     * always returns an empty list (unlike the case for some other
     * Executors).
     *
     * @return an empty list
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public List<Runnable> shutdownNow() {
        checkPermission();
        tryTerminate(true); // true: also cancel queued/executing tasks
        return Collections.emptyList();
    }
1713 1790
    /**
     * Returns {@code true} if all tasks have completed following shut down.
     *
     * @return {@code true} if all tasks have completed following shut down
     */
    public boolean isTerminated() {
        // Relies on TERMINATED being the highest run level, so a
        // simple ordering comparison suffices.
        return runState >= TERMINATED;
    }
1722 1799
    /**
     * Returns {@code true} if the process of termination has
     * commenced but not yet completed. This method may be useful for
     * debugging. A return of {@code true} reported a sufficient
     * period after shutdown may indicate that submitted tasks have
     * ignored or suppressed interruption, or are waiting for IO,
     * causing this executor not to properly terminate. (See the
     * advisory notes for class {@link ForkJoinTask} stating that
     * tasks should not normally entail blocking operations. But if
     * they do, they must abort them on interrupt.)
     *
     * @return {@code true} if terminating but not yet terminated
     */
    public boolean isTerminating() {
        // True only in the window where TERMINATING is set but
        // TERMINATED is not yet set.
        return (runState & (TERMINATING|TERMINATED)) == TERMINATING;
    }
1736 1816
    /**
     * Returns true if terminating or terminated. Used by ForkJoinWorkerThread.
     */
    final boolean isAtLeastTerminating() {
        return runState >= TERMINATING;
    }
1743 1823
    /**
     * Returns {@code true} if this pool has been shut down.
     *
     * @return {@code true} if this pool has been shut down
     */
    public boolean isShutdown() {
        return runState >= SHUTDOWN;
    }
1752 1832
    /**
     * Blocks until all tasks have completed execution after a shutdown
     * request, or the timeout occurs, or the current thread is
     * interrupted, whichever happens first.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return {@code true} if this executor terminated and
     *         {@code false} if the timeout elapsed before termination
     * @throws InterruptedException if interrupted while waiting
     */
    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        try {
            // Phase 0 advances when the pool reaches termination.
            termination.awaitAdvanceInterruptibly(0, timeout, unit);
        } catch (TimeoutException ex) {
            return false; // timed out before termination completed
        }
        return true;
    }
1772 1853
    /**
     * Interface for extending managed parallelism for tasks running
     * in {@link ForkJoinPool}s.
     *
     * <p>A {@code ManagedBlocker} provides two methods. Method
     * {@code isReleasable} must return {@code true} if blocking is
     * not necessary. Method {@code block} blocks the current thread
     * if necessary (perhaps internally invoking {@code isReleasable}
     * before actually blocking). The unusual methods in this API
     * accommodate synchronizers that may, but don't usually, block
     * for long periods. Similarly, they allow more efficient internal
     * handling of cases in which additional workers may be, but
     * usually are not, needed to ensure sufficient parallelism.
     * Toward this end, implementations of method {@code isReleasable}
     * must be amenable to repeated invocation.
     *
     * <p>For example, here is a ManagedBlocker based on a
     * ReentrantLock:
     * <pre> {@code
     * class ManagedLocker implements ManagedBlocker {
     *   final ReentrantLock lock;
     *   boolean hasLock = false;
     *   ManagedLocker(ReentrantLock lock) { this.lock = lock; }
     *   public boolean block() {
     *     if (!hasLock)
     *       lock.lock();
     *     return true;
     *   }
     *   public boolean isReleasable() {
     *     return hasLock || (hasLock = lock.tryLock());
     *   }
     * }}</pre>
     *
     * <p>Here is a class that possibly blocks waiting for an
     * item on a given queue:
     * <pre> {@code
     * class QueueTaker<E> implements ManagedBlocker {
     *   final BlockingQueue<E> queue;
     *   volatile E item = null;
     *   QueueTaker(BlockingQueue<E> q) { this.queue = q; }
     *   public boolean block() throws InterruptedException {
     *     if (item == null)
     *       item = queue.take();
     *     return true;
     *   }
     *   public boolean isReleasable() {
     *     return item != null || (item = queue.poll()) != null;
     *   }
     *   public E getItem() { // call after pool.managedBlock completes
     *     return item;
     *   }
     * }}</pre>
     */
    public static interface ManagedBlocker {
        /**
         * Possibly blocks the current thread, for example waiting for
         * a lock or condition.
         *
         * @return {@code true} if no additional blocking is necessary
         *         (i.e., if isReleasable would return true)
         * @throws InterruptedException if interrupted while waiting
         *         (the method is not required to do so, but is allowed to)
         */
        boolean block() throws InterruptedException;

        /**
         * Returns {@code true} if blocking is unnecessary.
         */
        boolean isReleasable();
    }
1843 1924
1844 1925 /**
1845 1926 * Blocks in accord with the given blocker. If the current thread
1846 1927 * is a {@link ForkJoinWorkerThread}, this method possibly
1847 1928 * arranges for a spare thread to be activated if necessary to
1848 1929 * ensure sufficient parallelism while the current thread is blocked.
1849 1930 *
1850 1931 * <p>If the caller is not a {@link ForkJoinTask}, this method is
1851 1932 * behaviorally equivalent to
1852 1933 * <pre> {@code
1853 1934 * while (!blocker.isReleasable())
1854 1935 * if (blocker.block())
1855 1936 * return;
1856 1937 * }</pre>
1857 1938 *
1858 1939 * If the caller is a {@code ForkJoinTask}, then the pool may
1859 1940 * first be expanded to ensure parallelism, and later adjusted.
1860 1941 *
1861 1942 * @param blocker the blocker
1862 1943 * @throws InterruptedException if blocker.block did so
1863 1944 */
1864 1945 public static void managedBlock(ManagedBlocker blocker)
1865 1946 throws InterruptedException {
1866 1947 Thread t = Thread.currentThread();
1867 1948 if (t instanceof ForkJoinWorkerThread) {
1868 1949 ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
1869 1950 w.pool.awaitBlocker(blocker);
1870 1951 }
1871 1952 else {
1872 1953 do {} while (!blocker.isReleasable() && !blocker.block());
1873 1954 }
1874 1955 }
1875 1956
    // AbstractExecutorService overrides. These rely on undocumented
    // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
    // implement RunnableFuture.

    // Wraps a Runnable plus fixed result for use by submit/invokeAll.
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return (RunnableFuture<T>) ForkJoinTask.adapt(runnable, value);
    }

    // Wraps a Callable for use by submit/invokeAll.
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return (RunnableFuture<T>) ForkJoinTask.adapt(callable);
    }
1887 1968
    // Unsafe mechanics

    private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
    // Raw field offsets used for CAS operations on the pool's
    // packed state words.
    private static final long workerCountsOffset =
        objectFieldOffset("workerCounts", ForkJoinPool.class);
    private static final long runStateOffset =
        objectFieldOffset("runState", ForkJoinPool.class);
    private static final long eventCountOffset =
        objectFieldOffset("eventCount", ForkJoinPool.class);
    private static final long eventWaitersOffset =
        objectFieldOffset("eventWaiters", ForkJoinPool.class);
    private static final long stealCountOffset =
        objectFieldOffset("stealCount", ForkJoinPool.class);
    private static final long spareWaitersOffset =
        objectFieldOffset("spareWaiters", ForkJoinPool.class);
1903 1984
1904 1985 private static long objectFieldOffset(String field, Class<?> klazz) {
1905 1986 try {
1906 1987 return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
1907 1988 } catch (NoSuchFieldException e) {
1908 1989 // Convert Exception to corresponding Error
1909 1990 NoSuchFieldError error = new NoSuchFieldError(field);
1910 1991 error.initCause(e);
1911 1992 throw error;
1912 1993 }
1913 1994 }
1914 1995 }
↓ open down ↓ |
134 lines elided |
↑ open up ↑ |
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX