1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent; 37 38 import java.io.Serializable; 39 import java.lang.invoke.MethodHandles; 40 import java.lang.invoke.VarHandle; 41 import java.lang.ref.ReferenceQueue; 42 import java.lang.ref.WeakReference; 43 import java.lang.reflect.Constructor; 44 import java.util.Collection; 45 import java.util.List; 46 import java.util.RandomAccess; 47 import java.util.concurrent.locks.ReentrantLock; 48 49 /** 50 * Abstract base class for tasks that run within a {@link ForkJoinPool}. 51 * A {@code ForkJoinTask} is a thread-like entity that is much 52 * lighter weight than a normal thread. Huge numbers of tasks and 53 * subtasks may be hosted by a small number of actual threads in a 54 * ForkJoinPool, at the price of some usage limitations. 55 * 56 * <p>A "main" {@code ForkJoinTask} begins execution when it is 57 * explicitly submitted to a {@link ForkJoinPool}, or, if not already 58 * engaged in a ForkJoin computation, commenced in the {@link 59 * ForkJoinPool#commonPool()} via {@link #fork}, {@link #invoke}, or 60 * related methods. Once started, it will usually in turn start other 61 * subtasks. As indicated by the name of this class, many programs 62 * using {@code ForkJoinTask} employ only methods {@link #fork} and 63 * {@link #join}, or derivatives such as {@link 64 * #invokeAll(ForkJoinTask...) invokeAll}. However, this class also 65 * provides a number of other methods that can come into play in 66 * advanced usages, as well as extension mechanics that allow support 67 * of new forms of fork/join processing. 68 * 69 * <p>A {@code ForkJoinTask} is a lightweight form of {@link Future}. 70 * The efficiency of {@code ForkJoinTask}s stems from a set of 71 * restrictions (that are only partially statically enforceable) 72 * reflecting their main use as computational tasks calculating pure 73 * functions or operating on purely isolated objects. The primary 74 * coordination mechanisms are {@link #fork}, that arranges 75 * asynchronous execution, and {@link #join}, that doesn't proceed 76 * until the task's result has been computed. Computations should 77 * ideally avoid {@code synchronized} methods or blocks, and should 78 * minimize other blocking synchronization apart from joining other 79 * tasks or using synchronizers such as Phasers that are advertised to 80 * cooperate with fork/join scheduling. Subdividable tasks should also 81 * not perform blocking I/O, and should ideally access variables that 82 * are completely independent of those accessed by other running 83 * tasks. These guidelines are loosely enforced by not permitting 84 * checked exceptions such as {@code IOExceptions} to be 85 * thrown. However, computations may still encounter unchecked 86 * exceptions, that are rethrown to callers attempting to join 87 * them. These exceptions may additionally include {@link 88 * RejectedExecutionException} stemming from internal resource 89 * exhaustion, such as failure to allocate internal task 90 * queues. Rethrown exceptions behave in the same way as regular 91 * exceptions, but, when possible, contain stack traces (as displayed 92 * for example using {@code ex.printStackTrace()}) of both the thread 93 * that initiated the computation as well as the thread actually 94 * encountering the exception; minimally only the latter. 95 * 96 * <p>It is possible to define and use ForkJoinTasks that may block, 97 * but doing so requires three further considerations: (1) Completion 98 * of few if any <em>other</em> tasks should be dependent on a task 99 * that blocks on external synchronization or I/O. Event-style async 100 * tasks that are never joined (for example, those subclassing {@link 101 * CountedCompleter}) often fall into this category. (2) To minimize 102 * resource impact, tasks should be small; ideally performing only the 103 * (possibly) blocking action. (3) Unless the {@link 104 * ForkJoinPool.ManagedBlocker} API is used, or the number of possibly 105 * blocked tasks is known to be less than the pool's {@link 106 * ForkJoinPool#getParallelism} level, the pool cannot guarantee that 107 * enough threads will be available to ensure progress or good 108 * performance. 109 * 110 * <p>The primary method for awaiting completion and extracting 111 * results of a task is {@link #join}, but there are several variants: 112 * The {@link Future#get} methods support interruptible and/or timed 113 * waits for completion and report results using {@code Future} 114 * conventions. Method {@link #invoke} is semantically 115 * equivalent to {@code fork(); join()} but always attempts to begin 116 * execution in the current thread. The "<em>quiet</em>" forms of 117 * these methods do not extract results or report exceptions. These 118 * may be useful when a set of tasks are being executed, and you need 119 * to delay processing of results or exceptions until all complete. 120 * Method {@code invokeAll} (available in multiple versions) 121 * performs the most common form of parallel invocation: forking a set 122 * of tasks and joining them all. 123 * 124 * <p>In the most typical usages, a fork-join pair act like a call 125 * (fork) and return (join) from a parallel recursive function. As is 126 * the case with other forms of recursive calls, returns (joins) 127 * should be performed innermost-first. For example, {@code a.fork(); 128 * b.fork(); b.join(); a.join();} is likely to be substantially more 129 * efficient than joining {@code a} before {@code b}. 130 * 131 * <p>The execution status of tasks may be queried at several levels 132 * of detail: {@link #isDone} is true if a task completed in any way 133 * (including the case where a task was cancelled without executing); 134 * {@link #isCompletedNormally} is true if a task completed without 135 * cancellation or encountering an exception; {@link #isCancelled} is 136 * true if the task was cancelled (in which case {@link #getException} 137 * returns a {@link CancellationException}); and 138 * {@link #isCompletedAbnormally} is true if a task was either 139 * cancelled or encountered an exception, in which case {@link 140 * #getException} will return either the encountered exception or 141 * {@link CancellationException}. 142 * 143 * <p>The ForkJoinTask class is not usually directly subclassed. 144 * Instead, you subclass one of the abstract classes that support a 145 * particular style of fork/join processing, typically {@link 146 * RecursiveAction} for most computations that do not return results, 147 * {@link RecursiveTask} for those that do, and {@link 148 * CountedCompleter} for those in which completed actions trigger 149 * other actions. Normally, a concrete ForkJoinTask subclass declares 150 * fields comprising its parameters, established in a constructor, and 151 * then defines a {@code compute} method that somehow uses the control 152 * methods supplied by this base class. 153 * 154 * <p>Method {@link #join} and its variants are appropriate for use 155 * only when completion dependencies are acyclic; that is, the 156 * parallel computation can be described as a directed acyclic graph 157 * (DAG). Otherwise, executions may encounter a form of deadlock as 158 * tasks cyclically wait for each other. However, this framework 159 * supports other methods and techniques (for example the use of 160 * {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that 161 * may be of use in constructing custom subclasses for problems that 162 * are not statically structured as DAGs. To support such usages, a 163 * ForkJoinTask may be atomically <em>tagged</em> with a {@code short} 164 * value using {@link #setForkJoinTaskTag} or {@link 165 * #compareAndSetForkJoinTaskTag} and checked using {@link 166 * #getForkJoinTaskTag}. The ForkJoinTask implementation does not use 167 * these {@code protected} methods or tags for any purpose, but they 168 * may be of use in the construction of specialized subclasses. For 169 * example, parallel graph traversals can use the supplied methods to 170 * avoid revisiting nodes/tasks that have already been processed. 171 * (Method names for tagging are bulky in part to encourage definition 172 * of methods that reflect their usage patterns.) 173 * 174 * <p>Most base support methods are {@code final}, to prevent 175 * overriding of implementations that are intrinsically tied to the 176 * underlying lightweight task scheduling framework. Developers 177 * creating new basic styles of fork/join processing should minimally 178 * implement {@code protected} methods {@link #exec}, {@link 179 * #setRawResult}, and {@link #getRawResult}, while also introducing 180 * an abstract computational method that can be implemented in its 181 * subclasses, possibly relying on other {@code protected} methods 182 * provided by this class. 183 * 184 * <p>ForkJoinTasks should perform relatively small amounts of 185 * computation. Large tasks should be split into smaller subtasks, 186 * usually via recursive decomposition. As a very rough rule of thumb, 187 * a task should perform more than 100 and less than 10000 basic 188 * computational steps, and should avoid indefinite looping. If tasks 189 * are too big, then parallelism cannot improve throughput. If too 190 * small, then memory and internal task maintenance overhead may 191 * overwhelm processing. 192 * 193 * <p>This class provides {@code adapt} methods for {@link Runnable} 194 * and {@link Callable}, that may be of use when mixing execution of 195 * {@code ForkJoinTasks} with other kinds of tasks. When all tasks are 196 * of this form, consider using a pool constructed in <em>asyncMode</em>. 197 * 198 * <p>ForkJoinTasks are {@code Serializable}, which enables them to be 199 * used in extensions such as remote execution frameworks. It is 200 * sensible to serialize tasks only before or after, but not during, 201 * execution. Serialization is not relied on during execution itself. 202 * 203 * @since 1.7 204 * @author Doug Lea 205 */ 206 public abstract class ForkJoinTask<V> implements Future<V>, Serializable { 207 208 /* 209 * See the internal documentation of class ForkJoinPool for a 210 * general implementation overview. ForkJoinTasks are mainly 211 * responsible for maintaining their "status" field amidst relays 212 * to methods in ForkJoinWorkerThread and ForkJoinPool. 213 * 214 * The methods of this class are more-or-less layered into 215 * (1) basic status maintenance 216 * (2) execution and awaiting completion 217 * (3) user-level methods that additionally report results. 218 * This is sometimes hard to see because this file orders exported 219 * methods in a way that flows well in javadocs. 220 */ 221 222 /** 223 * The status field holds run control status bits packed into a 224 * single int to ensure atomicity. Status is initially zero, and 225 * takes on nonnegative values until completed, upon which it 226 * holds (sign bit) DONE, possibly with ABNORMAL (cancelled or 227 * exceptional) and THROWN (in which case an exception has been 228 * stored). Tasks with dependent blocked waiting joiners have the 229 * SIGNAL bit set. Completion of a task with SIGNAL set awakens 230 * any waiters via notifyAll. (Waiters also help signal others 231 * upon completion.) 232 * 233 * These control bits occupy only (some of) the upper half (16 234 * bits) of status field. The lower bits are used for user-defined 235 * tags. 236 */ 237 volatile int status; // accessed directly by pool and workers 238 239 private static final int DONE = 1 << 31; // must be negative 240 private static final int ABNORMAL = 1 << 18; // set atomically with DONE 241 private static final int THROWN = 1 << 17; // set atomically with ABNORMAL 242 private static final int SIGNAL = 1 << 16; // true if joiner waiting 243 private static final int SMASK = 0xffff; // short bits for tags 244 245 /** 246 * Constructor for subclasses to call. 247 */ 248 public ForkJoinTask() {} 249 250 static boolean isExceptionalStatus(int s) { // needed by subclasses 251 return (s & THROWN) != 0; 252 } 253 254 /** 255 * Sets DONE status and wakes up threads waiting to join this task. 256 * 257 * @return status on exit 258 */ 259 private int setDone() { 260 int s; 261 if (((s = (int)STATUS.getAndBitwiseOr(this, DONE)) & SIGNAL) != 0) 262 synchronized (this) { notifyAll(); } 263 return s | DONE; 264 } 265 266 /** 267 * Marks cancelled or exceptional completion unless already done. 268 * 269 * @param completion must be DONE | ABNORMAL, ORed with THROWN if exceptional 270 * @return status on exit 271 */ 272 private int abnormalCompletion(int completion) { 273 for (int s, ns;;) { 274 if ((s = status) < 0) 275 return s; 276 else if (STATUS.weakCompareAndSet(this, s, ns = s | completion)) { 277 if ((s & SIGNAL) != 0) 278 synchronized (this) { notifyAll(); } 279 return ns; 280 } 281 } 282 } 283 284 /** 285 * Primary execution method for stolen tasks. Unless done, calls 286 * exec and records status if completed, but doesn't wait for 287 * completion otherwise. 288 * 289 * @return status on exit from this method 290 */ 291 final int doExec() { 292 int s; boolean completed; 293 if ((s = status) >= 0) { 294 try { 295 completed = exec(); 296 } catch (Throwable rex) { 297 completed = false; 298 s = setExceptionalCompletion(rex); 299 } 300 if (completed) 301 s = setDone(); 302 } 303 return s; 304 } 305 306 /** 307 * If not done, sets SIGNAL status and performs Object.wait(timeout). 308 * This task may or may not be done on exit. Ignores interrupts. 309 * 310 * @param timeout using Object.wait conventions. 311 */ 312 final void internalWait(long timeout) { 313 if ((int)STATUS.getAndBitwiseOr(this, SIGNAL) >= 0) { 314 synchronized (this) { 315 if (status >= 0) 316 try { wait(timeout); } catch (InterruptedException ie) { } 317 else 318 notifyAll(); 319 } 320 } 321 } 322 323 /** 324 * Blocks a non-worker-thread until completion. 325 * @return status upon completion 326 */ 327 private int externalAwaitDone() { 328 int s = tryExternalHelp(); 329 if (s >= 0 && (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) { 330 boolean interrupted = false; 331 synchronized (this) { 332 for (;;) { 333 if ((s = status) >= 0) { 334 try { 335 wait(0L); 336 } catch (InterruptedException ie) { 337 interrupted = true; 338 } 339 } 340 else { 341 notifyAll(); 342 break; 343 } 344 } 345 } 346 if (interrupted) 347 Thread.currentThread().interrupt(); 348 } 349 return s; 350 } 351 352 /** 353 * Blocks a non-worker-thread until completion or interruption. 354 */ 355 private int externalInterruptibleAwaitDone() throws InterruptedException { 356 int s = tryExternalHelp(); 357 if (s >= 0 && (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) { 358 synchronized (this) { 359 for (;;) { 360 if ((s = status) >= 0) 361 wait(0L); 362 else { 363 notifyAll(); 364 break; 365 } 366 } 367 } 368 } 369 else if (Thread.interrupted()) 370 throw new InterruptedException(); 371 return s; 372 } 373 374 /** 375 * Tries to help with tasks allowed for external callers. 376 * 377 * @return current status 378 */ 379 private int tryExternalHelp() { 380 int s; 381 return ((s = status) < 0 ? s: 382 (this instanceof CountedCompleter) ? 383 ForkJoinPool.common.externalHelpComplete( 384 (CountedCompleter<?>)this, 0) : 385 ForkJoinPool.common.tryExternalUnpush(this) ? 386 doExec() : 0); 387 } 388 389 /** 390 * Implementation for join, get, quietlyJoin. Directly handles 391 * only cases of already-completed, external wait, and 392 * unfork+exec. Others are relayed to ForkJoinPool.awaitJoin. 393 * 394 * @return status upon completion 395 */ 396 private int doJoin() { 397 int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; 398 return (s = status) < 0 ? s : 399 ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? 400 (w = (wt = (ForkJoinWorkerThread)t).workQueue). 401 tryUnpush(this) && (s = doExec()) < 0 ? s : 402 wt.pool.awaitJoin(w, this, 0L) : 403 externalAwaitDone(); 404 } 405 406 /** 407 * Implementation for invoke, quietlyInvoke. 408 * 409 * @return status upon completion 410 */ 411 private int doInvoke() { 412 int s; Thread t; ForkJoinWorkerThread wt; 413 return (s = doExec()) < 0 ? s : 414 ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? 415 (wt = (ForkJoinWorkerThread)t).pool. 416 awaitJoin(wt.workQueue, this, 0L) : 417 externalAwaitDone(); 418 } 419 420 // Exception table support 421 422 /** 423 * Hash table of exceptions thrown by tasks, to enable reporting 424 * by callers. Because exceptions are rare, we don't directly keep 425 * them with task objects, but instead use a weak ref table. Note 426 * that cancellation exceptions don't appear in the table, but are 427 * instead recorded as status values. 428 * 429 * The exception table has a fixed capacity. 430 */ 431 private static final ExceptionNode[] exceptionTable 432 = new ExceptionNode[32]; 433 434 /** Lock protecting access to exceptionTable. */ 435 private static final ReentrantLock exceptionTableLock 436 = new ReentrantLock(); 437 438 /** Reference queue of stale exceptionally completed tasks. */ 439 private static final ReferenceQueue<ForkJoinTask<?>> exceptionTableRefQueue 440 = new ReferenceQueue<>(); 441 442 /** 443 * Key-value nodes for exception table. The chained hash table 444 * uses identity comparisons, full locking, and weak references 445 * for keys. The table has a fixed capacity because it only 446 * maintains task exceptions long enough for joiners to access 447 * them, so should never become very large for sustained 448 * periods. However, since we do not know when the last joiner 449 * completes, we must use weak references and expunge them. We do 450 * so on each operation (hence full locking). Also, some thread in 451 * any ForkJoinPool will call helpExpungeStaleExceptions when its 452 * pool becomes isQuiescent. 453 */ 454 static final class ExceptionNode extends WeakReference<ForkJoinTask<?>> { 455 final Throwable ex; 456 ExceptionNode next; 457 final long thrower; // use id not ref to avoid weak cycles 458 final int hashCode; // store task hashCode before weak ref disappears 459 ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next, 460 ReferenceQueue<ForkJoinTask<?>> exceptionTableRefQueue) { 461 super(task, exceptionTableRefQueue); 462 this.ex = ex; 463 this.next = next; 464 this.thrower = Thread.currentThread().getId(); 465 this.hashCode = System.identityHashCode(task); 466 } 467 } 468 469 /** 470 * Records exception and sets status. 471 * 472 * @return status on exit 473 */ 474 final int recordExceptionalCompletion(Throwable ex) { 475 int s; 476 if ((s = status) >= 0) { 477 int h = System.identityHashCode(this); 478 final ReentrantLock lock = exceptionTableLock; 479 lock.lock(); 480 try { 481 expungeStaleExceptions(); 482 ExceptionNode[] t = exceptionTable; 483 int i = h & (t.length - 1); 484 for (ExceptionNode e = t[i]; ; e = e.next) { 485 if (e == null) { 486 t[i] = new ExceptionNode(this, ex, t[i], 487 exceptionTableRefQueue); 488 break; 489 } 490 if (e.get() == this) // already present 491 break; 492 } 493 } finally { 494 lock.unlock(); 495 } 496 s = abnormalCompletion(DONE | ABNORMAL | THROWN); 497 } 498 return s; 499 } 500 501 /** 502 * Records exception and possibly propagates. 503 * 504 * @return status on exit 505 */ 506 private int setExceptionalCompletion(Throwable ex) { 507 int s = recordExceptionalCompletion(ex); 508 if ((s & THROWN) != 0) 509 internalPropagateException(ex); 510 return s; 511 } 512 513 /** 514 * Hook for exception propagation support for tasks with completers. 515 */ 516 void internalPropagateException(Throwable ex) { 517 } 518 519 /** 520 * Cancels, ignoring any exceptions thrown by cancel. Used during 521 * worker and pool shutdown. Cancel is spec'ed not to throw any 522 * exceptions, but if it does anyway, we have no recourse during 523 * shutdown, so guard against this case. 524 */ 525 static final void cancelIgnoringExceptions(ForkJoinTask<?> t) { 526 if (t != null && t.status >= 0) { 527 try { 528 t.cancel(false); 529 } catch (Throwable ignore) { 530 } 531 } 532 } 533 534 /** 535 * Removes exception node and clears status. 536 */ 537 private void clearExceptionalCompletion() { 538 int h = System.identityHashCode(this); 539 final ReentrantLock lock = exceptionTableLock; 540 lock.lock(); 541 try { 542 ExceptionNode[] t = exceptionTable; 543 int i = h & (t.length - 1); 544 ExceptionNode e = t[i]; 545 ExceptionNode pred = null; 546 while (e != null) { 547 ExceptionNode next = e.next; 548 if (e.get() == this) { 549 if (pred == null) 550 t[i] = next; 551 else 552 pred.next = next; 553 break; 554 } 555 pred = e; 556 e = next; 557 } 558 expungeStaleExceptions(); 559 status = 0; 560 } finally { 561 lock.unlock(); 562 } 563 } 564 565 /** 566 * Returns a rethrowable exception for this task, if available. 567 * To provide accurate stack traces, if the exception was not 568 * thrown by the current thread, we try to create a new exception 569 * of the same type as the one thrown, but with the recorded 570 * exception as its cause. If there is no such constructor, we 571 * instead try to use a no-arg constructor, followed by initCause, 572 * to the same effect. If none of these apply, or any fail due to 573 * other exceptions, we return the recorded exception, which is 574 * still correct, although it may contain a misleading stack 575 * trace. 576 * 577 * @return the exception, or null if none 578 */ 579 private Throwable getThrowableException() { 580 int h = System.identityHashCode(this); 581 ExceptionNode e; 582 final ReentrantLock lock = exceptionTableLock; 583 lock.lock(); 584 try { 585 expungeStaleExceptions(); 586 ExceptionNode[] t = exceptionTable; 587 e = t[h & (t.length - 1)]; 588 while (e != null && e.get() != this) 589 e = e.next; 590 } finally { 591 lock.unlock(); 592 } 593 Throwable ex; 594 if (e == null || (ex = e.ex) == null) 595 return null; 596 if (e.thrower != Thread.currentThread().getId()) { 597 try { 598 Constructor<?> noArgCtor = null; 599 // public ctors only 600 for (Constructor<?> c : ex.getClass().getConstructors()) { 601 Class<?>[] ps = c.getParameterTypes(); 602 if (ps.length == 0) 603 noArgCtor = c; 604 else if (ps.length == 1 && ps[0] == Throwable.class) 605 return (Throwable)c.newInstance(ex); 606 } 607 if (noArgCtor != null) { 608 Throwable wx = (Throwable)noArgCtor.newInstance(); 609 wx.initCause(ex); 610 return wx; 611 } 612 } catch (Exception ignore) { 613 } 614 } 615 return ex; 616 } 617 618 /** 619 * Polls stale refs and removes them. Call only while holding lock. 620 */ 621 private static void expungeStaleExceptions() { 622 for (Object x; (x = exceptionTableRefQueue.poll()) != null;) { 623 if (x instanceof ExceptionNode) { 624 ExceptionNode[] t = exceptionTable; 625 int i = ((ExceptionNode)x).hashCode & (t.length - 1); 626 ExceptionNode e = t[i]; 627 ExceptionNode pred = null; 628 while (e != null) { 629 ExceptionNode next = e.next; 630 if (e == x) { 631 if (pred == null) 632 t[i] = next; 633 else 634 pred.next = next; 635 break; 636 } 637 pred = e; 638 e = next; 639 } 640 } 641 } 642 } 643 644 /** 645 * If lock is available, polls stale refs and removes them. 646 * Called from ForkJoinPool when pools become quiescent. 647 */ 648 static final void helpExpungeStaleExceptions() { 649 final ReentrantLock lock = exceptionTableLock; 650 if (lock.tryLock()) { 651 try { 652 expungeStaleExceptions(); 653 } finally { 654 lock.unlock(); 655 } 656 } 657 } 658 659 /** 660 * A version of "sneaky throw" to relay exceptions. 661 */ 662 static void rethrow(Throwable ex) { 663 ForkJoinTask.<RuntimeException>uncheckedThrow(ex); 664 } 665 666 /** 667 * The sneaky part of sneaky throw, relying on generics 668 * limitations to evade compiler complaints about rethrowing 669 * unchecked exceptions. 670 */ 671 @SuppressWarnings("unchecked") static <T extends Throwable> 672 void uncheckedThrow(Throwable t) throws T { 673 if (t != null) 674 throw (T)t; // rely on vacuous cast 675 else 676 throw new Error("Unknown Exception"); 677 } 678 679 /** 680 * Throws exception, if any, associated with the given status. 681 */ 682 private void reportException(int s) { 683 rethrow((s & THROWN) != 0 ? getThrowableException() : 684 new CancellationException()); 685 } 686 687 // public methods 688 689 /** 690 * Arranges to asynchronously execute this task in the pool the 691 * current task is running in, if applicable, or using the {@link 692 * ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}. While 693 * it is not necessarily enforced, it is a usage error to fork a 694 * task more than once unless it has completed and been 695 * reinitialized. Subsequent modifications to the state of this 696 * task or any data it operates on are not necessarily 697 * consistently observable by any thread other than the one 698 * executing it unless preceded by a call to {@link #join} or 699 * related methods, or a call to {@link #isDone} returning {@code 700 * true}. 701 * 702 * @return {@code this}, to simplify usage 703 */ 704 public final ForkJoinTask<V> fork() { 705 Thread t; 706 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) 707 ((ForkJoinWorkerThread)t).workQueue.push(this); 708 else 709 ForkJoinPool.common.externalPush(this); 710 return this; 711 } 712 713 /** 714 * Returns the result of the computation when it 715 * {@linkplain #isDone is done}. 716 * This method differs from {@link #get()} in that abnormal 717 * completion results in {@code RuntimeException} or {@code Error}, 718 * not {@code ExecutionException}, and that interrupts of the 719 * calling thread do <em>not</em> cause the method to abruptly 720 * return by throwing {@code InterruptedException}. 721 * 722 * @return the computed result 723 */ 724 public final V join() { 725 int s; 726 if (((s = doJoin()) & ABNORMAL) != 0) 727 reportException(s); 728 return getRawResult(); 729 } 730 731 /** 732 * Commences performing this task, awaits its completion if 733 * necessary, and returns its result, or throws an (unchecked) 734 * {@code RuntimeException} or {@code Error} if the underlying 735 * computation did so. 736 * 737 * @return the computed result 738 */ 739 public final V invoke() { 740 int s; 741 if (((s = doInvoke()) & ABNORMAL) != 0) 742 reportException(s); 743 return getRawResult(); 744 } 745 746 /** 747 * Forks the given tasks, returning when {@code isDone} holds for 748 * each task or an (unchecked) exception is encountered, in which 749 * case the exception is rethrown. If more than one task 750 * encounters an exception, then this method throws any one of 751 * these exceptions. If any task encounters an exception, the 752 * other may be cancelled. However, the execution status of 753 * individual tasks is not guaranteed upon exceptional return. The 754 * status of each task may be obtained using {@link 755 * #getException()} and related methods to check if they have been 756 * cancelled, completed normally or exceptionally, or left 757 * unprocessed. 758 * 759 * @param t1 the first task 760 * @param t2 the second task 761 * @throws NullPointerException if any task is null 762 */ 763 public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) { 764 int s1, s2; 765 t2.fork(); 766 if (((s1 = t1.doInvoke()) & ABNORMAL) != 0) 767 t1.reportException(s1); 768 if (((s2 = t2.doJoin()) & ABNORMAL) != 0) 769 t2.reportException(s2); 770 } 771 772 /** 773 * Forks the given tasks, returning when {@code isDone} holds for 774 * each task or an (unchecked) exception is encountered, in which 775 * case the exception is rethrown. If more than one task 776 * encounters an exception, then this method throws any one of 777 * these exceptions. If any task encounters an exception, others 778 * may be cancelled. However, the execution status of individual 779 * tasks is not guaranteed upon exceptional return. The status of 780 * each task may be obtained using {@link #getException()} and 781 * related methods to check if they have been cancelled, completed 782 * normally or exceptionally, or left unprocessed. 783 * 784 * @param tasks the tasks 785 * @throws NullPointerException if any task is null 786 */ 787 public static void invokeAll(ForkJoinTask<?>... tasks) { 788 Throwable ex = null; 789 int last = tasks.length - 1; 790 for (int i = last; i >= 0; --i) { 791 ForkJoinTask<?> t = tasks[i]; 792 if (t == null) { 793 if (ex == null) 794 ex = new NullPointerException(); 795 } 796 else if (i != 0) 797 t.fork(); 798 else if ((t.doInvoke() & ABNORMAL) != 0 && ex == null) 799 ex = t.getException(); 800 } 801 for (int i = 1; i <= last; ++i) { 802 ForkJoinTask<?> t = tasks[i]; 803 if (t != null) { 804 if (ex != null) 805 t.cancel(false); 806 else if ((t.doJoin() & ABNORMAL) != 0) 807 ex = t.getException(); 808 } 809 } 810 if (ex != null) 811 rethrow(ex); 812 } 813 814 /** 815 * Forks all tasks in the specified collection, returning when 816 * {@code isDone} holds for each task or an (unchecked) exception 817 * is encountered, in which case the exception is rethrown. If 818 * more than one task encounters an exception, then this method 819 * throws any one of these exceptions. If any task encounters an 820 * exception, others may be cancelled. However, the execution 821 * status of individual tasks is not guaranteed upon exceptional 822 * return. The status of each task may be obtained using {@link 823 * #getException()} and related methods to check if they have been 824 * cancelled, completed normally or exceptionally, or left 825 * unprocessed. 826 * 827 * @param tasks the collection of tasks 828 * @param <T> the type of the values returned from the tasks 829 * @return the tasks argument, to simplify usage 830 * @throws NullPointerException if tasks or any element are null 831 */ 832 public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks) { 833 if (!(tasks instanceof RandomAccess) || !(tasks instanceof List<?>)) { 834 invokeAll(tasks.toArray(new ForkJoinTask<?>[0])); 835 return tasks; 836 } 837 @SuppressWarnings("unchecked") 838 List<? extends ForkJoinTask<?>> ts = 839 (List<? extends ForkJoinTask<?>>) tasks; 840 Throwable ex = null; 841 int last = ts.size() - 1; 842 for (int i = last; i >= 0; --i) { 843 ForkJoinTask<?> t = ts.get(i); 844 if (t == null) { 845 if (ex == null) 846 ex = new NullPointerException(); 847 } 848 else if (i != 0) 849 t.fork(); 850 else if ((t.doInvoke() & ABNORMAL) != 0 && ex == null) 851 ex = t.getException(); 852 } 853 for (int i = 1; i <= last; ++i) { 854 ForkJoinTask<?> t = ts.get(i); 855 if (t != null) { 856 if (ex != null) 857 t.cancel(false); 858 else if ((t.doJoin() & ABNORMAL) != 0) 859 ex = t.getException(); 860 } 861 } 862 if (ex != null) 863 rethrow(ex); 864 return tasks; 865 } 866 867 /** 868 * Attempts to cancel execution of this task. This attempt will 869 * fail if the task has already completed or could not be 870 * cancelled for some other reason. If successful, and this task 871 * has not started when {@code cancel} is called, execution of 872 * this task is suppressed. After this method returns 873 * successfully, unless there is an intervening call to {@link 874 * #reinitialize}, subsequent calls to {@link #isCancelled}, 875 * {@link #isDone}, and {@code cancel} will return {@code true} 876 * and calls to {@link #join} and related methods will result in 877 * {@code CancellationException}. 878 * 879 * <p>This method may be overridden in subclasses, but if so, must 880 * still ensure that these properties hold. In particular, the 881 * {@code cancel} method itself must not throw exceptions. 882 * 883 * <p>This method is designed to be invoked by <em>other</em> 884 * tasks. To terminate the current task, you can just return or 885 * throw an unchecked exception from its computation method, or 886 * invoke {@link #completeExceptionally(Throwable)}. 887 * 888 * @param mayInterruptIfRunning this value has no effect in the 889 * default implementation because interrupts are not used to 890 * control cancellation. 891 * 892 * @return {@code true} if this task is now cancelled 893 */ 894 public boolean cancel(boolean mayInterruptIfRunning) { 895 int s = abnormalCompletion(DONE | ABNORMAL); 896 return (s & (ABNORMAL | THROWN)) == ABNORMAL; 897 } 898 899 public final boolean isDone() { 900 return status < 0; 901 } 902 903 public final boolean isCancelled() { 904 return (status & (ABNORMAL | THROWN)) == ABNORMAL; 905 } 906 907 /** 908 * Returns {@code true} if this task threw an exception or was cancelled. 909 * 910 * @return {@code true} if this task threw an exception or was cancelled 911 */ 912 public final boolean isCompletedAbnormally() { 913 return (status & ABNORMAL) != 0; 914 } 915 916 /** 917 * Returns {@code true} if this task completed without throwing an 918 * exception and was not cancelled. 919 * 920 * @return {@code true} if this task completed without throwing an 921 * exception and was not cancelled 922 */ 923 public final boolean isCompletedNormally() { 924 return (status & (DONE | ABNORMAL)) == DONE; 925 } 926 927 /** 928 * Returns the exception thrown by the base computation, or a 929 * {@code CancellationException} if cancelled, or {@code null} if 930 * none or if the method has not yet completed. 931 * 932 * @return the exception, or {@code null} if none 933 */ 934 public final Throwable getException() { 935 int s = status; 936 return ((s & ABNORMAL) == 0 ? null : 937 (s & THROWN) == 0 ? new CancellationException() : 938 getThrowableException()); 939 } 940 941 /** 942 * Completes this task abnormally, and if not already aborted or 943 * cancelled, causes it to throw the given exception upon 944 * {@code join} and related operations. This method may be used 945 * to induce exceptions in asynchronous tasks, or to force 946 * completion of tasks that would not otherwise complete. Its use 947 * in other situations is discouraged. This method is 948 * overridable, but overridden versions must invoke {@code super} 949 * implementation to maintain guarantees. 950 * 951 * @param ex the exception to throw. If this exception is not a 952 * {@code RuntimeException} or {@code Error}, the actual exception 953 * thrown will be a {@code RuntimeException} with cause {@code ex}. 954 */ 955 public void completeExceptionally(Throwable ex) { 956 setExceptionalCompletion((ex instanceof RuntimeException) || 957 (ex instanceof Error) ? ex : 958 new RuntimeException(ex)); 959 } 960 961 /** 962 * Completes this task, and if not already aborted or cancelled, 963 * returning the given value as the result of subsequent 964 * invocations of {@code join} and related operations. This method 965 * may be used to provide results for asynchronous tasks, or to 966 * provide alternative handling for tasks that would not otherwise 967 * complete normally. Its use in other situations is 968 * discouraged. This method is overridable, but overridden 969 * versions must invoke {@code super} implementation to maintain 970 * guarantees. 971 * 972 * @param value the result value for this task 973 */ 974 public void complete(V value) { 975 try { 976 setRawResult(value); 977 } catch (Throwable rex) { 978 setExceptionalCompletion(rex); 979 return; 980 } 981 setDone(); 982 } 983 984 /** 985 * Completes this task normally without setting a value. The most 986 * recent value established by {@link #setRawResult} (or {@code 987 * null} by default) will be returned as the result of subsequent 988 * invocations of {@code join} and related operations. 989 * 990 * @since 1.8 991 */ 992 public final void quietlyComplete() { 993 setDone(); 994 } 995 996 /** 997 * Waits if necessary for the computation to complete, and then 998 * retrieves its result. 999 * 1000 * @return the computed result 1001 * @throws CancellationException if the computation was cancelled 1002 * @throws ExecutionException if the computation threw an 1003 * exception 1004 * @throws InterruptedException if the current thread is not a 1005 * member of a ForkJoinPool and was interrupted while waiting 1006 */ 1007 public final V get() throws InterruptedException, ExecutionException { 1008 int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ? 1009 doJoin() : externalInterruptibleAwaitDone(); 1010 if ((s & THROWN) != 0) 1011 throw new ExecutionException(getThrowableException()); 1012 else if ((s & ABNORMAL) != 0) 1013 throw new CancellationException(); 1014 else 1015 return getRawResult(); 1016 } 1017 1018 /** 1019 * Waits if necessary for at most the given time for the computation 1020 * to complete, and then retrieves its result, if available. 1021 * 1022 * @param timeout the maximum time to wait 1023 * @param unit the time unit of the timeout argument 1024 * @return the computed result 1025 * @throws CancellationException if the computation was cancelled 1026 * @throws ExecutionException if the computation threw an 1027 * exception 1028 * @throws InterruptedException if the current thread is not a 1029 * member of a ForkJoinPool and was interrupted while waiting 1030 * @throws TimeoutException if the wait timed out 1031 */ 1032 public final V get(long timeout, TimeUnit unit) 1033 throws InterruptedException, ExecutionException, TimeoutException { 1034 int s; 1035 long nanos = unit.toNanos(timeout); 1036 if (Thread.interrupted()) 1037 throw new InterruptedException(); 1038 if ((s = status) >= 0 && nanos > 0L) { 1039 long d = System.nanoTime() + nanos; 1040 long deadline = (d == 0L) ? 1L : d; // avoid 0 1041 Thread t = Thread.currentThread(); 1042 if (t instanceof ForkJoinWorkerThread) { 1043 ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t; 1044 s = wt.pool.awaitJoin(wt.workQueue, this, deadline); 1045 } 1046 else if ((s = ((this instanceof CountedCompleter) ? 1047 ForkJoinPool.common.externalHelpComplete( 1048 (CountedCompleter<?>)this, 0) : 1049 ForkJoinPool.common.tryExternalUnpush(this) ? 1050 doExec() : 0)) >= 0) { 1051 long ns, ms; // measure in nanosecs, but wait in millisecs 1052 while ((s = status) >= 0 && 1053 (ns = deadline - System.nanoTime()) > 0L) { 1054 if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L && 1055 (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) { 1056 synchronized (this) { 1057 if (status >= 0) 1058 wait(ms); // OK to throw InterruptedException 1059 else 1060 notifyAll(); 1061 } 1062 } 1063 } 1064 } 1065 } 1066 if (s >= 0) 1067 throw new TimeoutException(); 1068 else if ((s & THROWN) != 0) 1069 throw new ExecutionException(getThrowableException()); 1070 else if ((s & ABNORMAL) != 0) 1071 throw new CancellationException(); 1072 else 1073 return getRawResult(); 1074 } 1075 1076 /** 1077 * Joins this task, without returning its result or throwing its 1078 * exception. This method may be useful when processing 1079 * collections of tasks when some have been cancelled or otherwise 1080 * known to have aborted. 1081 */ 1082 public final void quietlyJoin() { 1083 doJoin(); 1084 } 1085 1086 /** 1087 * Commences performing this task and awaits its completion if 1088 * necessary, without returning its result or throwing its 1089 * exception. 1090 */ 1091 public final void quietlyInvoke() { 1092 doInvoke(); 1093 } 1094 1095 /** 1096 * Possibly executes tasks until the pool hosting the current task 1097 * {@linkplain ForkJoinPool#isQuiescent is quiescent}. This 1098 * method may be of use in designs in which many tasks are forked, 1099 * but none are explicitly joined, instead executing them until 1100 * all are processed. 1101 */ 1102 public static void helpQuiesce() { 1103 Thread t; 1104 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) { 1105 ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t; 1106 wt.pool.helpQuiescePool(wt.workQueue); 1107 } 1108 else 1109 ForkJoinPool.quiesceCommonPool(); 1110 } 1111 1112 /** 1113 * Resets the internal bookkeeping state of this task, allowing a 1114 * subsequent {@code fork}. This method allows repeated reuse of 1115 * this task, but only if reuse occurs when this task has either 1116 * never been forked, or has been forked, then completed and all 1117 * outstanding joins of this task have also completed. Effects 1118 * under any other usage conditions are not guaranteed. 1119 * This method may be useful when executing 1120 * pre-constructed trees of subtasks in loops. 1121 * 1122 * <p>Upon completion of this method, {@code isDone()} reports 1123 * {@code false}, and {@code getException()} reports {@code 1124 * null}. However, the value returned by {@code getRawResult} is 1125 * unaffected. To clear this value, you can invoke {@code 1126 * setRawResult(null)}. 1127 */ 1128 public void reinitialize() { 1129 if ((status & THROWN) != 0) 1130 clearExceptionalCompletion(); 1131 else 1132 status = 0; 1133 } 1134 1135 /** 1136 * Returns the pool hosting the current thread, or {@code null} 1137 * if the current thread is executing outside of any ForkJoinPool. 1138 * 1139 * <p>This method returns {@code null} if and only if {@link 1140 * #inForkJoinPool} returns {@code false}. 1141 * 1142 * @return the pool, or {@code null} if none 1143 */ 1144 public static ForkJoinPool getPool() { 1145 Thread t = Thread.currentThread(); 1146 return (t instanceof ForkJoinWorkerThread) ? 1147 ((ForkJoinWorkerThread) t).pool : null; 1148 } 1149 1150 /** 1151 * Returns {@code true} if the current thread is a {@link 1152 * ForkJoinWorkerThread} executing as a ForkJoinPool computation. 1153 * 1154 * @return {@code true} if the current thread is a {@link 1155 * ForkJoinWorkerThread} executing as a ForkJoinPool computation, 1156 * or {@code false} otherwise 1157 */ 1158 public static boolean inForkJoinPool() { 1159 return Thread.currentThread() instanceof ForkJoinWorkerThread; 1160 } 1161 1162 /** 1163 * Tries to unschedule this task for execution. This method will 1164 * typically (but is not guaranteed to) succeed if this task is 1165 * the most recently forked task by the current thread, and has 1166 * not commenced executing in another thread. This method may be 1167 * useful when arranging alternative local processing of tasks 1168 * that could have been, but were not, stolen. 1169 * 1170 * @return {@code true} if unforked 1171 */ 1172 public boolean tryUnfork() { 1173 Thread t; 1174 return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? 1175 ((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) : 1176 ForkJoinPool.common.tryExternalUnpush(this)); 1177 } 1178 1179 /** 1180 * Returns an estimate of the number of tasks that have been 1181 * forked by the current worker thread but not yet executed. This 1182 * value may be useful for heuristic decisions about whether to 1183 * fork other tasks. 1184 * 1185 * @return the number of tasks 1186 */ 1187 public static int getQueuedTaskCount() { 1188 Thread t; ForkJoinPool.WorkQueue q; 1189 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) 1190 q = ((ForkJoinWorkerThread)t).workQueue; 1191 else 1192 q = ForkJoinPool.commonSubmitterQueue(); 1193 return (q == null) ? 0 : q.queueSize(); 1194 } 1195 1196 /** 1197 * Returns an estimate of how many more locally queued tasks are 1198 * held by the current worker thread than there are other worker 1199 * threads that might steal them, or zero if this thread is not 1200 * operating in a ForkJoinPool. This value may be useful for 1201 * heuristic decisions about whether to fork other tasks. In many 1202 * usages of ForkJoinTasks, at steady state, each worker should 1203 * aim to maintain a small constant surplus (for example, 3) of 1204 * tasks, and to process computations locally if this threshold is 1205 * exceeded. 1206 * 1207 * @return the surplus number of tasks, which may be negative 1208 */ 1209 public static int getSurplusQueuedTaskCount() { 1210 return ForkJoinPool.getSurplusQueuedTaskCount(); 1211 } 1212 1213 // Extension methods 1214 1215 /** 1216 * Returns the result that would be returned by {@link #join}, even 1217 * if this task completed abnormally, or {@code null} if this task 1218 * is not known to have been completed. This method is designed 1219 * to aid debugging, as well as to support extensions. Its use in 1220 * any other context is discouraged. 1221 * 1222 * @return the result, or {@code null} if not completed 1223 */ 1224 public abstract V getRawResult(); 1225 1226 /** 1227 * Forces the given value to be returned as a result. This method 1228 * is designed to support extensions, and should not in general be 1229 * called otherwise. 1230 * 1231 * @param value the value 1232 */ 1233 protected abstract void setRawResult(V value); 1234 1235 /** 1236 * Immediately performs the base action of this task and returns 1237 * true if, upon return from this method, this task is guaranteed 1238 * to have completed. This method may return false otherwise, to 1239 * indicate that this task is not necessarily complete (or is not 1240 * known to be complete), for example in asynchronous actions that 1241 * require explicit invocations of completion methods. This method 1242 * may also throw an (unchecked) exception to indicate abnormal 1243 * exit. This method is designed to support extensions, and should 1244 * not in general be called otherwise. 1245 * 1246 * @return {@code true} if this task is known to have completed normally 1247 */ 1248 protected abstract boolean exec(); 1249 1250 /** 1251 * Returns, but does not unschedule or execute, a task queued by 1252 * the current thread but not yet executed, if one is immediately 1253 * available. There is no guarantee that this task will actually 1254 * be polled or executed next. Conversely, this method may return 1255 * null even if a task exists but cannot be accessed without 1256 * contention with other threads. This method is designed 1257 * primarily to support extensions, and is unlikely to be useful 1258 * otherwise. 1259 * 1260 * @return the next task, or {@code null} if none are available 1261 */ 1262 protected static ForkJoinTask<?> peekNextLocalTask() { 1263 Thread t; ForkJoinPool.WorkQueue q; 1264 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) 1265 q = ((ForkJoinWorkerThread)t).workQueue; 1266 else 1267 q = ForkJoinPool.commonSubmitterQueue(); 1268 return (q == null) ? null : q.peek(); 1269 } 1270 1271 /** 1272 * Unschedules and returns, without executing, the next task 1273 * queued by the current thread but not yet executed, if the 1274 * current thread is operating in a ForkJoinPool. This method is 1275 * designed primarily to support extensions, and is unlikely to be 1276 * useful otherwise. 1277 * 1278 * @return the next task, or {@code null} if none are available 1279 */ 1280 protected static ForkJoinTask<?> pollNextLocalTask() { 1281 Thread t; 1282 return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? 1283 ((ForkJoinWorkerThread)t).workQueue.nextLocalTask() : 1284 null; 1285 } 1286 1287 /** 1288 * If the current thread is operating in a ForkJoinPool, 1289 * unschedules and returns, without executing, the next task 1290 * queued by the current thread but not yet executed, if one is 1291 * available, or if not available, a task that was forked by some 1292 * other thread, if available. Availability may be transient, so a 1293 * {@code null} result does not necessarily imply quiescence of 1294 * the pool this task is operating in. This method is designed 1295 * primarily to support extensions, and is unlikely to be useful 1296 * otherwise. 1297 * 1298 * @return a task, or {@code null} if none are available 1299 */ 1300 protected static ForkJoinTask<?> pollTask() { 1301 Thread t; ForkJoinWorkerThread wt; 1302 return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? 1303 (wt = (ForkJoinWorkerThread)t).pool.nextTaskFor(wt.workQueue) : 1304 null; 1305 } 1306 1307 /** 1308 * If the current thread is operating in a ForkJoinPool, 1309 * unschedules and returns, without executing, a task externally 1310 * submitted to the pool, if one is available. Availability may be 1311 * transient, so a {@code null} result does not necessarily imply 1312 * quiescence of the pool. This method is designed primarily to 1313 * support extensions, and is unlikely to be useful otherwise. 1314 * 1315 * @return a task, or {@code null} if none are available 1316 * @since 9 1317 */ 1318 protected static ForkJoinTask<?> pollSubmission() { 1319 Thread t; 1320 return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? 1321 ((ForkJoinWorkerThread)t).pool.pollSubmission() : null; 1322 } 1323 1324 // tag operations 1325 1326 /** 1327 * Returns the tag for this task. 1328 * 1329 * @return the tag for this task 1330 * @since 1.8 1331 */ 1332 public final short getForkJoinTaskTag() { 1333 return (short)status; 1334 } 1335 1336 /** 1337 * Atomically sets the tag value for this task and returns the old value. 1338 * 1339 * @param newValue the new tag value 1340 * @return the previous value of the tag 1341 * @since 1.8 1342 */ 1343 public final short setForkJoinTaskTag(short newValue) { 1344 for (int s;;) { 1345 if (STATUS.weakCompareAndSet(this, s = status, 1346 (s & ~SMASK) | (newValue & SMASK))) 1347 return (short)s; 1348 } 1349 } 1350 1351 /** 1352 * Atomically conditionally sets the tag value for this task. 1353 * Among other applications, tags can be used as visit markers 1354 * in tasks operating on graphs, as in methods that check: {@code 1355 * if (task.compareAndSetForkJoinTaskTag((short)0, (short)1))} 1356 * before processing, otherwise exiting because the node has 1357 * already been visited. 1358 * 1359 * @param expect the expected tag value 1360 * @param update the new tag value 1361 * @return {@code true} if successful; i.e., the current value was 1362 * equal to {@code expect} and was changed to {@code update}. 1363 * @since 1.8 1364 */ 1365 public final boolean compareAndSetForkJoinTaskTag(short expect, short update) { 1366 for (int s;;) { 1367 if ((short)(s = status) != expect) 1368 return false; 1369 if (STATUS.weakCompareAndSet(this, s, 1370 (s & ~SMASK) | (update & SMASK))) 1371 return true; 1372 } 1373 } 1374 1375 /** 1376 * Adapter for Runnables. This implements RunnableFuture 1377 * to be compliant with AbstractExecutorService constraints 1378 * when used in ForkJoinPool. 1379 */ 1380 static final class AdaptedRunnable<T> extends ForkJoinTask<T> 1381 implements RunnableFuture<T> { 1382 @SuppressWarnings("serial") // Conditionally serializable 1383 final Runnable runnable; 1384 @SuppressWarnings("serial") // Conditionally serializable 1385 T result; 1386 AdaptedRunnable(Runnable runnable, T result) { 1387 if (runnable == null) throw new NullPointerException(); 1388 this.runnable = runnable; 1389 this.result = result; // OK to set this even before completion 1390 } 1391 public final T getRawResult() { return result; } 1392 public final void setRawResult(T v) { result = v; } 1393 public final boolean exec() { runnable.run(); return true; } 1394 public final void run() { invoke(); } 1395 public String toString() { 1396 return super.toString() + "[Wrapped task = " + runnable + "]"; 1397 } 1398 private static final long serialVersionUID = 5232453952276885070L; 1399 } 1400 1401 /** 1402 * Adapter for Runnables without results. 1403 */ 1404 static final class AdaptedRunnableAction extends ForkJoinTask<Void> 1405 implements RunnableFuture<Void> { 1406 @SuppressWarnings("serial") // Conditionally serializable 1407 final Runnable runnable; 1408 AdaptedRunnableAction(Runnable runnable) { 1409 if (runnable == null) throw new NullPointerException(); 1410 this.runnable = runnable; 1411 } 1412 public final Void getRawResult() { return null; } 1413 public final void setRawResult(Void v) { } 1414 public final boolean exec() { runnable.run(); return true; } 1415 public final void run() { invoke(); } 1416 public String toString() { 1417 return super.toString() + "[Wrapped task = " + runnable + "]"; 1418 } 1419 private static final long serialVersionUID = 5232453952276885070L; 1420 } 1421 1422 /** 1423 * Adapter for Runnables in which failure forces worker exception. 1424 */ 1425 static final class RunnableExecuteAction extends ForkJoinTask<Void> { 1426 @SuppressWarnings("serial") // Conditionally serializable 1427 final Runnable runnable; 1428 RunnableExecuteAction(Runnable runnable) { 1429 if (runnable == null) throw new NullPointerException(); 1430 this.runnable = runnable; 1431 } 1432 public final Void getRawResult() { return null; } 1433 public final void setRawResult(Void v) { } 1434 public final boolean exec() { runnable.run(); return true; } 1435 void internalPropagateException(Throwable ex) { 1436 rethrow(ex); // rethrow outside exec() catches. 1437 } 1438 private static final long serialVersionUID = 5232453952276885070L; 1439 } 1440 1441 /** 1442 * Adapter for Callables. 1443 */ 1444 static final class AdaptedCallable<T> extends ForkJoinTask<T> 1445 implements RunnableFuture<T> { 1446 @SuppressWarnings("serial") // Conditionally serializable 1447 final Callable<? extends T> callable; 1448 @SuppressWarnings("serial") // Conditionally serializable 1449 T result; 1450 AdaptedCallable(Callable<? extends T> callable) { 1451 if (callable == null) throw new NullPointerException(); 1452 this.callable = callable; 1453 } 1454 public final T getRawResult() { return result; } 1455 public final void setRawResult(T v) { result = v; } 1456 public final boolean exec() { 1457 try { 1458 result = callable.call(); 1459 return true; 1460 } catch (RuntimeException rex) { 1461 throw rex; 1462 } catch (Exception ex) { 1463 throw new RuntimeException(ex); 1464 } 1465 } 1466 public final void run() { invoke(); } 1467 public String toString() { 1468 return super.toString() + "[Wrapped task = " + callable + "]"; 1469 } 1470 private static final long serialVersionUID = 2838392045355241008L; 1471 } 1472 1473 /** 1474 * Returns a new {@code ForkJoinTask} that performs the {@code run} 1475 * method of the given {@code Runnable} as its action, and returns 1476 * a null result upon {@link #join}. 1477 * 1478 * @param runnable the runnable action 1479 * @return the task 1480 */ 1481 public static ForkJoinTask<?> adapt(Runnable runnable) { 1482 return new AdaptedRunnableAction(runnable); 1483 } 1484 1485 /** 1486 * Returns a new {@code ForkJoinTask} that performs the {@code run} 1487 * method of the given {@code Runnable} as its action, and returns 1488 * the given result upon {@link #join}. 1489 * 1490 * @param runnable the runnable action 1491 * @param result the result upon completion 1492 * @param <T> the type of the result 1493 * @return the task 1494 */ 1495 public static <T> ForkJoinTask<T> adapt(Runnable runnable, T result) { 1496 return new AdaptedRunnable<T>(runnable, result); 1497 } 1498 1499 /** 1500 * Returns a new {@code ForkJoinTask} that performs the {@code call} 1501 * method of the given {@code Callable} as its action, and returns 1502 * its result upon {@link #join}, translating any checked exceptions 1503 * encountered into {@code RuntimeException}. 1504 * 1505 * @param callable the callable action 1506 * @param <T> the type of the callable's result 1507 * @return the task 1508 */ 1509 public static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable) { 1510 return new AdaptedCallable<T>(callable); 1511 } 1512 1513 // Serialization support 1514 1515 private static final long serialVersionUID = -7721805057305804111L; 1516 1517 /** 1518 * Saves this task to a stream (that is, serializes it). 1519 * 1520 * @param s the stream 1521 * @throws java.io.IOException if an I/O error occurs 1522 * @serialData the current run status and the exception thrown 1523 * during execution, or {@code null} if none 1524 */ 1525 private void writeObject(java.io.ObjectOutputStream s) 1526 throws java.io.IOException { 1527 s.defaultWriteObject(); 1528 s.writeObject(getException()); 1529 } 1530 1531 /** 1532 * Reconstitutes this task from a stream (that is, deserializes it). 1533 * @param s the stream 1534 * @throws ClassNotFoundException if the class of a serialized object 1535 * could not be found 1536 * @throws java.io.IOException if an I/O error occurs 1537 */ 1538 private void readObject(java.io.ObjectInputStream s) 1539 throws java.io.IOException, ClassNotFoundException { 1540 s.defaultReadObject(); 1541 Object ex = s.readObject(); 1542 if (ex != null) 1543 setExceptionalCompletion((Throwable)ex); 1544 } 1545 1546 // VarHandle mechanics 1547 private static final VarHandle STATUS; 1548 static { 1549 try { 1550 MethodHandles.Lookup l = MethodHandles.lookup(); 1551 STATUS = l.findVarHandle(ForkJoinTask.class, "status", int.class); 1552 } catch (ReflectiveOperationException e) { 1553 throw new ExceptionInInitializerError(e); 1554 } 1555 } 1556 1557 }