Print this page
Split |
Close |
Expand all |
Collapse all |
--- old/src/share/classes/java/util/concurrent/ForkJoinTask.java
+++ new/src/share/classes/java/util/concurrent/ForkJoinTask.java
1 1 /*
2 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 3 *
4 4 * This code is free software; you can redistribute it and/or modify it
5 5 * under the terms of the GNU General Public License version 2 only, as
6 6 * published by the Free Software Foundation. Oracle designates this
7 7 * particular file as subject to the "Classpath" exception as provided
8 8 * by Oracle in the LICENSE file that accompanied this code.
9 9 *
10 10 * This code is distributed in the hope that it will be useful, but WITHOUT
11 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 13 * version 2 for more details (a copy is included in the LICENSE file that
14 14 * accompanied this code).
15 15 *
16 16 * You should have received a copy of the GNU General Public License version
17 17 * 2 along with this work; if not, write to the Free Software Foundation,
18 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 19 *
20 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 21 * or visit www.oracle.com if you need additional information or have any
22 22 * questions.
23 23 */
24 24
25 25 /*
26 26 * This file is available under and governed by the GNU General Public
27 27 * License version 2 only, as published by the Free Software Foundation.
28 28 * However, the following notice accompanied the original version of this
29 29 * file:
30 30 *
31 31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 32 * Expert Group and released to the public domain, as explained at
33 33 * http://creativecommons.org/licenses/publicdomain
34 34 */
↓ open down ↓ |
34 lines elided |
↑ open up ↑ |
35 35
36 36 package java.util.concurrent;
37 37
38 38 import java.io.Serializable;
39 39 import java.util.Collection;
40 40 import java.util.Collections;
41 41 import java.util.List;
42 42 import java.util.RandomAccess;
43 43 import java.util.Map;
44 44 import java.util.WeakHashMap;
45 +import java.util.concurrent.Callable;
46 +import java.util.concurrent.CancellationException;
47 +import java.util.concurrent.ExecutionException;
48 +import java.util.concurrent.Executor;
49 +import java.util.concurrent.ExecutorService;
50 +import java.util.concurrent.Future;
51 +import java.util.concurrent.RejectedExecutionException;
52 +import java.util.concurrent.RunnableFuture;
53 +import java.util.concurrent.TimeUnit;
54 +import java.util.concurrent.TimeoutException;
45 55
46 56 /**
47 57 * Abstract base class for tasks that run within a {@link ForkJoinPool}.
48 58 * A {@code ForkJoinTask} is a thread-like entity that is much
49 59 * lighter weight than a normal thread. Huge numbers of tasks and
50 60 * subtasks may be hosted by a small number of actual threads in a
51 61 * ForkJoinPool, at the price of some usage limitations.
52 62 *
53 63 * <p>A "main" {@code ForkJoinTask} begins execution when submitted
54 64 * to a {@link ForkJoinPool}. Once started, it will usually in turn
55 65 * start other subtasks. As indicated by the name of this class,
56 66 * many programs using {@code ForkJoinTask} employ only methods
57 67 * {@link #fork} and {@link #join}, or derivatives such as {@link
58 68 * #invokeAll(ForkJoinTask...) invokeAll}. However, this class also
59 69 * provides a number of other methods that can come into play in
60 70 * advanced usages, as well as extension mechanics that allow
61 71 * support of new forms of fork/join processing.
62 72 *
63 73 * <p>A {@code ForkJoinTask} is a lightweight form of {@link Future}.
64 74 * The efficiency of {@code ForkJoinTask}s stems from a set of
65 75 * restrictions (that are only partially statically enforceable)
66 76 * reflecting their intended use as computational tasks calculating
67 77 * pure functions or operating on purely isolated objects. The
68 78 * primary coordination mechanisms are {@link #fork}, that arranges
69 79 * asynchronous execution, and {@link #join}, that doesn't proceed
70 80 * until the task's result has been computed. Computations should
71 81 * avoid {@code synchronized} methods or blocks, and should minimize
72 82 * other blocking synchronization apart from joining other tasks or
73 83 * using synchronizers such as Phasers that are advertised to
74 84 * cooperate with fork/join scheduling. Tasks should also not perform
75 85 * blocking IO, and should ideally access variables that are
76 86 * completely independent of those accessed by other running
77 87 * tasks. Minor breaches of these restrictions, for example using
78 88 * shared output streams, may be tolerable in practice, but frequent
79 89 * use may result in poor performance, and the potential to
80 90 * indefinitely stall if the number of threads not waiting for IO or
81 91 * other external synchronization becomes exhausted. This usage
82 92 * restriction is in part enforced by not permitting checked
83 93 * exceptions such as {@code IOExceptions} to be thrown. However,
84 94 * computations may still encounter unchecked exceptions, that are
85 95 * rethrown to callers attempting to join them. These exceptions may
86 96 * additionally include {@link RejectedExecutionException} stemming
87 97 * from internal resource exhaustion, such as failure to allocate
88 98 * internal task queues.
89 99 *
90 100 * <p>The primary method for awaiting completion and extracting
91 101 * results of a task is {@link #join}, but there are several variants:
92 102 * The {@link Future#get} methods support interruptible and/or timed
93 103 * waits for completion and report results using {@code Future}
94 104 * conventions. Method {@link #invoke} is semantically
95 105 * equivalent to {@code fork(); join()} but always attempts to begin
96 106 * execution in the current thread. The "<em>quiet</em>" forms of
97 107 * these methods do not extract results or report exceptions. These
98 108 * may be useful when a set of tasks are being executed, and you need
99 109 * to delay processing of results or exceptions until all complete.
100 110 * Method {@code invokeAll} (available in multiple versions)
101 111 * performs the most common form of parallel invocation: forking a set
102 112 * of tasks and joining them all.
103 113 *
104 114 * <p>The execution status of tasks may be queried at several levels
105 115 * of detail: {@link #isDone} is true if a task completed in any way
106 116 * (including the case where a task was cancelled without executing);
107 117 * {@link #isCompletedNormally} is true if a task completed without
108 118 * cancellation or encountering an exception; {@link #isCancelled} is
109 119 * true if the task was cancelled (in which case {@link #getException}
110 120 * returns a {@link java.util.concurrent.CancellationException}); and
111 121 * {@link #isCompletedAbnormally} is true if a task was either
112 122 * cancelled or encountered an exception, in which case {@link
113 123 * #getException} will return either the encountered exception or
114 124 * {@link java.util.concurrent.CancellationException}.
115 125 *
116 126 * <p>The ForkJoinTask class is not usually directly subclassed.
117 127 * Instead, you subclass one of the abstract classes that support a
118 128 * particular style of fork/join processing, typically {@link
119 129 * RecursiveAction} for computations that do not return results, or
120 130 * {@link RecursiveTask} for those that do. Normally, a concrete
121 131 * ForkJoinTask subclass declares fields comprising its parameters,
↓ open down ↓ |
67 lines elided |
↑ open up ↑ |
122 132 * established in a constructor, and then defines a {@code compute}
123 133 * method that somehow uses the control methods supplied by this base
124 134 * class. While these methods have {@code public} access (to allow
125 135 * instances of different task subclasses to call each other's
126 136 * methods), some of them may only be called from within other
127 137 * ForkJoinTasks (as may be determined using method {@link
128 138 * #inForkJoinPool}). Attempts to invoke them in other contexts
129 139 * result in exceptions or errors, possibly including
130 140 * {@code ClassCastException}.
131 141 *
142 + * <p>Method {@link #join} and its variants are appropriate for use
143 + * only when completion dependencies are acyclic; that is, the
144 + * parallel computation can be described as a directed acyclic graph
145 + * (DAG). Otherwise, executions may encounter a form of deadlock as
146 + * tasks cyclically wait for each other. However, this framework
147 + * supports other methods and techniques (for example the use of
148 + * {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that
149 + * may be of use in constructing custom subclasses for problems that
150 + * are not statically structured as DAGs.
151 + *
132 152 * <p>Most base support methods are {@code final}, to prevent
133 153 * overriding of implementations that are intrinsically tied to the
134 154 * underlying lightweight task scheduling framework. Developers
135 155 * creating new basic styles of fork/join processing should minimally
136 156 * implement {@code protected} methods {@link #exec}, {@link
137 157 * #setRawResult}, and {@link #getRawResult}, while also introducing
138 158 * an abstract computational method that can be implemented in its
139 159 * subclasses, possibly relying on other {@code protected} methods
140 160 * provided by this class.
141 161 *
142 162 * <p>ForkJoinTasks should perform relatively small amounts of
143 163 * computation. Large tasks should be split into smaller subtasks,
144 164 * usually via recursive decomposition. As a very rough rule of thumb,
145 165 * a task should perform more than 100 and less than 10000 basic
146 - * computational steps. If tasks are too big, then parallelism cannot
147 - * improve throughput. If too small, then memory and internal task
148 - * maintenance overhead may overwhelm processing.
166 + * computational steps, and should avoid indefinite looping. If tasks
167 + * are too big, then parallelism cannot improve throughput. If too
168 + * small, then memory and internal task maintenance overhead may
169 + * overwhelm processing.
149 170 *
150 171 * <p>This class provides {@code adapt} methods for {@link Runnable}
151 172 * and {@link Callable}, that may be of use when mixing execution of
152 173 * {@code ForkJoinTasks} with other kinds of tasks. When all tasks are
153 174 * of this form, consider using a pool constructed in <em>asyncMode</em>.
154 175 *
155 176 * <p>ForkJoinTasks are {@code Serializable}, which enables them to be
156 177 * used in extensions such as remote execution frameworks. It is
157 178 * sensible to serialize tasks only before or after, but not during,
158 179 * execution. Serialization is not relied on during execution itself.
159 180 *
160 181 * @since 1.7
161 182 * @author Doug Lea
162 183 */
163 184 public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
164 185
165 186 /*
166 187 * See the internal documentation of class ForkJoinPool for a
167 188 * general implementation overview. ForkJoinTasks are mainly
168 189 * responsible for maintaining their "status" field amidst relays
169 190 * to methods in ForkJoinWorkerThread and ForkJoinPool. The
170 191 * methods of this class are more-or-less layered into (1) basic
171 192 * status maintenance (2) execution and awaiting completion (3)
172 193 * user-level methods that additionally report results. This is
173 194 * sometimes hard to see because this file orders exported methods
174 195 * in a way that flows well in javadocs. In particular, most
175 196 * join mechanics are in method quietlyJoin, below.
176 197 */
177 198
178 199 /*
179 200 * The status field holds run control status bits packed into a
180 201 * single int to minimize footprint and to ensure atomicity (via
181 202 * CAS). Status is initially zero, and takes on nonnegative
182 203 * values until completed, upon which status holds value
183 204 * NORMAL, CANCELLED, or EXCEPTIONAL. Tasks undergoing blocking
184 205 * waits by other threads have the SIGNAL bit set. Completion of
185 206 * a stolen task with SIGNAL set awakens any waiters via
186 207 * notifyAll. Even though suboptimal for some purposes, we use
187 208 * basic builtin wait/notify to take advantage of "monitor
188 209 * inflation" in JVMs that we would otherwise need to emulate to
189 210 * avoid adding further per-task bookkeeping overhead. We want
190 211 * these monitors to be "fat", i.e., not use biasing or thin-lock
191 212 * techniques, so use some odd coding idioms that tend to avoid
192 213 * them.
193 214 */
194 215
195 216 /** The run status of this task */
196 217 volatile int status; // accessed directly by pool and workers
197 218
198 219 private static final int NORMAL = -1;
199 220 private static final int CANCELLED = -2;
200 221 private static final int EXCEPTIONAL = -3;
201 222 private static final int SIGNAL = 1;
202 223
203 224 /**
204 225 * Table of exceptions thrown by tasks, to enable reporting by
205 226 * callers. Because exceptions are rare, we don't directly keep
206 227 * them with task objects, but instead use a weak ref table. Note
207 228 * that cancellation exceptions don't appear in the table, but are
208 229 * instead recorded as status values.
209 230 * TODO: Use ConcurrentReferenceHashMap
210 231 */
211 232 static final Map<ForkJoinTask<?>, Throwable> exceptionMap =
212 233 Collections.synchronizedMap
213 234 (new WeakHashMap<ForkJoinTask<?>, Throwable>());
214 235
215 236 // Maintaining completion status
216 237
217 238 /**
218 239 * Marks completion and wakes up threads waiting to join this task,
219 240 * also clearing signal request bits.
220 241 *
221 242 * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
222 243 */
223 244 private void setCompletion(int completion) {
224 245 int s;
225 246 while ((s = status) >= 0) {
226 247 if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) {
227 248 if (s != 0)
228 249 synchronized (this) { notifyAll(); }
229 250 break;
230 251 }
231 252 }
232 253 }
233 254
234 255 /**
↓ open down ↓ |
76 lines elided |
↑ open up ↑ |
235 256 * Records exception and sets exceptional completion.
236 257 *
237 258 * @return status on exit
238 259 */
239 260 private void setExceptionalCompletion(Throwable rex) {
240 261 exceptionMap.put(this, rex);
241 262 setCompletion(EXCEPTIONAL);
242 263 }
243 264
244 265 /**
245 - * Blocks a worker thread until completion. Called only by
246 - * pool. Currently unused -- pool-based waits use timeout
247 - * version below.
248 - */
249 - final void internalAwaitDone() {
250 - int s; // the odd construction reduces lock bias effects
251 - while ((s = status) >= 0) {
252 - try {
253 - synchronized (this) {
254 - if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
255 - wait();
256 - }
257 - } catch (InterruptedException ie) {
258 - cancelIfTerminating();
259 - }
260 - }
261 - }
262 -
263 - /**
264 266 * Blocks a worker thread until completed or timed out. Called
265 267 * only by pool.
266 - *
267 - * @return status on exit
268 268 */
269 - final int internalAwaitDone(long millis) {
270 - int s;
271 - if ((s = status) >= 0) {
272 - try {
269 + final void internalAwaitDone(long millis, int nanos) {
270 + int s = status;
271 + if ((s == 0 &&
272 + UNSAFE.compareAndSwapInt(this, statusOffset, 0, SIGNAL)) ||
273 + s > 0) {
274 + try { // the odd construction reduces lock bias effects
273 275 synchronized (this) {
274 - if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
275 - wait(millis, 0);
276 + if (status > 0)
277 + wait(millis, nanos);
278 + else
279 + notifyAll();
276 280 }
277 281 } catch (InterruptedException ie) {
278 282 cancelIfTerminating();
279 283 }
280 - s = status;
281 284 }
282 - return s;
283 285 }
284 286
285 287 /**
286 288 * Blocks a non-worker-thread until completion.
287 289 */
288 290 private void externalAwaitDone() {
289 - int s;
290 - while ((s = status) >= 0) {
291 + if (status >= 0) {
292 + boolean interrupted = false;
291 293 synchronized (this) {
292 - if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)){
293 - boolean interrupted = false;
294 - while (status >= 0) {
294 + for (;;) {
295 + int s = status;
296 + if (s == 0)
297 + UNSAFE.compareAndSwapInt(this, statusOffset,
298 + 0, SIGNAL);
299 + else if (s < 0) {
300 + notifyAll();
301 + break;
302 + }
303 + else {
295 304 try {
296 305 wait();
297 306 } catch (InterruptedException ie) {
298 307 interrupted = true;
299 308 }
300 309 }
301 - if (interrupted)
302 - Thread.currentThread().interrupt();
303 - break;
304 310 }
305 311 }
312 + if (interrupted)
313 + Thread.currentThread().interrupt();
306 314 }
307 315 }
308 316
309 317 /**
318 + * Blocks a non-worker-thread until completion or interruption or timeout.
319 + */
320 + private void externalInterruptibleAwaitDone(boolean timed, long nanos)
321 + throws InterruptedException {
322 + if (Thread.interrupted())
323 + throw new InterruptedException();
324 + if (status >= 0) {
325 + long startTime = timed ? System.nanoTime() : 0L;
326 + synchronized (this) {
327 + for (;;) {
328 + long nt;
329 + int s = status;
330 + if (s == 0)
331 + UNSAFE.compareAndSwapInt(this, statusOffset,
332 + 0, SIGNAL);
333 + else if (s < 0) {
334 + notifyAll();
335 + break;
336 + }
337 + else if (!timed)
338 + wait();
339 + else if ((nt = nanos - (System.nanoTime()-startTime)) > 0L)
340 + wait(nt / 1000000, (int)(nt % 1000000));
341 + else
342 + break;
343 + }
344 + }
345 + }
346 + }
347 +
348 + /**
310 349 * Unless done, calls exec and records status if completed, but
311 350 * doesn't wait for completion otherwise. Primary execution method
312 351 * for ForkJoinWorkerThread.
313 352 */
314 353 final void quietlyExec() {
315 354 try {
316 355 if (status < 0 || !exec())
317 356 return;
318 357 } catch (Throwable rex) {
319 358 setExceptionalCompletion(rex);
320 359 return;
321 360 }
322 361 setCompletion(NORMAL); // must be outside try block
323 362 }
324 363
325 364 // public methods
326 365
327 366 /**
↓ open down ↓ |
8 lines elided |
↑ open up ↑ |
328 367 * Arranges to asynchronously execute this task. While it is not
329 368 * necessarily enforced, it is a usage error to fork a task more
330 369 * than once unless it has completed and been reinitialized.
331 370 * Subsequent modifications to the state of this task or any data
332 371 * it operates on are not necessarily consistently observable by
333 372 * any thread other than the one executing it unless preceded by a
334 373 * call to {@link #join} or related methods, or a call to {@link
335 374 * #isDone} returning {@code true}.
336 375 *
337 376 * <p>This method may be invoked only from within {@code
338 - * ForkJoinTask} computations (as may be determined using method
377 + * ForkJoinPool} computations (as may be determined using method
339 378 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
340 379 * result in exceptions or errors, possibly including {@code
341 380 * ClassCastException}.
342 381 *
343 382 * @return {@code this}, to simplify usage
344 383 */
345 384 public final ForkJoinTask<V> fork() {
346 385 ((ForkJoinWorkerThread) Thread.currentThread())
347 386 .pushTask(this);
348 387 return this;
349 388 }
350 389
351 390 /**
352 - * Returns the result of the computation when it {@link #isDone is done}.
353 - * This method differs from {@link #get()} in that
391 + * Returns the result of the computation when it {@link #isDone is
392 + * done}. This method differs from {@link #get()} in that
354 393 * abnormal completion results in {@code RuntimeException} or
355 - * {@code Error}, not {@code ExecutionException}.
394 + * {@code Error}, not {@code ExecutionException}, and that
395 + * interrupts of the calling thread do <em>not</em> cause the
396 + * method to abruptly return by throwing {@code
397 + * InterruptedException}.
356 398 *
357 399 * @return the computed result
358 400 */
359 401 public final V join() {
360 402 quietlyJoin();
361 403 Throwable ex;
362 404 if (status < NORMAL && (ex = getException()) != null)
363 405 UNSAFE.throwException(ex);
364 406 return getRawResult();
365 407 }
366 408
367 409 /**
368 410 * Commences performing this task, awaits its completion if
369 411 * necessary, and returns its result, or throws an (unchecked)
370 412 * {@code RuntimeException} or {@code Error} if the underlying
371 413 * computation did so.
372 414 *
373 415 * @return the computed result
374 416 */
375 417 public final V invoke() {
376 418 quietlyInvoke();
377 419 Throwable ex;
378 420 if (status < NORMAL && (ex = getException()) != null)
379 421 UNSAFE.throwException(ex);
380 422 return getRawResult();
381 423 }
382 424
383 425 /**
384 426 * Forks the given tasks, returning when {@code isDone} holds for
385 427 * each task or an (unchecked) exception is encountered, in which
386 428 * case the exception is rethrown. If more than one task
↓ open down ↓ |
21 lines elided |
↑ open up ↑ |
387 429 * encounters an exception, then this method throws any one of
388 430 * these exceptions. If any task encounters an exception, the
389 431 * other may be cancelled. However, the execution status of
390 432 * individual tasks is not guaranteed upon exceptional return. The
391 433 * status of each task may be obtained using {@link
392 434 * #getException()} and related methods to check if they have been
393 435 * cancelled, completed normally or exceptionally, or left
394 436 * unprocessed.
395 437 *
396 438 * <p>This method may be invoked only from within {@code
397 - * ForkJoinTask} computations (as may be determined using method
439 + * ForkJoinPool} computations (as may be determined using method
398 440 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
399 441 * result in exceptions or errors, possibly including {@code
400 442 * ClassCastException}.
401 443 *
402 444 * @param t1 the first task
403 445 * @param t2 the second task
404 446 * @throws NullPointerException if any task is null
405 447 */
406 448 public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
407 449 t2.fork();
408 450 t1.invoke();
409 451 t2.join();
410 452 }
411 453
412 454 /**
413 455 * Forks the given tasks, returning when {@code isDone} holds for
414 456 * each task or an (unchecked) exception is encountered, in which
↓ open down ↓ |
7 lines elided |
↑ open up ↑ |
415 457 * case the exception is rethrown. If more than one task
416 458 * encounters an exception, then this method throws any one of
417 459 * these exceptions. If any task encounters an exception, others
418 460 * may be cancelled. However, the execution status of individual
419 461 * tasks is not guaranteed upon exceptional return. The status of
420 462 * each task may be obtained using {@link #getException()} and
421 463 * related methods to check if they have been cancelled, completed
422 464 * normally or exceptionally, or left unprocessed.
423 465 *
424 466 * <p>This method may be invoked only from within {@code
425 - * ForkJoinTask} computations (as may be determined using method
467 + * ForkJoinPool} computations (as may be determined using method
426 468 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
427 469 * result in exceptions or errors, possibly including {@code
428 470 * ClassCastException}.
429 471 *
430 472 * @param tasks the tasks
431 473 * @throws NullPointerException if any task is null
432 474 */
433 475 public static void invokeAll(ForkJoinTask<?>... tasks) {
434 476 Throwable ex = null;
435 477 int last = tasks.length - 1;
436 478 for (int i = last; i >= 0; --i) {
437 479 ForkJoinTask<?> t = tasks[i];
438 480 if (t == null) {
439 481 if (ex == null)
440 482 ex = new NullPointerException();
441 483 }
442 484 else if (i != 0)
443 485 t.fork();
444 486 else {
445 487 t.quietlyInvoke();
446 488 if (ex == null && t.status < NORMAL)
447 489 ex = t.getException();
448 490 }
449 491 }
450 492 for (int i = 1; i <= last; ++i) {
451 493 ForkJoinTask<?> t = tasks[i];
452 494 if (t != null) {
453 495 if (ex != null)
454 496 t.cancel(false);
455 497 else {
456 498 t.quietlyJoin();
457 499 if (ex == null && t.status < NORMAL)
458 500 ex = t.getException();
459 501 }
460 502 }
461 503 }
462 504 if (ex != null)
463 505 UNSAFE.throwException(ex);
464 506 }
465 507
466 508 /**
467 509 * Forks all tasks in the specified collection, returning when
468 510 * {@code isDone} holds for each task or an (unchecked) exception
469 511 * is encountered, in which case the exception is rethrown. If
↓ open down ↓ |
34 lines elided |
↑ open up ↑ |
470 512 * more than one task encounters an exception, then this method
471 513 * throws any one of these exceptions. If any task encounters an
472 514 * exception, others may be cancelled. However, the execution
473 515 * status of individual tasks is not guaranteed upon exceptional
474 516 * return. The status of each task may be obtained using {@link
475 517 * #getException()} and related methods to check if they have been
476 518 * cancelled, completed normally or exceptionally, or left
477 519 * unprocessed.
478 520 *
479 521 * <p>This method may be invoked only from within {@code
480 - * ForkJoinTask} computations (as may be determined using method
522 + * ForkJoinPool} computations (as may be determined using method
481 523 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
482 524 * result in exceptions or errors, possibly including {@code
483 525 * ClassCastException}.
484 526 *
485 527 * @param tasks the collection of tasks
486 528 * @return the tasks argument, to simplify usage
487 529 * @throws NullPointerException if tasks or any element are null
488 530 */
489 531 public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks) {
490 532 if (!(tasks instanceof RandomAccess) || !(tasks instanceof List<?>)) {
491 533 invokeAll(tasks.toArray(new ForkJoinTask<?>[tasks.size()]));
492 534 return tasks;
493 535 }
494 536 @SuppressWarnings("unchecked")
495 537 List<? extends ForkJoinTask<?>> ts =
496 538 (List<? extends ForkJoinTask<?>>) tasks;
497 539 Throwable ex = null;
498 540 int last = ts.size() - 1;
499 541 for (int i = last; i >= 0; --i) {
500 542 ForkJoinTask<?> t = ts.get(i);
501 543 if (t == null) {
502 544 if (ex == null)
503 545 ex = new NullPointerException();
504 546 }
505 547 else if (i != 0)
506 548 t.fork();
507 549 else {
508 550 t.quietlyInvoke();
509 551 if (ex == null && t.status < NORMAL)
510 552 ex = t.getException();
511 553 }
512 554 }
513 555 for (int i = 1; i <= last; ++i) {
514 556 ForkJoinTask<?> t = ts.get(i);
515 557 if (t != null) {
516 558 if (ex != null)
517 559 t.cancel(false);
518 560 else {
519 561 t.quietlyJoin();
520 562 if (ex == null && t.status < NORMAL)
521 563 ex = t.getException();
↓ open down ↓ |
31 lines elided |
↑ open up ↑ |
522 564 }
523 565 }
524 566 }
525 567 if (ex != null)
526 568 UNSAFE.throwException(ex);
527 569 return tasks;
528 570 }
529 571
530 572 /**
531 573 * Attempts to cancel execution of this task. This attempt will
532 - * fail if the task has already completed, has already been
533 - * cancelled, or could not be cancelled for some other reason. If
534 - * successful, and this task has not started when cancel is
535 - * called, execution of this task is suppressed, {@link
536 - * #isCancelled} will report true, and {@link #join} will result
537 - * in a {@code CancellationException} being thrown.
574 + * fail if the task has already completed or could not be
575 + * cancelled for some other reason. If successful, and this task
576 + * has not started when {@code cancel} is called, execution of
577 + * this task is suppressed. After this method returns
578 + * successfully, unless there is an intervening call to {@link
579 + * #reinitialize}, subsequent calls to {@link #isCancelled},
580 + * {@link #isDone}, and {@code cancel} will return {@code true}
581 + * and calls to {@link #join} and related methods will result in
582 + * {@code CancellationException}.
538 583 *
539 584 * <p>This method may be overridden in subclasses, but if so, must
540 - * still ensure that these minimal properties hold. In particular,
541 - * the {@code cancel} method itself must not throw exceptions.
585 + * still ensure that these properties hold. In particular, the
586 + * {@code cancel} method itself must not throw exceptions.
542 587 *
543 588 * <p>This method is designed to be invoked by <em>other</em>
544 589 * tasks. To terminate the current task, you can just return or
545 590 * throw an unchecked exception from its computation method, or
546 591 * invoke {@link #completeExceptionally}.
547 592 *
548 - * @param mayInterruptIfRunning this value is ignored in the
549 - * default implementation because tasks are not
550 - * cancelled via interruption
593 + * @param mayInterruptIfRunning this value has no effect in the
594 + * default implementation because interrupts are not used to
595 + * control cancellation.
551 596 *
552 597 * @return {@code true} if this task is now cancelled
553 598 */
554 599 public boolean cancel(boolean mayInterruptIfRunning) {
555 600 setCompletion(CANCELLED);
556 601 return status == CANCELLED;
557 602 }
558 603
559 604 /**
560 605 * Cancels, ignoring any exceptions thrown by cancel. Used during
561 606 * worker and pool shutdown. Cancel is spec'ed not to throw any
562 607 * exceptions, but if it does anyway, we have no recourse during
563 608 * shutdown, so guard against this case.
564 609 */
565 610 final void cancelIgnoringExceptions() {
566 611 try {
567 612 cancel(false);
568 613 } catch (Throwable ignore) {
569 614 }
570 615 }
571 616
572 617 /**
573 618 * Cancels if current thread is a terminating worker thread,
574 619 * ignoring any exceptions thrown by cancel.
575 620 */
576 621 final void cancelIfTerminating() {
577 622 Thread t = Thread.currentThread();
578 623 if ((t instanceof ForkJoinWorkerThread) &&
579 624 ((ForkJoinWorkerThread) t).isTerminating()) {
580 625 try {
581 626 cancel(false);
582 627 } catch (Throwable ignore) {
583 628 }
584 629 }
585 630 }
586 631
587 632 public final boolean isDone() {
588 633 return status < 0;
589 634 }
590 635
591 636 public final boolean isCancelled() {
592 637 return status == CANCELLED;
593 638 }
594 639
595 640 /**
596 641 * Returns {@code true} if this task threw an exception or was cancelled.
597 642 *
598 643 * @return {@code true} if this task threw an exception or was cancelled
599 644 */
600 645 public final boolean isCompletedAbnormally() {
601 646 return status < NORMAL;
602 647 }
603 648
604 649 /**
605 650 * Returns {@code true} if this task completed without throwing an
606 651 * exception and was not cancelled.
607 652 *
608 653 * @return {@code true} if this task completed without throwing an
609 654 * exception and was not cancelled
610 655 */
611 656 public final boolean isCompletedNormally() {
612 657 return status == NORMAL;
613 658 }
614 659
615 660 /**
616 661 * Returns the exception thrown by the base computation, or a
617 662 * {@code CancellationException} if cancelled, or {@code null} if
618 663 * none or if the method has not yet completed.
619 664 *
620 665 * @return the exception, or {@code null} if none
621 666 */
622 667 public final Throwable getException() {
623 668 int s = status;
624 669 return ((s >= NORMAL) ? null :
625 670 (s == CANCELLED) ? new CancellationException() :
626 671 exceptionMap.get(this));
627 672 }
628 673
629 674 /**
630 675 * Completes this task abnormally, and if not already aborted or
631 676 * cancelled, causes it to throw the given exception upon
632 677 * {@code join} and related operations. This method may be used
633 678 * to induce exceptions in asynchronous tasks, or to force
634 679 * completion of tasks that would not otherwise complete. Its use
635 680 * in other situations is discouraged. This method is
636 681 * overridable, but overridden versions must invoke {@code super}
637 682 * implementation to maintain guarantees.
638 683 *
639 684 * @param ex the exception to throw. If this exception is not a
640 685 * {@code RuntimeException} or {@code Error}, the actual exception
641 686 * thrown will be a {@code RuntimeException} with cause {@code ex}.
642 687 */
643 688 public void completeExceptionally(Throwable ex) {
644 689 setExceptionalCompletion((ex instanceof RuntimeException) ||
645 690 (ex instanceof Error) ? ex :
646 691 new RuntimeException(ex));
647 692 }
648 693
649 694 /**
650 695 * Completes this task, and if not already aborted or cancelled,
651 696 * returning the given value as the result of subsequent
652 697 * invocations of {@code join} and related operations. This method
653 698 * may be used to provide results for asynchronous tasks, or to
654 699 * provide alternative handling for tasks that would not otherwise
655 700 * complete normally. Its use in other situations is
656 701 * discouraged. This method is overridable, but overridden
657 702 * versions must invoke {@code super} implementation to maintain
658 703 * guarantees.
659 704 *
660 705 * @param value the result value for this task
661 706 */
662 707 public void complete(V value) {
663 708 try {
664 709 setRawResult(value);
665 710 } catch (Throwable rex) {
666 711 setExceptionalCompletion(rex);
667 712 return;
668 713 }
669 714 setCompletion(NORMAL);
670 715 }
671 716
672 717 /**
673 718 * Waits if necessary for the computation to complete, and then
↓ open down ↓ |
113 lines elided |
↑ open up ↑ |
674 719 * retrieves its result.
675 720 *
676 721 * @return the computed result
677 722 * @throws CancellationException if the computation was cancelled
678 723 * @throws ExecutionException if the computation threw an
679 724 * exception
680 725 * @throws InterruptedException if the current thread is not a
681 726 * member of a ForkJoinPool and was interrupted while waiting
682 727 */
683 728 public final V get() throws InterruptedException, ExecutionException {
684 - int s;
685 - if (Thread.currentThread() instanceof ForkJoinWorkerThread) {
729 + Thread t = Thread.currentThread();
730 + if (t instanceof ForkJoinWorkerThread)
686 731 quietlyJoin();
687 - s = status;
688 - }
689 - else {
690 - while ((s = status) >= 0) {
691 - synchronized (this) { // interruptible form of awaitDone
692 - if (UNSAFE.compareAndSwapInt(this, statusOffset,
693 - s, SIGNAL)) {
694 - while (status >= 0)
695 - wait();
696 - }
697 - }
698 - }
699 - }
700 - if (s < NORMAL) {
732 + else
733 + externalInterruptibleAwaitDone(false, 0L);
734 + int s = status;
735 + if (s != NORMAL) {
701 736 Throwable ex;
702 737 if (s == CANCELLED)
703 738 throw new CancellationException();
704 739 if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
705 740 throw new ExecutionException(ex);
706 741 }
707 742 return getRawResult();
708 743 }
709 744
710 745 /**
711 746 * Waits if necessary for at most the given time for the computation
712 747 * to complete, and then retrieves its result, if available.
713 748 *
714 749 * @param timeout the maximum time to wait
715 750 * @param unit the time unit of the timeout argument
↓ open down ↓ |
5 lines elided |
↑ open up ↑ |
716 751 * @return the computed result
717 752 * @throws CancellationException if the computation was cancelled
718 753 * @throws ExecutionException if the computation threw an
719 754 * exception
720 755 * @throws InterruptedException if the current thread is not a
721 756 * member of a ForkJoinPool and was interrupted while waiting
722 757 * @throws TimeoutException if the wait timed out
723 758 */
724 759 public final V get(long timeout, TimeUnit unit)
725 760 throws InterruptedException, ExecutionException, TimeoutException {
761 + long nanos = unit.toNanos(timeout);
726 762 Thread t = Thread.currentThread();
727 - ForkJoinPool pool;
728 - if (t instanceof ForkJoinWorkerThread) {
729 - ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
730 - if (status >= 0 && w.unpushTask(this))
731 - quietlyExec();
732 - pool = w.pool;
733 - }
763 + if (t instanceof ForkJoinWorkerThread)
764 + ((ForkJoinWorkerThread)t).joinTask(this, true, nanos);
734 765 else
735 - pool = null;
736 - /*
737 - * Timed wait loop intermixes cases for FJ (pool != null) and
738 - * non FJ threads. For FJ, decrement pool count but don't try
739 - * for replacement; increment count on completion. For non-FJ,
740 - * deal with interrupts. This is messy, but a little less so
741 - * than is splitting the FJ and nonFJ cases.
742 - */
743 - boolean interrupted = false;
744 - boolean dec = false; // true if pool count decremented
745 - long nanos = unit.toNanos(timeout);
746 - for (;;) {
747 - if (pool == null && Thread.interrupted()) {
748 - interrupted = true;
749 - break;
750 - }
751 - int s = status;
752 - if (s < 0)
753 - break;
754 - if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)) {
755 - long startTime = System.nanoTime();
756 - long nt; // wait time
757 - while (status >= 0 &&
758 - (nt = nanos - (System.nanoTime() - startTime)) > 0) {
759 - if (pool != null && !dec)
760 - dec = pool.tryDecrementRunningCount();
761 - else {
762 - long ms = nt / 1000000;
763 - int ns = (int) (nt % 1000000);
764 - try {
765 - synchronized (this) {
766 - if (status >= 0)
767 - wait(ms, ns);
768 - }
769 - } catch (InterruptedException ie) {
770 - if (pool != null)
771 - cancelIfTerminating();
772 - else {
773 - interrupted = true;
774 - break;
775 - }
776 - }
777 - }
778 - }
779 - break;
780 - }
781 - }
782 - if (pool != null && dec)
783 - pool.incrementRunningCount();
784 - if (interrupted)
785 - throw new InterruptedException();
786 - int es = status;
787 - if (es != NORMAL) {
766 + externalInterruptibleAwaitDone(true, nanos);
767 + int s = status;
768 + if (s != NORMAL) {
788 769 Throwable ex;
789 - if (es == CANCELLED)
770 + if (s == CANCELLED)
790 771 throw new CancellationException();
791 - if (es == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
772 + if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
792 773 throw new ExecutionException(ex);
793 774 throw new TimeoutException();
794 775 }
795 776 return getRawResult();
796 777 }
797 778
798 779 /**
799 780 * Joins this task, without returning its result or throwing its
800 781 * exception. This method may be useful when processing
801 782 * collections of tasks when some have been cancelled or otherwise
802 783 * known to have aborted.
803 784 */
804 785 public final void quietlyJoin() {
805 786 Thread t;
806 787 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
807 788 ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
808 789 if (status >= 0) {
809 790 if (w.unpushTask(this)) {
810 791 boolean completed;
811 792 try {
↓ open down ↓ |
10 lines elided |
↑ open up ↑ |
812 793 completed = exec();
813 794 } catch (Throwable rex) {
814 795 setExceptionalCompletion(rex);
815 796 return;
816 797 }
817 798 if (completed) {
818 799 setCompletion(NORMAL);
819 800 return;
820 801 }
821 802 }
822 - w.joinTask(this);
803 + w.joinTask(this, false, 0L);
823 804 }
824 805 }
825 806 else
826 807 externalAwaitDone();
827 808 }
828 809
829 810 /**
830 811 * Commences performing this task and awaits its completion if
831 812 * necessary, without returning its result or throwing its
832 813 * exception.
833 814 */
834 815 public final void quietlyInvoke() {
835 816 if (status >= 0) {
836 817 boolean completed;
837 818 try {
838 819 completed = exec();
839 820 } catch (Throwable rex) {
840 821 setExceptionalCompletion(rex);
841 822 return;
842 823 }
843 824 if (completed)
844 825 setCompletion(NORMAL);
845 826 else
846 827 quietlyJoin();
847 828 }
↓ open down ↓ |
15 lines elided |
↑ open up ↑ |
848 829 }
849 830
850 831 /**
851 832 * Possibly executes tasks until the pool hosting the current task
852 833 * {@link ForkJoinPool#isQuiescent is quiescent}. This method may
853 834 * be of use in designs in which many tasks are forked, but none
854 835 * are explicitly joined, instead executing them until all are
855 836 * processed.
856 837 *
857 838 * <p>This method may be invoked only from within {@code
858 - * ForkJoinTask} computations (as may be determined using method
839 + * ForkJoinPool} computations (as may be determined using method
859 840 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
860 841 * result in exceptions or errors, possibly including {@code
861 842 * ClassCastException}.
862 843 */
863 844 public static void helpQuiesce() {
864 845 ((ForkJoinWorkerThread) Thread.currentThread())
865 846 .helpQuiescePool();
866 847 }
867 848
868 849 /**
869 850 * Resets the internal bookkeeping state of this task, allowing a
870 851 * subsequent {@code fork}. This method allows repeated reuse of
871 852 * this task, but only if reuse occurs when this task has either
872 853 * never been forked, or has been forked, then completed and all
873 854 * outstanding joins of this task have also completed. Effects
874 855 * under any other usage conditions are not guaranteed.
875 856 * This method may be useful when executing
876 857 * pre-constructed trees of subtasks in loops.
858 + *
859 + * <p>Upon completion of this method, {@code isDone()} reports
860 + * {@code false}, and {@code getException()} reports {@code
861 + * null}. However, the value returned by {@code getRawResult} is
862 + * unaffected. To clear this value, you can invoke {@code
863 + * setRawResult(null)}.
877 864 */
878 865 public void reinitialize() {
879 866 if (status == EXCEPTIONAL)
880 867 exceptionMap.remove(this);
881 868 status = 0;
882 869 }
883 870
884 871 /**
885 872 * Returns the pool hosting the current task execution, or null
886 873 * if this task is executing outside of any ForkJoinPool.
887 874 *
↓ open down ↓ |
1 lines elided |
↑ open up ↑ |
888 875 * @see #inForkJoinPool
889 876 * @return the pool, or {@code null} if none
890 877 */
891 878 public static ForkJoinPool getPool() {
892 879 Thread t = Thread.currentThread();
893 880 return (t instanceof ForkJoinWorkerThread) ?
894 881 ((ForkJoinWorkerThread) t).pool : null;
895 882 }
896 883
897 884 /**
898 - * Returns {@code true} if the current thread is executing as a
899 - * ForkJoinPool computation.
885 + * Returns {@code true} if the current thread is a {@link
886 + * ForkJoinWorkerThread} executing as a ForkJoinPool computation.
900 887 *
901 - * @return {@code true} if the current thread is executing as a
902 - * ForkJoinPool computation, or false otherwise
888 + * @return {@code true} if the current thread is a {@link
889 + * ForkJoinWorkerThread} executing as a ForkJoinPool computation,
890 + * or {@code false} otherwise
903 891 */
904 892 public static boolean inForkJoinPool() {
905 893 return Thread.currentThread() instanceof ForkJoinWorkerThread;
906 894 }
907 895
908 896 /**
909 897 * Tries to unschedule this task for execution. This method will
910 898 * typically succeed if this task is the most recently forked task
911 899 * by the current thread, and has not commenced executing in
912 900 * another thread. This method may be useful when arranging
913 901 * alternative local processing of tasks that could have been, but
914 902 * were not, stolen.
915 903 *
916 904 * <p>This method may be invoked only from within {@code
917 - * ForkJoinTask} computations (as may be determined using method
905 + * ForkJoinPool} computations (as may be determined using method
918 906 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
919 907 * result in exceptions or errors, possibly including {@code
920 908 * ClassCastException}.
921 909 *
922 910 * @return {@code true} if unforked
923 911 */
924 912 public boolean tryUnfork() {
925 913 return ((ForkJoinWorkerThread) Thread.currentThread())
926 914 .unpushTask(this);
927 915 }
928 916
929 917 /**
930 918 * Returns an estimate of the number of tasks that have been
931 919 * forked by the current worker thread but not yet executed. This
932 920 * value may be useful for heuristic decisions about whether to
933 921 * fork other tasks.
934 922 *
935 923 * <p>This method may be invoked only from within {@code
936 - * ForkJoinTask} computations (as may be determined using method
924 + * ForkJoinPool} computations (as may be determined using method
937 925 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
938 926 * result in exceptions or errors, possibly including {@code
939 927 * ClassCastException}.
940 928 *
941 929 * @return the number of tasks
942 930 */
943 931 public static int getQueuedTaskCount() {
944 932 return ((ForkJoinWorkerThread) Thread.currentThread())
945 933 .getQueueSize();
946 934 }
947 935
948 936 /**
↓ open down ↓ |
2 lines elided |
↑ open up ↑ |
949 937 * Returns an estimate of how many more locally queued tasks are
950 938 * held by the current worker thread than there are other worker
951 939 * threads that might steal them. This value may be useful for
952 940 * heuristic decisions about whether to fork other tasks. In many
953 941 * usages of ForkJoinTasks, at steady state, each worker should
954 942 * aim to maintain a small constant surplus (for example, 3) of
955 943 * tasks, and to process computations locally if this threshold is
956 944 * exceeded.
957 945 *
958 946 * <p>This method may be invoked only from within {@code
959 - * ForkJoinTask} computations (as may be determined using method
947 + * ForkJoinPool} computations (as may be determined using method
960 948 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
961 949 * result in exceptions or errors, possibly including {@code
962 950 * ClassCastException}.
963 951 *
964 952 * @return the surplus number of tasks, which may be negative
965 953 */
966 954 public static int getSurplusQueuedTaskCount() {
967 955 return ((ForkJoinWorkerThread) Thread.currentThread())
968 956 .getEstimatedSurplusTaskCount();
969 957 }
970 958
971 959 // Extension methods
972 960
973 961 /**
974 962 * Returns the result that would be returned by {@link #join}, even
975 963 * if this task completed abnormally, or {@code null} if this task
976 964 * is not known to have been completed. This method is designed
977 965 * to aid debugging, as well as to support extensions. Its use in
978 966 * any other context is discouraged.
979 967 *
980 968 * @return the result, or {@code null} if not completed
981 969 */
982 970 public abstract V getRawResult();
983 971
984 972 /**
985 973 * Forces the given value to be returned as a result. This method
986 974 * is designed to support extensions, and should not in general be
987 975 * called otherwise.
988 976 *
989 977 * @param value the value
990 978 */
991 979 protected abstract void setRawResult(V value);
992 980
993 981 /**
994 982 * Immediately performs the base action of this task. This method
995 983 * is designed to support extensions, and should not in general be
996 984 * called otherwise. The return value controls whether this task
997 985 * is considered to be done normally. It may return false in
998 986 * asynchronous actions that require explicit invocations of
999 987 * {@link #complete} to become joinable. It may also throw an
1000 988 * (unchecked) exception to indicate abnormal exit.
1001 989 *
1002 990 * @return {@code true} if completed normally
1003 991 */
1004 992 protected abstract boolean exec();
1005 993
1006 994 /**
↓ open down ↓ |
37 lines elided |
↑ open up ↑ |
1007 995 * Returns, but does not unschedule or execute, a task queued by
1008 996 * the current thread but not yet executed, if one is immediately
1009 997 * available. There is no guarantee that this task will actually
1010 998 * be polled or executed next. Conversely, this method may return
1011 999 * null even if a task exists but cannot be accessed without
1012 1000 * contention with other threads. This method is designed
1013 1001 * primarily to support extensions, and is unlikely to be useful
1014 1002 * otherwise.
1015 1003 *
1016 1004 * <p>This method may be invoked only from within {@code
1017 - * ForkJoinTask} computations (as may be determined using method
1005 + * ForkJoinPool} computations (as may be determined using method
1018 1006 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
1019 1007 * result in exceptions or errors, possibly including {@code
1020 1008 * ClassCastException}.
1021 1009 *
1022 1010 * @return the next task, or {@code null} if none are available
1023 1011 */
1024 1012 protected static ForkJoinTask<?> peekNextLocalTask() {
1025 1013 return ((ForkJoinWorkerThread) Thread.currentThread())
1026 1014 .peekTask();
1027 1015 }
1028 1016
1029 1017 /**
1030 1018 * Unschedules and returns, without executing, the next task
1031 1019 * queued by the current thread but not yet executed. This method
1032 1020 * is designed primarily to support extensions, and is unlikely to
1033 1021 * be useful otherwise.
1034 1022 *
1035 1023 * <p>This method may be invoked only from within {@code
1036 - * ForkJoinTask} computations (as may be determined using method
1024 + * ForkJoinPool} computations (as may be determined using method
1037 1025 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
1038 1026 * result in exceptions or errors, possibly including {@code
1039 1027 * ClassCastException}.
1040 1028 *
1041 1029 * @return the next task, or {@code null} if none are available
1042 1030 */
1043 1031 protected static ForkJoinTask<?> pollNextLocalTask() {
1044 1032 return ((ForkJoinWorkerThread) Thread.currentThread())
1045 1033 .pollLocalTask();
1046 1034 }
1047 1035
1048 1036 /**
↓ open down ↓ |
2 lines elided |
↑ open up ↑ |
1049 1037 * Unschedules and returns, without executing, the next task
1050 1038 * queued by the current thread but not yet executed, if one is
1051 1039 * available, or if not available, a task that was forked by some
1052 1040 * other thread, if available. Availability may be transient, so a
1053 1041 * {@code null} result does not necessarily imply quiescence
1054 1042 * of the pool this task is operating in. This method is designed
1055 1043 * primarily to support extensions, and is unlikely to be useful
1056 1044 * otherwise.
1057 1045 *
1058 1046 * <p>This method may be invoked only from within {@code
1059 - * ForkJoinTask} computations (as may be determined using method
1047 + * ForkJoinPool} computations (as may be determined using method
1060 1048 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
1061 1049 * result in exceptions or errors, possibly including {@code
1062 1050 * ClassCastException}.
1063 1051 *
1064 1052 * @return a task, or {@code null} if none are available
1065 1053 */
1066 1054 protected static ForkJoinTask<?> pollTask() {
1067 1055 return ((ForkJoinWorkerThread) Thread.currentThread())
1068 1056 .pollTask();
1069 1057 }
1070 1058
1071 1059 /**
1072 1060 * Adaptor for Runnables. This implements RunnableFuture
1073 1061 * to be compliant with AbstractExecutorService constraints
1074 1062 * when used in ForkJoinPool.
1075 1063 */
1076 1064 static final class AdaptedRunnable<T> extends ForkJoinTask<T>
1077 1065 implements RunnableFuture<T> {
1078 1066 final Runnable runnable;
1079 1067 final T resultOnCompletion;
1080 1068 T result;
1081 1069 AdaptedRunnable(Runnable runnable, T result) {
1082 1070 if (runnable == null) throw new NullPointerException();
1083 1071 this.runnable = runnable;
1084 1072 this.resultOnCompletion = result;
1085 1073 }
1086 1074 public T getRawResult() { return result; }
1087 1075 public void setRawResult(T v) { result = v; }
1088 1076 public boolean exec() {
1089 1077 runnable.run();
1090 1078 result = resultOnCompletion;
1091 1079 return true;
1092 1080 }
1093 1081 public void run() { invoke(); }
1094 1082 private static final long serialVersionUID = 5232453952276885070L;
1095 1083 }
1096 1084
1097 1085 /**
1098 1086 * Adaptor for Callables
1099 1087 */
1100 1088 static final class AdaptedCallable<T> extends ForkJoinTask<T>
1101 1089 implements RunnableFuture<T> {
1102 1090 final Callable<? extends T> callable;
1103 1091 T result;
1104 1092 AdaptedCallable(Callable<? extends T> callable) {
1105 1093 if (callable == null) throw new NullPointerException();
1106 1094 this.callable = callable;
1107 1095 }
1108 1096 public T getRawResult() { return result; }
1109 1097 public void setRawResult(T v) { result = v; }
1110 1098 public boolean exec() {
1111 1099 try {
1112 1100 result = callable.call();
1113 1101 return true;
1114 1102 } catch (Error err) {
1115 1103 throw err;
1116 1104 } catch (RuntimeException rex) {
1117 1105 throw rex;
1118 1106 } catch (Exception ex) {
1119 1107 throw new RuntimeException(ex);
1120 1108 }
1121 1109 }
1122 1110 public void run() { invoke(); }
1123 1111 private static final long serialVersionUID = 2838392045355241008L;
1124 1112 }
1125 1113
1126 1114 /**
1127 1115 * Returns a new {@code ForkJoinTask} that performs the {@code run}
1128 1116 * method of the given {@code Runnable} as its action, and returns
1129 1117 * a null result upon {@link #join}.
1130 1118 *
1131 1119 * @param runnable the runnable action
1132 1120 * @return the task
1133 1121 */
1134 1122 public static ForkJoinTask<?> adapt(Runnable runnable) {
1135 1123 return new AdaptedRunnable<Void>(runnable, null);
1136 1124 }
1137 1125
1138 1126 /**
1139 1127 * Returns a new {@code ForkJoinTask} that performs the {@code run}
1140 1128 * method of the given {@code Runnable} as its action, and returns
1141 1129 * the given result upon {@link #join}.
1142 1130 *
1143 1131 * @param runnable the runnable action
1144 1132 * @param result the result upon completion
1145 1133 * @return the task
1146 1134 */
1147 1135 public static <T> ForkJoinTask<T> adapt(Runnable runnable, T result) {
1148 1136 return new AdaptedRunnable<T>(runnable, result);
1149 1137 }
1150 1138
1151 1139 /**
1152 1140 * Returns a new {@code ForkJoinTask} that performs the {@code call}
1153 1141 * method of the given {@code Callable} as its action, and returns
1154 1142 * its result upon {@link #join}, translating any checked exceptions
1155 1143 * encountered into {@code RuntimeException}.
1156 1144 *
1157 1145 * @param callable the callable action
1158 1146 * @return the task
1159 1147 */
1160 1148 public static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable) {
1161 1149 return new AdaptedCallable<T>(callable);
1162 1150 }
1163 1151
1164 1152 // Serialization support
1165 1153
1166 1154 private static final long serialVersionUID = -7721805057305804111L;
1167 1155
1168 1156 /**
1169 1157 * Saves the state to a stream (that is, serializes it).
1170 1158 *
1171 1159 * @serialData the current run status and the exception thrown
1172 1160 * during execution, or {@code null} if none
1173 1161 * @param s the stream
1174 1162 */
1175 1163 private void writeObject(java.io.ObjectOutputStream s)
1176 1164 throws java.io.IOException {
1177 1165 s.defaultWriteObject();
1178 1166 s.writeObject(getException());
1179 1167 }
1180 1168
1181 1169 /**
1182 1170 * Reconstitutes the instance from a stream (that is, deserializes it).
1183 1171 *
1184 1172 * @param s the stream
1185 1173 */
1186 1174 private void readObject(java.io.ObjectInputStream s)
1187 1175 throws java.io.IOException, ClassNotFoundException {
1188 1176 s.defaultReadObject();
1189 1177 Object ex = s.readObject();
1190 1178 if (ex != null)
1191 1179 setExceptionalCompletion((Throwable) ex);
1192 1180 }
1193 1181
1194 1182 // Unsafe mechanics
1195 1183
1196 1184 private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
1197 1185 private static final long statusOffset =
1198 1186 objectFieldOffset("status", ForkJoinTask.class);
1199 1187
1200 1188 private static long objectFieldOffset(String field, Class<?> klazz) {
1201 1189 try {
1202 1190 return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
1203 1191 } catch (NoSuchFieldException e) {
1204 1192 // Convert Exception to corresponding Error
1205 1193 NoSuchFieldError error = new NoSuchFieldError(field);
1206 1194 error.initCause(e);
1207 1195 throw error;
1208 1196 }
1209 1197 }
1210 1198 }
↓ open down ↓ |
141 lines elided |
↑ open up ↑ |
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX