1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 
28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent; 37 38 import java.util.concurrent.locks.LockSupport; 39 import java.util.function.BiConsumer; 40 import java.util.function.BiFunction; 41 import java.util.function.Consumer; 42 import java.util.function.Function; 43 import java.util.function.Supplier; 44 45 /** 46 * A {@link Future} that may be explicitly completed (setting its 47 * value and status), and may be used as a {@link CompletionStage}, 48 * supporting dependent functions and actions that trigger upon its 49 * completion. 50 * 51 * <p>When two or more threads attempt to 52 * {@link #complete complete}, 53 * {@link #completeExceptionally completeExceptionally}, or 54 * {@link #cancel cancel} 55 * a CompletableFuture, only one of them succeeds. 56 * 57 * <p>In addition to these and related methods for directly 58 * manipulating status and results, CompletableFuture implements 59 * interface {@link CompletionStage} with the following policies: <ul> 60 * 61 * <li>Actions supplied for dependent completions of 62 * <em>non-async</em> methods may be performed by the thread that 63 * completes the current CompletableFuture, or by any other caller of 64 * a completion method. 65 * 66 * <li>All <em>async</em> methods without an explicit Executor 67 * argument are performed using the {@link ForkJoinPool#commonPool()} 68 * (unless it does not support a parallelism level of at least two, in 69 * which case, a new Thread is created to run each task). This may be 70 * overridden for non-static methods in subclasses by defining method 71 * {@link #defaultExecutor()}. 
To simplify monitoring, debugging, 72 * and tracking, all generated asynchronous tasks are instances of the 73 * marker interface {@link AsynchronousCompletionTask}. Operations 74 * with time-delays can use adapter methods defined in this class, for 75 * example: {@code supplyAsync(supplier, delayedExecutor(timeout, 76 * timeUnit))}. To support methods with delays and timeouts, this 77 * class maintains at most one daemon thread for triggering and 78 * cancelling actions, not for running them. 79 * 80 * <li>All CompletionStage methods are implemented independently of 81 * other public methods, so the behavior of one method is not impacted 82 * by overrides of others in subclasses. 83 * 84 * <li>All CompletionStage methods return CompletableFutures. To 85 * restrict usages to only those methods defined in interface 86 * CompletionStage, use method {@link #minimalCompletionStage}. Or to 87 * ensure only that clients do not themselves modify a future, use 88 * method {@link #copy}. 89 * </ul> 90 * 91 * <p>CompletableFuture also implements {@link Future} with the following 92 * policies: <ul> 93 * 94 * <li>Since (unlike {@link FutureTask}) this class has no direct 95 * control over the computation that causes it to be completed, 96 * cancellation is treated as just another form of exceptional 97 * completion. Method {@link #cancel cancel} has the same effect as 98 * {@code completeExceptionally(new CancellationException())}. Method 99 * {@link #isCompletedExceptionally} can be used to determine if a 100 * CompletableFuture completed in any exceptional fashion. 101 * 102 * <li>In case of exceptional completion with a CompletionException, 103 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an 104 * {@link ExecutionException} with the same cause as held in the 105 * corresponding CompletionException. 
To simplify usage in most 106 * contexts, this class also defines methods {@link #join()} and 107 * {@link #getNow} that instead throw the CompletionException directly 108 * in these cases. 109 * </ul> 110 * 111 * <p>Arguments used to pass a completion result (that is, for 112 * parameters of type {@code T}) for methods accepting them may be 113 * null, but passing a null value for any other parameter will result 114 * in a {@link NullPointerException} being thrown. 115 * 116 * <p>Subclasses of this class should normally override the "virtual 117 * constructor" method {@link #newIncompleteFuture}, which establishes 118 * the concrete type returned by CompletionStage methods. For example, 119 * here is a class that substitutes a different default Executor and 120 * disables the {@code obtrude} methods: 121 * 122 * <pre> {@code 123 * class MyCompletableFuture<T> extends CompletableFuture<T> { 124 * static final Executor myExecutor = ...; 125 * public MyCompletableFuture() { } 126 * public <U> CompletableFuture<U> newIncompleteFuture() { 127 * return new MyCompletableFuture<U>(); } 128 * public Executor defaultExecutor() { 129 * return myExecutor; } 130 * public void obtrudeValue(T value) { 131 * throw new UnsupportedOperationException(); } 132 * public void obtrudeException(Throwable ex) { 133 * throw new UnsupportedOperationException(); } 134 * }}</pre> 135 * 136 * @author Doug Lea 137 * @since 1.8 138 * @param <T> The result type returned by this future's {@code join} 139 * and {@code get} methods 140 */ 141 public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { 142 143 /* 144 * Overview: 145 * 146 * A CompletableFuture may have dependent completion actions, 147 * collected in a linked stack. It atomically completes by CASing 148 * a result field, and then pops off and runs those actions. This 149 * applies across normal vs exceptional outcomes, sync vs async 150 * actions, binary triggers, and various forms of completions. 
151 * 152 * Non-nullness of field result (set via CAS) indicates done. An 153 * AltResult is used to box null as a result, as well as to hold 154 * exceptions. Using a single field makes completion simple to 155 * detect and trigger. Encoding and decoding is straightforward 156 * but adds to the sprawl of trapping and associating exceptions 157 * with targets. Minor simplifications rely on (static) NIL (to 158 * box null results) being the only AltResult with a null 159 * exception field, so we don't usually need explicit comparisons. 160 * Even though some of the generics casts are unchecked (see 161 * SuppressWarnings annotations), they are placed to be 162 * appropriate even if checked. 163 * 164 * Dependent actions are represented by Completion objects linked 165 * as Treiber stacks headed by field "stack". There are Completion 166 * classes for each kind of action, grouped into single-input 167 * (UniCompletion), two-input (BiCompletion), projected 168 * (BiCompletions using either (not both) of two inputs), shared 169 * (CoCompletion, used by the second of two sources), zero-input 170 * source actions, and Signallers that unblock waiters. Class 171 * Completion extends ForkJoinTask to enable async execution 172 * (adding no space overhead because we exploit its "tag" methods 173 * to maintain claims). It is also declared as Runnable to allow 174 * usage with arbitrary executors. 175 * 176 * Support for each kind of CompletionStage relies on a separate 177 * class, along with two CompletableFuture methods: 178 * 179 * * A Completion class with name X corresponding to function, 180 * prefaced with "Uni", "Bi", or "Or". Each class contains 181 * fields for source(s), actions, and dependent. They are 182 * boringly similar, differing from others only with respect to 183 * underlying functional forms. We do this so that users don't 184 * encounter layers of adapters in common usages. 185 * 186 * * Boolean CompletableFuture method x(...) 
(for example 187 * uniApply) takes all of the arguments needed to check that an 188 * action is triggerable, and then either runs the action or 189 * arranges its async execution by executing its Completion 190 * argument, if present. The method returns true if known to be 191 * complete. 192 * 193 * * Completion method tryFire(int mode) invokes the associated x 194 * method with its held arguments, and on success cleans up. 195 * The mode argument allows tryFire to be called twice (SYNC, 196 * then ASYNC); the first to screen and trap exceptions while 197 * arranging to execute, and the second when called from a 198 * task. (A few classes are not used async so take slightly 199 * different forms.) The claim() callback suppresses function 200 * invocation if already claimed by another thread. 201 * 202 * * CompletableFuture method xStage(...) is called from a public 203 * stage method of CompletableFuture x. It screens user 204 * arguments and invokes and/or creates the stage object. If 205 * not async and x is already complete, the action is run 206 * immediately. Otherwise a Completion c is created, pushed to 207 * x's stack (unless done), and started or triggered via 208 * c.tryFire. This also covers races possible if x completes 209 * while pushing. Classes with two inputs (for example BiApply) 210 * deal with races across both while pushing actions. The 211 * second completion is a CoCompletion pointing to the first, 212 * shared so that at most one performs the action. The 213 * multiple-arity methods allOf and anyOf do this pairwise to 214 * form trees of completions. 215 * 216 * Note that the generic type parameters of methods vary according 217 * to whether "this" is a source, dependent, or completion. 218 * 219 * Method postComplete is called upon completion unless the target 220 * is guaranteed not to be observable (i.e., not yet returned or 221 * linked). 
/**
 * Outcome of this future. Null while incomplete; once set (via CAS) it
 * holds either the plain result value or an AltResult that boxes a
 * null value or an exception.
 */
volatile Object result;       // Either the result or boxed AltResult

/** Head of the linked stack of dependent Completions. */
volatile Completion stack;    // Top of Treiber stack of dependent actions

/**
 * Atomically installs r as the result if no result has been set yet.
 *
 * @param r the encoded outcome to install
 * @return true if this call changed the result field from null to r
 */
final boolean internalComplete(Object r) { // CAS from null to r
    return U.compareAndSwapObject(this, RESULT, null, r);
}

/**
 * Atomically replaces the stack head.
 *
 * @param cmp the head value expected to be current
 * @param val the replacement head
 * @return true if the head was cmp and is now val
 */
final boolean casStack(Completion cmp, Completion val) {
    return U.compareAndSwapObject(this, STACK, cmp, val);
}
*/ 259 final boolean tryPushStack(Completion c) { 260 Completion h = stack; 261 lazySetNext(c, h); 262 return U.compareAndSwapObject(this, STACK, h, c); 263 } 264 265 /** Unconditionally pushes c onto stack, retrying if necessary. */ 266 final void pushStack(Completion c) { 267 do {} while (!tryPushStack(c)); 268 } 269 270 /* ------------- Encoding and decoding outcomes -------------- */ 271 272 static final class AltResult { // See above 273 final Throwable ex; // null only for NIL 274 AltResult(Throwable x) { this.ex = x; } 275 } 276 277 /** The encoding of the null value. */ 278 static final AltResult NIL = new AltResult(null); 279 280 /** Completes with the null value, unless already completed. */ 281 final boolean completeNull() { 282 return U.compareAndSwapObject(this, RESULT, null, 283 NIL); 284 } 285 286 /** Returns the encoding of the given non-exceptional value. */ 287 final Object encodeValue(T t) { 288 return (t == null) ? NIL : t; 289 } 290 291 /** Completes with a non-exceptional result, unless already completed. */ 292 final boolean completeValue(T t) { 293 return U.compareAndSwapObject(this, RESULT, null, 294 (t == null) ? NIL : t); 295 } 296 297 /** 298 * Returns the encoding of the given (non-null) exception as a 299 * wrapped CompletionException unless it is one already. 300 */ 301 static AltResult encodeThrowable(Throwable x) { 302 return new AltResult((x instanceof CompletionException) ? x : 303 new CompletionException(x)); 304 } 305 306 /** Completes with an exceptional result, unless already completed. */ 307 final boolean completeThrowable(Throwable x) { 308 return U.compareAndSwapObject(this, RESULT, null, 309 encodeThrowable(x)); 310 } 311 312 /** 313 * Returns the encoding of the given (non-null) exception as a 314 * wrapped CompletionException unless it is one already. May 315 * return the given Object r (which must have been the result of a 316 * source future) if it is equivalent, i.e. 
if this is a simple 317 * relay of an existing CompletionException. 318 */ 319 static Object encodeThrowable(Throwable x, Object r) { 320 if (!(x instanceof CompletionException)) 321 x = new CompletionException(x); 322 else if (r instanceof AltResult && x == ((AltResult)r).ex) 323 return r; 324 return new AltResult(x); 325 } 326 327 /** 328 * Completes with the given (non-null) exceptional result as a 329 * wrapped CompletionException unless it is one already, unless 330 * already completed. May complete with the given Object r 331 * (which must have been the result of a source future) if it is 332 * equivalent, i.e. if this is a simple propagation of an 333 * existing CompletionException. 334 */ 335 final boolean completeThrowable(Throwable x, Object r) { 336 return U.compareAndSwapObject(this, RESULT, null, 337 encodeThrowable(x, r)); 338 } 339 340 /** 341 * Returns the encoding of the given arguments: if the exception 342 * is non-null, encodes as AltResult. Otherwise uses the given 343 * value, boxed as NIL if null. 344 */ 345 Object encodeOutcome(T t, Throwable x) { 346 return (x == null) ? (t == null) ? NIL : t : encodeThrowable(x); 347 } 348 349 /** 350 * Returns the encoding of a copied outcome; if exceptional, 351 * rewraps as a CompletionException, else returns argument. 352 */ 353 static Object encodeRelay(Object r) { 354 Throwable x; 355 return (((r instanceof AltResult) && 356 (x = ((AltResult)r).ex) != null && 357 !(x instanceof CompletionException)) ? 358 new AltResult(new CompletionException(x)) : r); 359 } 360 361 /** 362 * Completes with r or a copy of r, unless already completed. 363 * If exceptional, r is first coerced to a CompletionException. 364 */ 365 final boolean completeRelay(Object r) { 366 return U.compareAndSwapObject(this, RESULT, null, 367 encodeRelay(r)); 368 } 369 370 /** 371 * Reports result using Future.get conventions. 
372 */ 373 private static <T> T reportGet(Object r) 374 throws InterruptedException, ExecutionException { 375 if (r == null) // by convention below, null means interrupted 376 throw new InterruptedException(); 377 if (r instanceof AltResult) { 378 Throwable x, cause; 379 if ((x = ((AltResult)r).ex) == null) 380 return null; 381 if (x instanceof CancellationException) 382 throw (CancellationException)x; 383 if ((x instanceof CompletionException) && 384 (cause = x.getCause()) != null) 385 x = cause; 386 throw new ExecutionException(x); 387 } 388 @SuppressWarnings("unchecked") T t = (T) r; 389 return t; 390 } 391 392 /** 393 * Decodes outcome to return result or throw unchecked exception. 394 */ 395 private static <T> T reportJoin(Object r) { 396 if (r instanceof AltResult) { 397 Throwable x; 398 if ((x = ((AltResult)r).ex) == null) 399 return null; 400 if (x instanceof CancellationException) 401 throw (CancellationException)x; 402 if (x instanceof CompletionException) 403 throw (CompletionException)x; 404 throw new CompletionException(x); 405 } 406 @SuppressWarnings("unchecked") T t = (T) r; 407 return t; 408 } 409 410 /* ------------- Async task preliminaries -------------- */ 411 412 /** 413 * A marker interface identifying asynchronous tasks produced by 414 * {@code async} methods. This may be useful for monitoring, 415 * debugging, and tracking asynchronous activities. 416 * 417 * @since 1.8 418 */ 419 public static interface AsynchronousCompletionTask { 420 } 421 422 private static final boolean USE_COMMON_POOL = 423 (ForkJoinPool.getCommonPoolParallelism() > 1); 424 425 /** 426 * Default executor -- ForkJoinPool.commonPool() unless it cannot 427 * support parallelism. 428 */ 429 private static final Executor ASYNC_POOL = USE_COMMON_POOL ? 
430 ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); 431 432 /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */ 433 static final class ThreadPerTaskExecutor implements Executor { 434 public void execute(Runnable r) { new Thread(r).start(); } 435 } 436 437 /** 438 * Null-checks user executor argument, and translates uses of 439 * commonPool to ASYNC_POOL in case parallelism disabled. 440 */ 441 static Executor screenExecutor(Executor e) { 442 if (!USE_COMMON_POOL && e == ForkJoinPool.commonPool()) 443 return ASYNC_POOL; 444 if (e == null) throw new NullPointerException(); 445 return e; 446 } 447 448 // Modes for Completion.tryFire. Signedness matters. 449 static final int SYNC = 0; 450 static final int ASYNC = 1; 451 static final int NESTED = -1; 452 453 /** 454 * Spins before blocking in waitingGet 455 */ 456 static final int SPINS = (Runtime.getRuntime().availableProcessors() > 1 ? 457 1 << 8 : 0); 458 459 /* ------------- Base Completion classes and operations -------------- */ 460 461 @SuppressWarnings("serial") 462 abstract static class Completion extends ForkJoinTask<Void> 463 implements Runnable, AsynchronousCompletionTask { 464 volatile Completion next; // Treiber stack link 465 466 /** 467 * Performs completion action if triggered, returning a 468 * dependent that may need propagation, if one exists. 469 * 470 * @param mode SYNC, ASYNC, or NESTED 471 */ 472 abstract CompletableFuture<?> tryFire(int mode); 473 474 /** Returns true if possibly still triggerable. Used by cleanStack. */ 475 abstract boolean isLive(); 476 477 public final void run() { tryFire(ASYNC); } 478 public final boolean exec() { tryFire(ASYNC); return false; } 479 public final Void getRawResult() { return null; } 480 public final void setRawResult(Void v) {} 481 } 482 483 static void lazySetNext(Completion c, Completion next) { 484 U.putOrderedObject(c, NEXT, next); 485 } 486 487 /** 488 * Pops and tries to trigger all reachable dependents. 
/**
 * Pops and tries to trigger all reachable dependents.  Call only
 * when known to be done.
 *
 * Each popped Completion is fired in NESTED mode; when firing returns
 * a dependent future, that future's stack is drained next before
 * falling back to this future's own stack.
 */
final void postComplete() {
    /*
     * On each step, variable f holds current dependents to pop
     * and run. It is extended along only one path at a time,
     * pushing others to avoid unbounded recursion.
     */
    CompletableFuture<?> f = this; Completion h;
    // Continue while f has a dependent; once f's stack is empty and f is
    // not this future, reset f to this and re-check this future's stack.
    while ((h = f.stack) != null ||
           (f != this && (h = (f = this).stack) != null)) {
        CompletableFuture<?> d; Completion t;
        if (f.casStack(h, t = h.next)) {    // popped h from f's stack
            if (t != null) {
                if (f != this) {
                    // h came from another future that still has more
                    // dependents: move h onto this stack, do not fire it here
                    pushStack(h);
                    continue;
                }
                h.next = null;    // detach
            }
            // fire h; follow the dependent it returns, else go back to this
            f = (d = h.tryFire(NESTED)) == null ? this : d;
        }
    }
}

/**
 * Traverses stack and unlinks dead Completions.
 *
 * p is the last Completion seen to be live (null while at the head),
 * q is the Completion under examination, and s is q's successor.
 */
final void cleanStack() {
    for (Completion p = null, q = stack; q != null;) {
        Completion s = q.next;
        if (q.isLive()) {
            p = q;
            q = s;
        }
        else if (p == null) {
            // dead node at the head: try to swing the head past it, then
            // re-read the head whether or not the CAS succeeded
            casStack(q, s);
            q = stack;
        }
        else {
            p.next = s;           // unlink q by linking p directly to s
            if (p.isLive())
                q = s;
            else {
                p = null;  // restart
                q = stack;
            }
        }
    }
}
557 */ 558 final boolean claim() { 559 Executor e = executor; 560 if (compareAndSetForkJoinTaskTag((short)0, (short)1)) { 561 if (e == null) 562 return true; 563 executor = null; // disable 564 e.execute(this); 565 } 566 return false; 567 } 568 569 final boolean isLive() { return dep != null; } 570 } 571 572 /** Pushes the given completion (if it exists) unless done. */ 573 final void push(UniCompletion<?,?> c) { 574 if (c != null) { 575 while (result == null && !tryPushStack(c)) 576 lazySetNext(c, null); // clear on failure 577 } 578 } 579 580 /** 581 * Post-processing by dependent after successful UniCompletion 582 * tryFire. Tries to clean stack of source a, and then either runs 583 * postComplete or returns this to caller, depending on mode. 584 */ 585 final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) { 586 if (a != null && a.stack != null) { 587 if (mode < 0 || a.result == null) 588 a.cleanStack(); 589 else 590 a.postComplete(); 591 } 592 if (result != null && stack != null) { 593 if (mode < 0) 594 return this; 595 else 596 postComplete(); 597 } 598 return null; 599 } 600 601 @SuppressWarnings("serial") 602 static final class UniApply<T,V> extends UniCompletion<T,V> { 603 Function<? super T,? extends V> fn; 604 UniApply(Executor executor, CompletableFuture<V> dep, 605 CompletableFuture<T> src, 606 Function<? super T,? extends V> fn) { 607 super(executor, dep, src); this.fn = fn; 608 } 609 final CompletableFuture<V> tryFire(int mode) { 610 CompletableFuture<V> d; CompletableFuture<T> a; 611 if ((d = dep) == null || 612 !d.uniApply(a = src, fn, mode > 0 ? null : this)) 613 return null; 614 dep = null; src = null; fn = null; 615 return d.postFire(a, mode); 616 } 617 } 618 619 final <S> boolean uniApply(CompletableFuture<S> a, 620 Function<? super S,? 
extends T> f, 621 UniApply<S,T> c) { 622 Object r; Throwable x; 623 if (a == null || (r = a.result) == null || f == null) 624 return false; 625 tryComplete: if (result == null) { 626 if (r instanceof AltResult) { 627 if ((x = ((AltResult)r).ex) != null) { 628 completeThrowable(x, r); 629 break tryComplete; 630 } 631 r = null; 632 } 633 try { 634 if (c != null && !c.claim()) 635 return false; 636 @SuppressWarnings("unchecked") S s = (S) r; 637 completeValue(f.apply(s)); 638 } catch (Throwable ex) { 639 completeThrowable(ex); 640 } 641 } 642 return true; 643 } 644 645 private <V> CompletableFuture<V> uniApplyStage( 646 Executor e, Function<? super T,? extends V> f) { 647 if (f == null) throw new NullPointerException(); 648 CompletableFuture<V> d = newIncompleteFuture(); 649 if (e != null || !d.uniApply(this, f, null)) { 650 UniApply<T,V> c = new UniApply<T,V>(e, d, this, f); 651 push(c); 652 c.tryFire(SYNC); 653 } 654 return d; 655 } 656 657 @SuppressWarnings("serial") 658 static final class UniAccept<T> extends UniCompletion<T,Void> { 659 Consumer<? super T> fn; 660 UniAccept(Executor executor, CompletableFuture<Void> dep, 661 CompletableFuture<T> src, Consumer<? super T> fn) { 662 super(executor, dep, src); this.fn = fn; 663 } 664 final CompletableFuture<Void> tryFire(int mode) { 665 CompletableFuture<Void> d; CompletableFuture<T> a; 666 if ((d = dep) == null || 667 !d.uniAccept(a = src, fn, mode > 0 ? null : this)) 668 return null; 669 dep = null; src = null; fn = null; 670 return d.postFire(a, mode); 671 } 672 } 673 674 final <S> boolean uniAccept(CompletableFuture<S> a, 675 Consumer<? 
super S> f, UniAccept<S> c) { 676 Object r; Throwable x; 677 if (a == null || (r = a.result) == null || f == null) 678 return false; 679 tryComplete: if (result == null) { 680 if (r instanceof AltResult) { 681 if ((x = ((AltResult)r).ex) != null) { 682 completeThrowable(x, r); 683 break tryComplete; 684 } 685 r = null; 686 } 687 try { 688 if (c != null && !c.claim()) 689 return false; 690 @SuppressWarnings("unchecked") S s = (S) r; 691 f.accept(s); 692 completeNull(); 693 } catch (Throwable ex) { 694 completeThrowable(ex); 695 } 696 } 697 return true; 698 } 699 700 private CompletableFuture<Void> uniAcceptStage(Executor e, 701 Consumer<? super T> f) { 702 if (f == null) throw new NullPointerException(); 703 CompletableFuture<Void> d = newIncompleteFuture(); 704 if (e != null || !d.uniAccept(this, f, null)) { 705 UniAccept<T> c = new UniAccept<T>(e, d, this, f); 706 push(c); 707 c.tryFire(SYNC); 708 } 709 return d; 710 } 711 712 @SuppressWarnings("serial") 713 static final class UniRun<T> extends UniCompletion<T,Void> { 714 Runnable fn; 715 UniRun(Executor executor, CompletableFuture<Void> dep, 716 CompletableFuture<T> src, Runnable fn) { 717 super(executor, dep, src); this.fn = fn; 718 } 719 final CompletableFuture<Void> tryFire(int mode) { 720 CompletableFuture<Void> d; CompletableFuture<T> a; 721 if ((d = dep) == null || 722 !d.uniRun(a = src, fn, mode > 0 ? 
null : this)) 723 return null; 724 dep = null; src = null; fn = null; 725 return d.postFire(a, mode); 726 } 727 } 728 729 final boolean uniRun(CompletableFuture<?> a, Runnable f, UniRun<?> c) { 730 Object r; Throwable x; 731 if (a == null || (r = a.result) == null || f == null) 732 return false; 733 if (result == null) { 734 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) 735 completeThrowable(x, r); 736 else 737 try { 738 if (c != null && !c.claim()) 739 return false; 740 f.run(); 741 completeNull(); 742 } catch (Throwable ex) { 743 completeThrowable(ex); 744 } 745 } 746 return true; 747 } 748 749 private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) { 750 if (f == null) throw new NullPointerException(); 751 CompletableFuture<Void> d = newIncompleteFuture(); 752 if (e != null || !d.uniRun(this, f, null)) { 753 UniRun<T> c = new UniRun<T>(e, d, this, f); 754 push(c); 755 c.tryFire(SYNC); 756 } 757 return d; 758 } 759 760 @SuppressWarnings("serial") 761 static final class UniWhenComplete<T> extends UniCompletion<T,T> { 762 BiConsumer<? super T, ? super Throwable> fn; 763 UniWhenComplete(Executor executor, CompletableFuture<T> dep, 764 CompletableFuture<T> src, 765 BiConsumer<? super T, ? super Throwable> fn) { 766 super(executor, dep, src); this.fn = fn; 767 } 768 final CompletableFuture<T> tryFire(int mode) { 769 CompletableFuture<T> d; CompletableFuture<T> a; 770 if ((d = dep) == null || 771 !d.uniWhenComplete(a = src, fn, mode > 0 ? null : this)) 772 return null; 773 dep = null; src = null; fn = null; 774 return d.postFire(a, mode); 775 } 776 } 777 778 final boolean uniWhenComplete(CompletableFuture<T> a, 779 BiConsumer<? super T,? 
super Throwable> f, 780 UniWhenComplete<T> c) { 781 Object r; T t; Throwable x = null; 782 if (a == null || (r = a.result) == null || f == null) 783 return false; 784 if (result == null) { 785 try { 786 if (c != null && !c.claim()) 787 return false; 788 if (r instanceof AltResult) { 789 x = ((AltResult)r).ex; 790 t = null; 791 } else { 792 @SuppressWarnings("unchecked") T tr = (T) r; 793 t = tr; 794 } 795 f.accept(t, x); 796 if (x == null) { 797 internalComplete(r); 798 return true; 799 } 800 } catch (Throwable ex) { 801 if (x == null) 802 x = ex; 803 } 804 completeThrowable(x, r); 805 } 806 return true; 807 } 808 809 private CompletableFuture<T> uniWhenCompleteStage( 810 Executor e, BiConsumer<? super T, ? super Throwable> f) { 811 if (f == null) throw new NullPointerException(); 812 CompletableFuture<T> d = newIncompleteFuture(); 813 if (e != null || !d.uniWhenComplete(this, f, null)) { 814 UniWhenComplete<T> c = new UniWhenComplete<T>(e, d, this, f); 815 push(c); 816 c.tryFire(SYNC); 817 } 818 return d; 819 } 820 821 @SuppressWarnings("serial") 822 static final class UniHandle<T,V> extends UniCompletion<T,V> { 823 BiFunction<? super T, Throwable, ? extends V> fn; 824 UniHandle(Executor executor, CompletableFuture<V> dep, 825 CompletableFuture<T> src, 826 BiFunction<? super T, Throwable, ? extends V> fn) { 827 super(executor, dep, src); this.fn = fn; 828 } 829 final CompletableFuture<V> tryFire(int mode) { 830 CompletableFuture<V> d; CompletableFuture<T> a; 831 if ((d = dep) == null || 832 !d.uniHandle(a = src, fn, mode > 0 ? null : this)) 833 return null; 834 dep = null; src = null; fn = null; 835 return d.postFire(a, mode); 836 } 837 } 838 839 final <S> boolean uniHandle(CompletableFuture<S> a, 840 BiFunction<? super S, Throwable, ? 
extends T> f, 841 UniHandle<S,T> c) { 842 Object r; S s; Throwable x; 843 if (a == null || (r = a.result) == null || f == null) 844 return false; 845 if (result == null) { 846 try { 847 if (c != null && !c.claim()) 848 return false; 849 if (r instanceof AltResult) { 850 x = ((AltResult)r).ex; 851 s = null; 852 } else { 853 x = null; 854 @SuppressWarnings("unchecked") S ss = (S) r; 855 s = ss; 856 } 857 completeValue(f.apply(s, x)); 858 } catch (Throwable ex) { 859 completeThrowable(ex); 860 } 861 } 862 return true; 863 } 864 865 private <V> CompletableFuture<V> uniHandleStage( 866 Executor e, BiFunction<? super T, Throwable, ? extends V> f) { 867 if (f == null) throw new NullPointerException(); 868 CompletableFuture<V> d = newIncompleteFuture(); 869 if (e != null || !d.uniHandle(this, f, null)) { 870 UniHandle<T,V> c = new UniHandle<T,V>(e, d, this, f); 871 push(c); 872 c.tryFire(SYNC); 873 } 874 return d; 875 } 876 877 @SuppressWarnings("serial") 878 static final class UniExceptionally<T> extends UniCompletion<T,T> { 879 Function<? super Throwable, ? extends T> fn; 880 UniExceptionally(CompletableFuture<T> dep, CompletableFuture<T> src, 881 Function<? super Throwable, ? extends T> fn) { 882 super(null, dep, src); this.fn = fn; 883 } 884 final CompletableFuture<T> tryFire(int mode) { // never ASYNC 885 // assert mode != ASYNC; 886 CompletableFuture<T> d; CompletableFuture<T> a; 887 if ((d = dep) == null || !d.uniExceptionally(a = src, fn, this)) 888 return null; 889 dep = null; src = null; fn = null; 890 return d.postFire(a, mode); 891 } 892 } 893 894 final boolean uniExceptionally(CompletableFuture<T> a, 895 Function<? super Throwable, ? 
extends T> f, 896 UniExceptionally<T> c) { 897 Object r; Throwable x; 898 if (a == null || (r = a.result) == null || f == null) 899 return false; 900 if (result == null) { 901 try { 902 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) { 903 if (c != null && !c.claim()) 904 return false; 905 completeValue(f.apply(x)); 906 } else 907 internalComplete(r); 908 } catch (Throwable ex) { 909 completeThrowable(ex); 910 } 911 } 912 return true; 913 } 914 915 private CompletableFuture<T> uniExceptionallyStage( 916 Function<Throwable, ? extends T> f) { 917 if (f == null) throw new NullPointerException(); 918 CompletableFuture<T> d = newIncompleteFuture(); 919 if (!d.uniExceptionally(this, f, null)) { 920 UniExceptionally<T> c = new UniExceptionally<T>(d, this, f); 921 push(c); 922 c.tryFire(SYNC); 923 } 924 return d; 925 } 926 927 @SuppressWarnings("serial") 928 static final class UniRelay<T> extends UniCompletion<T,T> { // for Compose 929 UniRelay(CompletableFuture<T> dep, CompletableFuture<T> src) { 930 super(null, dep, src); 931 } 932 final CompletableFuture<T> tryFire(int mode) { 933 CompletableFuture<T> d; CompletableFuture<T> a; 934 if ((d = dep) == null || !d.uniRelay(a = src)) 935 return null; 936 src = null; dep = null; 937 return d.postFire(a, mode); 938 } 939 } 940 941 final boolean uniRelay(CompletableFuture<T> a) { 942 Object r; 943 if (a == null || (r = a.result) == null) 944 return false; 945 if (result == null) // no need to claim 946 completeRelay(r); 947 return true; 948 } 949 950 private CompletableFuture<T> uniCopyStage() { 951 Object r; 952 CompletableFuture<T> d = newIncompleteFuture(); 953 if ((r = result) != null) 954 d.completeRelay(r); 955 else { 956 UniRelay<T> c = new UniRelay<T>(d, this); 957 push(c); 958 c.tryFire(SYNC); 959 } 960 return d; 961 } 962 963 private MinimalStage<T> uniAsMinimalStage() { 964 Object r; 965 if ((r = result) != null) 966 return new MinimalStage<T>(encodeRelay(r)); 967 MinimalStage<T> d = new 
MinimalStage<T>(); 968 UniRelay<T> c = new UniRelay<T>(d, this); 969 push(c); 970 c.tryFire(SYNC); 971 return d; 972 } 973 974 @SuppressWarnings("serial") 975 static final class UniCompose<T,V> extends UniCompletion<T,V> { 976 Function<? super T, ? extends CompletionStage<V>> fn; 977 UniCompose(Executor executor, CompletableFuture<V> dep, 978 CompletableFuture<T> src, 979 Function<? super T, ? extends CompletionStage<V>> fn) { 980 super(executor, dep, src); this.fn = fn; 981 } 982 final CompletableFuture<V> tryFire(int mode) { 983 CompletableFuture<V> d; CompletableFuture<T> a; 984 if ((d = dep) == null || 985 !d.uniCompose(a = src, fn, mode > 0 ? null : this)) 986 return null; 987 dep = null; src = null; fn = null; 988 return d.postFire(a, mode); 989 } 990 } 991 992 final <S> boolean uniCompose( 993 CompletableFuture<S> a, 994 Function<? super S, ? extends CompletionStage<T>> f, 995 UniCompose<S,T> c) { 996 Object r; Throwable x; 997 if (a == null || (r = a.result) == null || f == null) 998 return false; 999 tryComplete: if (result == null) { 1000 if (r instanceof AltResult) { 1001 if ((x = ((AltResult)r).ex) != null) { 1002 completeThrowable(x, r); 1003 break tryComplete; 1004 } 1005 r = null; 1006 } 1007 try { 1008 if (c != null && !c.claim()) 1009 return false; 1010 @SuppressWarnings("unchecked") S s = (S) r; 1011 CompletableFuture<T> g = f.apply(s).toCompletableFuture(); 1012 if (g.result == null || !uniRelay(g)) { 1013 UniRelay<T> copy = new UniRelay<T>(this, g); 1014 g.push(copy); 1015 copy.tryFire(SYNC); 1016 if (result == null) 1017 return false; 1018 } 1019 } catch (Throwable ex) { 1020 completeThrowable(ex); 1021 } 1022 } 1023 return true; 1024 } 1025 1026 private <V> CompletableFuture<V> uniComposeStage( 1027 Executor e, Function<? super T, ? 
extends CompletionStage<V>> f) { 1028 if (f == null) throw new NullPointerException(); 1029 Object r, s; Throwable x; 1030 CompletableFuture<V> d = newIncompleteFuture(); 1031 if (e == null && (r = result) != null) { 1032 if (r instanceof AltResult) { 1033 if ((x = ((AltResult)r).ex) != null) { 1034 d.result = encodeThrowable(x, r); 1035 return d; 1036 } 1037 r = null; 1038 } 1039 try { 1040 @SuppressWarnings("unchecked") T t = (T) r; 1041 CompletableFuture<V> g = f.apply(t).toCompletableFuture(); 1042 if ((s = g.result) != null) 1043 d.completeRelay(s); 1044 else { 1045 UniRelay<V> c = new UniRelay<V>(d, g); 1046 g.push(c); 1047 c.tryFire(SYNC); 1048 } 1049 return d; 1050 } catch (Throwable ex) { 1051 d.result = encodeThrowable(ex); 1052 return d; 1053 } 1054 } 1055 UniCompose<T,V> c = new UniCompose<T,V>(e, d, this, f); 1056 push(c); 1057 c.tryFire(SYNC); 1058 return d; 1059 } 1060 1061 /* ------------- Two-input Completions -------------- */ 1062 1063 /** A Completion for an action with two sources */ 1064 @SuppressWarnings("serial") 1065 abstract static class BiCompletion<T,U,V> extends UniCompletion<T,V> { 1066 CompletableFuture<U> snd; // second source for action 1067 BiCompletion(Executor executor, CompletableFuture<V> dep, 1068 CompletableFuture<T> src, CompletableFuture<U> snd) { 1069 super(executor, dep, src); this.snd = snd; 1070 } 1071 } 1072 1073 /** A Completion delegating to a BiCompletion */ 1074 @SuppressWarnings("serial") 1075 static final class CoCompletion extends Completion { 1076 BiCompletion<?,?,?> base; 1077 CoCompletion(BiCompletion<?,?,?> base) { this.base = base; } 1078 final CompletableFuture<?> tryFire(int mode) { 1079 BiCompletion<?,?,?> c; CompletableFuture<?> d; 1080 if ((c = base) == null || (d = c.tryFire(mode)) == null) 1081 return null; 1082 base = null; // detach 1083 return d; 1084 } 1085 final boolean isLive() { 1086 BiCompletion<?,?,?> c; 1087 return (c = base) != null && c.dep != null; 1088 } 1089 } 1090 1091 /** Pushes 
completion to this and b unless both done. */ 1092 final void bipush(CompletableFuture<?> b, BiCompletion<?,?,?> c) { 1093 if (c != null) { 1094 Object r; 1095 while ((r = result) == null && !tryPushStack(c)) 1096 lazySetNext(c, null); // clear on failure 1097 if (b != null && b != this && b.result == null) { 1098 Completion q = (r != null) ? c : new CoCompletion(c); 1099 while (b.result == null && !b.tryPushStack(q)) 1100 lazySetNext(q, null); // clear on failure 1101 } 1102 } 1103 } 1104 1105 /** Post-processing after successful BiCompletion tryFire. */ 1106 final CompletableFuture<T> postFire(CompletableFuture<?> a, 1107 CompletableFuture<?> b, int mode) { 1108 if (b != null && b.stack != null) { // clean second source 1109 if (mode < 0 || b.result == null) 1110 b.cleanStack(); 1111 else 1112 b.postComplete(); 1113 } 1114 return postFire(a, mode); 1115 } 1116 1117 @SuppressWarnings("serial") 1118 static final class BiApply<T,U,V> extends BiCompletion<T,U,V> { 1119 BiFunction<? super T,? super U,? extends V> fn; 1120 BiApply(Executor executor, CompletableFuture<V> dep, 1121 CompletableFuture<T> src, CompletableFuture<U> snd, 1122 BiFunction<? super T,? super U,? extends V> fn) { 1123 super(executor, dep, src, snd); this.fn = fn; 1124 } 1125 final CompletableFuture<V> tryFire(int mode) { 1126 CompletableFuture<V> d; 1127 CompletableFuture<T> a; 1128 CompletableFuture<U> b; 1129 if ((d = dep) == null || 1130 !d.biApply(a = src, b = snd, fn, mode > 0 ? null : this)) 1131 return null; 1132 dep = null; src = null; snd = null; fn = null; 1133 return d.postFire(a, b, mode); 1134 } 1135 } 1136 1137 final <R,S> boolean biApply(CompletableFuture<R> a, 1138 CompletableFuture<S> b, 1139 BiFunction<? super R,? super S,? 
extends T> f, 1140 BiApply<R,S,T> c) { 1141 Object r, s; Throwable x; 1142 if (a == null || (r = a.result) == null || 1143 b == null || (s = b.result) == null || f == null) 1144 return false; 1145 tryComplete: if (result == null) { 1146 if (r instanceof AltResult) { 1147 if ((x = ((AltResult)r).ex) != null) { 1148 completeThrowable(x, r); 1149 break tryComplete; 1150 } 1151 r = null; 1152 } 1153 if (s instanceof AltResult) { 1154 if ((x = ((AltResult)s).ex) != null) { 1155 completeThrowable(x, s); 1156 break tryComplete; 1157 } 1158 s = null; 1159 } 1160 try { 1161 if (c != null && !c.claim()) 1162 return false; 1163 @SuppressWarnings("unchecked") R rr = (R) r; 1164 @SuppressWarnings("unchecked") S ss = (S) s; 1165 completeValue(f.apply(rr, ss)); 1166 } catch (Throwable ex) { 1167 completeThrowable(ex); 1168 } 1169 } 1170 return true; 1171 } 1172 1173 private <U,V> CompletableFuture<V> biApplyStage( 1174 Executor e, CompletionStage<U> o, 1175 BiFunction<? super T,? super U,? extends V> f) { 1176 CompletableFuture<U> b; 1177 if (f == null || (b = o.toCompletableFuture()) == null) 1178 throw new NullPointerException(); 1179 CompletableFuture<V> d = newIncompleteFuture(); 1180 if (e != null || !d.biApply(this, b, f, null)) { 1181 BiApply<T,U,V> c = new BiApply<T,U,V>(e, d, this, b, f); 1182 bipush(b, c); 1183 c.tryFire(SYNC); 1184 } 1185 return d; 1186 } 1187 1188 @SuppressWarnings("serial") 1189 static final class BiAccept<T,U> extends BiCompletion<T,U,Void> { 1190 BiConsumer<? super T,? super U> fn; 1191 BiAccept(Executor executor, CompletableFuture<Void> dep, 1192 CompletableFuture<T> src, CompletableFuture<U> snd, 1193 BiConsumer<? super T,? super U> fn) { 1194 super(executor, dep, src, snd); this.fn = fn; 1195 } 1196 final CompletableFuture<Void> tryFire(int mode) { 1197 CompletableFuture<Void> d; 1198 CompletableFuture<T> a; 1199 CompletableFuture<U> b; 1200 if ((d = dep) == null || 1201 !d.biAccept(a = src, b = snd, fn, mode > 0 ? 
null : this)) 1202 return null; 1203 dep = null; src = null; snd = null; fn = null; 1204 return d.postFire(a, b, mode); 1205 } 1206 } 1207 1208 final <R,S> boolean biAccept(CompletableFuture<R> a, 1209 CompletableFuture<S> b, 1210 BiConsumer<? super R,? super S> f, 1211 BiAccept<R,S> c) { 1212 Object r, s; Throwable x; 1213 if (a == null || (r = a.result) == null || 1214 b == null || (s = b.result) == null || f == null) 1215 return false; 1216 tryComplete: if (result == null) { 1217 if (r instanceof AltResult) { 1218 if ((x = ((AltResult)r).ex) != null) { 1219 completeThrowable(x, r); 1220 break tryComplete; 1221 } 1222 r = null; 1223 } 1224 if (s instanceof AltResult) { 1225 if ((x = ((AltResult)s).ex) != null) { 1226 completeThrowable(x, s); 1227 break tryComplete; 1228 } 1229 s = null; 1230 } 1231 try { 1232 if (c != null && !c.claim()) 1233 return false; 1234 @SuppressWarnings("unchecked") R rr = (R) r; 1235 @SuppressWarnings("unchecked") S ss = (S) s; 1236 f.accept(rr, ss); 1237 completeNull(); 1238 } catch (Throwable ex) { 1239 completeThrowable(ex); 1240 } 1241 } 1242 return true; 1243 } 1244 1245 private <U> CompletableFuture<Void> biAcceptStage( 1246 Executor e, CompletionStage<U> o, 1247 BiConsumer<? super T,? 
super U> f) { 1248 CompletableFuture<U> b; 1249 if (f == null || (b = o.toCompletableFuture()) == null) 1250 throw new NullPointerException(); 1251 CompletableFuture<Void> d = newIncompleteFuture(); 1252 if (e != null || !d.biAccept(this, b, f, null)) { 1253 BiAccept<T,U> c = new BiAccept<T,U>(e, d, this, b, f); 1254 bipush(b, c); 1255 c.tryFire(SYNC); 1256 } 1257 return d; 1258 } 1259 1260 @SuppressWarnings("serial") 1261 static final class BiRun<T,U> extends BiCompletion<T,U,Void> { 1262 Runnable fn; 1263 BiRun(Executor executor, CompletableFuture<Void> dep, 1264 CompletableFuture<T> src, 1265 CompletableFuture<U> snd, 1266 Runnable fn) { 1267 super(executor, dep, src, snd); this.fn = fn; 1268 } 1269 final CompletableFuture<Void> tryFire(int mode) { 1270 CompletableFuture<Void> d; 1271 CompletableFuture<T> a; 1272 CompletableFuture<U> b; 1273 if ((d = dep) == null || 1274 !d.biRun(a = src, b = snd, fn, mode > 0 ? null : this)) 1275 return null; 1276 dep = null; src = null; snd = null; fn = null; 1277 return d.postFire(a, b, mode); 1278 } 1279 } 1280 1281 final boolean biRun(CompletableFuture<?> a, CompletableFuture<?> b, 1282 Runnable f, BiRun<?,?> c) { 1283 Object r, s; Throwable x; 1284 if (a == null || (r = a.result) == null || 1285 b == null || (s = b.result) == null || f == null) 1286 return false; 1287 if (result == null) { 1288 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) 1289 completeThrowable(x, r); 1290 else if (s instanceof AltResult && (x = ((AltResult)s).ex) != null) 1291 completeThrowable(x, s); 1292 else 1293 try { 1294 if (c != null && !c.claim()) 1295 return false; 1296 f.run(); 1297 completeNull(); 1298 } catch (Throwable ex) { 1299 completeThrowable(ex); 1300 } 1301 } 1302 return true; 1303 } 1304 1305 private CompletableFuture<Void> biRunStage(Executor e, CompletionStage<?> o, 1306 Runnable f) { 1307 CompletableFuture<?> b; 1308 if (f == null || (b = o.toCompletableFuture()) == null) 1309 throw new NullPointerException(); 
1310 CompletableFuture<Void> d = newIncompleteFuture(); 1311 if (e != null || !d.biRun(this, b, f, null)) { 1312 BiRun<T,?> c = new BiRun<>(e, d, this, b, f); 1313 bipush(b, c); 1314 c.tryFire(SYNC); 1315 } 1316 return d; 1317 } 1318 1319 @SuppressWarnings("serial") 1320 static final class BiRelay<T,U> extends BiCompletion<T,U,Void> { // for And 1321 BiRelay(CompletableFuture<Void> dep, 1322 CompletableFuture<T> src, 1323 CompletableFuture<U> snd) { 1324 super(null, dep, src, snd); 1325 } 1326 final CompletableFuture<Void> tryFire(int mode) { 1327 CompletableFuture<Void> d; 1328 CompletableFuture<T> a; 1329 CompletableFuture<U> b; 1330 if ((d = dep) == null || !d.biRelay(a = src, b = snd)) 1331 return null; 1332 src = null; snd = null; dep = null; 1333 return d.postFire(a, b, mode); 1334 } 1335 } 1336 1337 boolean biRelay(CompletableFuture<?> a, CompletableFuture<?> b) { 1338 Object r, s; Throwable x; 1339 if (a == null || (r = a.result) == null || 1340 b == null || (s = b.result) == null) 1341 return false; 1342 if (result == null) { 1343 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) 1344 completeThrowable(x, r); 1345 else if (s instanceof AltResult && (x = ((AltResult)s).ex) != null) 1346 completeThrowable(x, s); 1347 else 1348 completeNull(); 1349 } 1350 return true; 1351 } 1352 1353 /** Recursively constructs a tree of completions. */ 1354 static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs, 1355 int lo, int hi) { 1356 CompletableFuture<Void> d = new CompletableFuture<Void>(); 1357 if (lo > hi) // empty 1358 d.result = NIL; 1359 else { 1360 CompletableFuture<?> a, b; 1361 int mid = (lo + hi) >>> 1; 1362 if ((a = (lo == mid ? cfs[lo] : 1363 andTree(cfs, lo, mid))) == null || 1364 (b = (lo == hi ? a : (hi == mid+1) ? 
cfs[hi] : 1365 andTree(cfs, mid+1, hi))) == null) 1366 throw new NullPointerException(); 1367 if (!d.biRelay(a, b)) { 1368 BiRelay<?,?> c = new BiRelay<>(d, a, b); 1369 a.bipush(b, c); 1370 c.tryFire(SYNC); 1371 } 1372 } 1373 return d; 1374 } 1375 1376 /* ------------- Projected (Ored) BiCompletions -------------- */ 1377 1378 /** Pushes completion to this and b unless either done. */ 1379 final void orpush(CompletableFuture<?> b, BiCompletion<?,?,?> c) { 1380 if (c != null) { 1381 while ((b == null || b.result == null) && result == null) { 1382 if (tryPushStack(c)) { 1383 if (b != null && b != this && b.result == null) { 1384 Completion q = new CoCompletion(c); 1385 while (result == null && b.result == null && 1386 !b.tryPushStack(q)) 1387 lazySetNext(q, null); // clear on failure 1388 } 1389 break; 1390 } 1391 lazySetNext(c, null); // clear on failure 1392 } 1393 } 1394 } 1395 1396 @SuppressWarnings("serial") 1397 static final class OrApply<T,U extends T,V> extends BiCompletion<T,U,V> { 1398 Function<? super T,? extends V> fn; 1399 OrApply(Executor executor, CompletableFuture<V> dep, 1400 CompletableFuture<T> src, 1401 CompletableFuture<U> snd, 1402 Function<? super T,? extends V> fn) { 1403 super(executor, dep, src, snd); this.fn = fn; 1404 } 1405 final CompletableFuture<V> tryFire(int mode) { 1406 CompletableFuture<V> d; 1407 CompletableFuture<T> a; 1408 CompletableFuture<U> b; 1409 if ((d = dep) == null || 1410 !d.orApply(a = src, b = snd, fn, mode > 0 ? null : this)) 1411 return null; 1412 dep = null; src = null; snd = null; fn = null; 1413 return d.postFire(a, b, mode); 1414 } 1415 } 1416 1417 final <R,S extends R> boolean orApply(CompletableFuture<R> a, 1418 CompletableFuture<S> b, 1419 Function<? super R, ? 
extends T> f, 1420 OrApply<R,S,T> c) { 1421 Object r; Throwable x; 1422 if (a == null || b == null || 1423 ((r = a.result) == null && (r = b.result) == null) || f == null) 1424 return false; 1425 tryComplete: if (result == null) { 1426 try { 1427 if (c != null && !c.claim()) 1428 return false; 1429 if (r instanceof AltResult) { 1430 if ((x = ((AltResult)r).ex) != null) { 1431 completeThrowable(x, r); 1432 break tryComplete; 1433 } 1434 r = null; 1435 } 1436 @SuppressWarnings("unchecked") R rr = (R) r; 1437 completeValue(f.apply(rr)); 1438 } catch (Throwable ex) { 1439 completeThrowable(ex); 1440 } 1441 } 1442 return true; 1443 } 1444 1445 private <U extends T,V> CompletableFuture<V> orApplyStage( 1446 Executor e, CompletionStage<U> o, 1447 Function<? super T, ? extends V> f) { 1448 CompletableFuture<U> b; 1449 if (f == null || (b = o.toCompletableFuture()) == null) 1450 throw new NullPointerException(); 1451 CompletableFuture<V> d = newIncompleteFuture(); 1452 if (e != null || !d.orApply(this, b, f, null)) { 1453 OrApply<T,U,V> c = new OrApply<T,U,V>(e, d, this, b, f); 1454 orpush(b, c); 1455 c.tryFire(SYNC); 1456 } 1457 return d; 1458 } 1459 1460 @SuppressWarnings("serial") 1461 static final class OrAccept<T,U extends T> extends BiCompletion<T,U,Void> { 1462 Consumer<? super T> fn; 1463 OrAccept(Executor executor, CompletableFuture<Void> dep, 1464 CompletableFuture<T> src, 1465 CompletableFuture<U> snd, 1466 Consumer<? super T> fn) { 1467 super(executor, dep, src, snd); this.fn = fn; 1468 } 1469 final CompletableFuture<Void> tryFire(int mode) { 1470 CompletableFuture<Void> d; 1471 CompletableFuture<T> a; 1472 CompletableFuture<U> b; 1473 if ((d = dep) == null || 1474 !d.orAccept(a = src, b = snd, fn, mode > 0 ? null : this)) 1475 return null; 1476 dep = null; src = null; snd = null; fn = null; 1477 return d.postFire(a, b, mode); 1478 } 1479 } 1480 1481 final <R,S extends R> boolean orAccept(CompletableFuture<R> a, 1482 CompletableFuture<S> b, 1483 Consumer<? 
super R> f, 1484 OrAccept<R,S> c) { 1485 Object r; Throwable x; 1486 if (a == null || b == null || 1487 ((r = a.result) == null && (r = b.result) == null) || f == null) 1488 return false; 1489 tryComplete: if (result == null) { 1490 try { 1491 if (c != null && !c.claim()) 1492 return false; 1493 if (r instanceof AltResult) { 1494 if ((x = ((AltResult)r).ex) != null) { 1495 completeThrowable(x, r); 1496 break tryComplete; 1497 } 1498 r = null; 1499 } 1500 @SuppressWarnings("unchecked") R rr = (R) r; 1501 f.accept(rr); 1502 completeNull(); 1503 } catch (Throwable ex) { 1504 completeThrowable(ex); 1505 } 1506 } 1507 return true; 1508 } 1509 1510 private <U extends T> CompletableFuture<Void> orAcceptStage( 1511 Executor e, CompletionStage<U> o, Consumer<? super T> f) { 1512 CompletableFuture<U> b; 1513 if (f == null || (b = o.toCompletableFuture()) == null) 1514 throw new NullPointerException(); 1515 CompletableFuture<Void> d = newIncompleteFuture(); 1516 if (e != null || !d.orAccept(this, b, f, null)) { 1517 OrAccept<T,U> c = new OrAccept<T,U>(e, d, this, b, f); 1518 orpush(b, c); 1519 c.tryFire(SYNC); 1520 } 1521 return d; 1522 } 1523 1524 @SuppressWarnings("serial") 1525 static final class OrRun<T,U> extends BiCompletion<T,U,Void> { 1526 Runnable fn; 1527 OrRun(Executor executor, CompletableFuture<Void> dep, 1528 CompletableFuture<T> src, 1529 CompletableFuture<U> snd, 1530 Runnable fn) { 1531 super(executor, dep, src, snd); this.fn = fn; 1532 } 1533 final CompletableFuture<Void> tryFire(int mode) { 1534 CompletableFuture<Void> d; 1535 CompletableFuture<T> a; 1536 CompletableFuture<U> b; 1537 if ((d = dep) == null || 1538 !d.orRun(a = src, b = snd, fn, mode > 0 ? 
null : this)) 1539 return null; 1540 dep = null; src = null; snd = null; fn = null; 1541 return d.postFire(a, b, mode); 1542 } 1543 } 1544 1545 final boolean orRun(CompletableFuture<?> a, CompletableFuture<?> b, 1546 Runnable f, OrRun<?,?> c) { 1547 Object r; Throwable x; 1548 if (a == null || b == null || 1549 ((r = a.result) == null && (r = b.result) == null) || f == null) 1550 return false; 1551 if (result == null) { 1552 try { 1553 if (c != null && !c.claim()) 1554 return false; 1555 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) 1556 completeThrowable(x, r); 1557 else { 1558 f.run(); 1559 completeNull(); 1560 } 1561 } catch (Throwable ex) { 1562 completeThrowable(ex); 1563 } 1564 } 1565 return true; 1566 } 1567 1568 private CompletableFuture<Void> orRunStage(Executor e, CompletionStage<?> o, 1569 Runnable f) { 1570 CompletableFuture<?> b; 1571 if (f == null || (b = o.toCompletableFuture()) == null) 1572 throw new NullPointerException(); 1573 CompletableFuture<Void> d = newIncompleteFuture(); 1574 if (e != null || !d.orRun(this, b, f, null)) { 1575 OrRun<T,?> c = new OrRun<>(e, d, this, b, f); 1576 orpush(b, c); 1577 c.tryFire(SYNC); 1578 } 1579 return d; 1580 } 1581 1582 @SuppressWarnings("serial") 1583 static final class OrRelay<T,U> extends BiCompletion<T,U,Object> { // for Or 1584 OrRelay(CompletableFuture<Object> dep, CompletableFuture<T> src, 1585 CompletableFuture<U> snd) { 1586 super(null, dep, src, snd); 1587 } 1588 final CompletableFuture<Object> tryFire(int mode) { 1589 CompletableFuture<Object> d; 1590 CompletableFuture<T> a; 1591 CompletableFuture<U> b; 1592 if ((d = dep) == null || !d.orRelay(a = src, b = snd)) 1593 return null; 1594 src = null; snd = null; dep = null; 1595 return d.postFire(a, b, mode); 1596 } 1597 } 1598 1599 final boolean orRelay(CompletableFuture<?> a, CompletableFuture<?> b) { 1600 Object r; 1601 if (a == null || b == null || 1602 ((r = a.result) == null && (r = b.result) == null)) 1603 return false; 1604 if 
(result == null) 1605 completeRelay(r); 1606 return true; 1607 } 1608 1609 /** Recursively constructs a tree of completions. */ 1610 static CompletableFuture<Object> orTree(CompletableFuture<?>[] cfs, 1611 int lo, int hi) { 1612 CompletableFuture<Object> d = new CompletableFuture<Object>(); 1613 if (lo <= hi) { 1614 CompletableFuture<?> a, b; 1615 int mid = (lo + hi) >>> 1; 1616 if ((a = (lo == mid ? cfs[lo] : 1617 orTree(cfs, lo, mid))) == null || 1618 (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] : 1619 orTree(cfs, mid+1, hi))) == null) 1620 throw new NullPointerException(); 1621 if (!d.orRelay(a, b)) { 1622 OrRelay<?,?> c = new OrRelay<>(d, a, b); 1623 a.orpush(b, c); 1624 c.tryFire(SYNC); 1625 } 1626 } 1627 return d; 1628 } 1629 1630 /* ------------- Zero-input Async forms -------------- */ 1631 1632 @SuppressWarnings("serial") 1633 static final class AsyncSupply<T> extends ForkJoinTask<Void> 1634 implements Runnable, AsynchronousCompletionTask { 1635 CompletableFuture<T> dep; Supplier<? extends T> fn; 1636 AsyncSupply(CompletableFuture<T> dep, Supplier<? extends T> fn) { 1637 this.dep = dep; this.fn = fn; 1638 } 1639 1640 public final Void getRawResult() { return null; } 1641 public final void setRawResult(Void v) {} 1642 public final boolean exec() { run(); return true; } 1643 1644 public void run() { 1645 CompletableFuture<T> d; Supplier<? 
extends T> f; 1646 if ((d = dep) != null && (f = fn) != null) { 1647 dep = null; fn = null; 1648 if (d.result == null) { 1649 try { 1650 d.completeValue(f.get()); 1651 } catch (Throwable ex) { 1652 d.completeThrowable(ex); 1653 } 1654 } 1655 d.postComplete(); 1656 } 1657 } 1658 } 1659 1660 static <U> CompletableFuture<U> asyncSupplyStage(Executor e, 1661 Supplier<U> f) { 1662 if (f == null) throw new NullPointerException(); 1663 CompletableFuture<U> d = new CompletableFuture<U>(); 1664 e.execute(new AsyncSupply<U>(d, f)); 1665 return d; 1666 } 1667 1668 @SuppressWarnings("serial") 1669 static final class AsyncRun extends ForkJoinTask<Void> 1670 implements Runnable, AsynchronousCompletionTask { 1671 CompletableFuture<Void> dep; Runnable fn; 1672 AsyncRun(CompletableFuture<Void> dep, Runnable fn) { 1673 this.dep = dep; this.fn = fn; 1674 } 1675 1676 public final Void getRawResult() { return null; } 1677 public final void setRawResult(Void v) {} 1678 public final boolean exec() { run(); return true; } 1679 1680 public void run() { 1681 CompletableFuture<Void> d; Runnable f; 1682 if ((d = dep) != null && (f = fn) != null) { 1683 dep = null; fn = null; 1684 if (d.result == null) { 1685 try { 1686 f.run(); 1687 d.completeNull(); 1688 } catch (Throwable ex) { 1689 d.completeThrowable(ex); 1690 } 1691 } 1692 d.postComplete(); 1693 } 1694 } 1695 } 1696 1697 static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) { 1698 if (f == null) throw new NullPointerException(); 1699 CompletableFuture<Void> d = new CompletableFuture<Void>(); 1700 e.execute(new AsyncRun(d, f)); 1701 return d; 1702 } 1703 1704 /* ------------- Signallers -------------- */ 1705 1706 /** 1707 * Completion for recording and releasing a waiting thread. This 1708 * class implements ManagedBlocker to avoid starvation when 1709 * blocking actions pile up in ForkJoinPools. 
1710 */ 1711 @SuppressWarnings("serial") 1712 static final class Signaller extends Completion 1713 implements ForkJoinPool.ManagedBlocker { 1714 long nanos; // remaining wait time if timed 1715 final long deadline; // non-zero if timed 1716 final boolean interruptible; 1717 boolean interrupted; 1718 volatile Thread thread; 1719 1720 Signaller(boolean interruptible, long nanos, long deadline) { 1721 this.thread = Thread.currentThread(); 1722 this.interruptible = interruptible; 1723 this.nanos = nanos; 1724 this.deadline = deadline; 1725 } 1726 final CompletableFuture<?> tryFire(int ignore) { 1727 Thread w; // no need to atomically claim 1728 if ((w = thread) != null) { 1729 thread = null; 1730 LockSupport.unpark(w); 1731 } 1732 return null; 1733 } 1734 public boolean isReleasable() { 1735 if (Thread.interrupted()) 1736 interrupted = true; 1737 return ((interrupted && interruptible) || 1738 (deadline != 0L && 1739 (nanos <= 0L || 1740 (nanos = deadline - System.nanoTime()) <= 0L)) || 1741 thread == null); 1742 } 1743 public boolean block() { 1744 while (!isReleasable()) { 1745 if (deadline == 0L) 1746 LockSupport.park(this); 1747 else 1748 LockSupport.parkNanos(this, nanos); 1749 } 1750 return true; 1751 } 1752 final boolean isLive() { return thread != null; } 1753 } 1754 1755 /** 1756 * Returns raw result after waiting, or null if interruptible and 1757 * interrupted. 
1758 */ 1759 private Object waitingGet(boolean interruptible) { 1760 Signaller q = null; 1761 boolean queued = false; 1762 int spins = SPINS; 1763 Object r; 1764 while ((r = result) == null) { 1765 if (spins > 0) { 1766 if (ThreadLocalRandom.nextSecondarySeed() >= 0) 1767 --spins; 1768 } 1769 else if (q == null) 1770 q = new Signaller(interruptible, 0L, 0L); 1771 else if (!queued) 1772 queued = tryPushStack(q); 1773 else { 1774 try { 1775 ForkJoinPool.managedBlock(q); 1776 } catch (InterruptedException ie) { // currently cannot happen 1777 q.interrupted = true; 1778 } 1779 if (q.interrupted && interruptible) 1780 break; 1781 } 1782 } 1783 if (q != null) { 1784 q.thread = null; 1785 if (q.interrupted) { 1786 if (interruptible) 1787 cleanStack(); 1788 else 1789 Thread.currentThread().interrupt(); 1790 } 1791 } 1792 if (r != null) 1793 postComplete(); 1794 return r; 1795 } 1796 1797 /** 1798 * Returns raw result after waiting, or null if interrupted, or 1799 * throws TimeoutException on timeout. 1800 */ 1801 private Object timedGet(long nanos) throws TimeoutException { 1802 if (Thread.interrupted()) 1803 return null; 1804 if (nanos > 0L) { 1805 long d = System.nanoTime() + nanos; 1806 long deadline = (d == 0L) ? 
1L : d; // avoid 0 1807 Signaller q = null; 1808 boolean queued = false; 1809 Object r; 1810 while ((r = result) == null) { // similar to untimed, without spins 1811 if (q == null) 1812 q = new Signaller(true, nanos, deadline); 1813 else if (!queued) 1814 queued = tryPushStack(q); 1815 else if (q.nanos <= 0L) 1816 break; 1817 else { 1818 try { 1819 ForkJoinPool.managedBlock(q); 1820 } catch (InterruptedException ie) { 1821 q.interrupted = true; 1822 } 1823 if (q.interrupted) 1824 break; 1825 } 1826 } 1827 if (q != null) 1828 q.thread = null; 1829 if (r != null) 1830 postComplete(); 1831 else 1832 cleanStack(); 1833 if (r != null || (q != null && q.interrupted)) 1834 return r; 1835 } 1836 throw new TimeoutException(); 1837 } 1838 1839 /* ------------- public methods -------------- */ 1840 1841 /** 1842 * Creates a new incomplete CompletableFuture. 1843 */ 1844 public CompletableFuture() { 1845 } 1846 1847 /** 1848 * Creates a new complete CompletableFuture with given encoded result. 1849 */ 1850 CompletableFuture(Object r) { 1851 this.result = r; 1852 } 1853 1854 /** 1855 * Returns a new CompletableFuture that is asynchronously completed 1856 * by a task running in the {@link ForkJoinPool#commonPool()} with 1857 * the value obtained by calling the given Supplier. 1858 * 1859 * @param supplier a function returning the value to be used 1860 * to complete the returned CompletableFuture 1861 * @param <U> the function's return type 1862 * @return the new CompletableFuture 1863 */ 1864 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { 1865 return asyncSupplyStage(ASYNC_POOL, supplier); 1866 } 1867 1868 /** 1869 * Returns a new CompletableFuture that is asynchronously completed 1870 * by a task running in the given executor with the value obtained 1871 * by calling the given Supplier. 
1872 * 1873 * @param supplier a function returning the value to be used 1874 * to complete the returned CompletableFuture 1875 * @param executor the executor to use for asynchronous execution 1876 * @param <U> the function's return type 1877 * @return the new CompletableFuture 1878 */ 1879 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, 1880 Executor executor) { 1881 return asyncSupplyStage(screenExecutor(executor), supplier); 1882 } 1883 1884 /** 1885 * Returns a new CompletableFuture that is asynchronously completed 1886 * by a task running in the {@link ForkJoinPool#commonPool()} after 1887 * it runs the given action. 1888 * 1889 * @param runnable the action to run before completing the 1890 * returned CompletableFuture 1891 * @return the new CompletableFuture 1892 */ 1893 public static CompletableFuture<Void> runAsync(Runnable runnable) { 1894 return asyncRunStage(ASYNC_POOL, runnable); 1895 } 1896 1897 /** 1898 * Returns a new CompletableFuture that is asynchronously completed 1899 * by a task running in the given executor after it runs the given 1900 * action. 1901 * 1902 * @param runnable the action to run before completing the 1903 * returned CompletableFuture 1904 * @param executor the executor to use for asynchronous execution 1905 * @return the new CompletableFuture 1906 */ 1907 public static CompletableFuture<Void> runAsync(Runnable runnable, 1908 Executor executor) { 1909 return asyncRunStage(screenExecutor(executor), runnable); 1910 } 1911 1912 /** 1913 * Returns a new CompletableFuture that is already completed with 1914 * the given value. 1915 * 1916 * @param value the value 1917 * @param <U> the type of the value 1918 * @return the completed CompletableFuture 1919 */ 1920 public static <U> CompletableFuture<U> completedFuture(U value) { 1921 return new CompletableFuture<U>((value == null) ? 
NIL : value); 1922 } 1923 1924 /** 1925 * Returns {@code true} if completed in any fashion: normally, 1926 * exceptionally, or via cancellation. 1927 * 1928 * @return {@code true} if completed 1929 */ 1930 public boolean isDone() { 1931 return result != null; 1932 } 1933 1934 /** 1935 * Waits if necessary for this future to complete, and then 1936 * returns its result. 1937 * 1938 * @return the result value 1939 * @throws CancellationException if this future was cancelled 1940 * @throws ExecutionException if this future completed exceptionally 1941 * @throws InterruptedException if the current thread was interrupted 1942 * while waiting 1943 */ 1944 public T get() throws InterruptedException, ExecutionException { 1945 Object r; 1946 return reportGet((r = result) == null ? waitingGet(true) : r); 1947 } 1948 1949 /** 1950 * Waits if necessary for at most the given time for this future 1951 * to complete, and then returns its result, if available. 1952 * 1953 * @param timeout the maximum time to wait 1954 * @param unit the time unit of the timeout argument 1955 * @return the result value 1956 * @throws CancellationException if this future was cancelled 1957 * @throws ExecutionException if this future completed exceptionally 1958 * @throws InterruptedException if the current thread was interrupted 1959 * while waiting 1960 * @throws TimeoutException if the wait timed out 1961 */ 1962 public T get(long timeout, TimeUnit unit) 1963 throws InterruptedException, ExecutionException, TimeoutException { 1964 Object r; 1965 long nanos = unit.toNanos(timeout); 1966 return reportGet((r = result) == null ? timedGet(nanos) : r); 1967 } 1968 1969 /** 1970 * Returns the result value when complete, or throws an 1971 * (unchecked) exception if completed exceptionally. 
To better 1972 * conform with the use of common functional forms, if a 1973 * computation involved in the completion of this 1974 * CompletableFuture threw an exception, this method throws an 1975 * (unchecked) {@link CompletionException} with the underlying 1976 * exception as its cause. 1977 * 1978 * @return the result value 1979 * @throws CancellationException if the computation was cancelled 1980 * @throws CompletionException if this future completed 1981 * exceptionally or a completion computation threw an exception 1982 */ 1983 public T join() { 1984 Object r; 1985 return reportJoin((r = result) == null ? waitingGet(false) : r); 1986 } 1987 1988 /** 1989 * Returns the result value (or throws any encountered exception) 1990 * if completed, else returns the given valueIfAbsent. 1991 * 1992 * @param valueIfAbsent the value to return if not completed 1993 * @return the result value, if completed, else the given valueIfAbsent 1994 * @throws CancellationException if the computation was cancelled 1995 * @throws CompletionException if this future completed 1996 * exceptionally or a completion computation threw an exception 1997 */ 1998 public T getNow(T valueIfAbsent) { 1999 Object r; 2000 return ((r = result) == null) ? valueIfAbsent : reportJoin(r); 2001 } 2002 2003 /** 2004 * If not already completed, sets the value returned by {@link 2005 * #get()} and related methods to the given value. 2006 * 2007 * @param value the result value 2008 * @return {@code true} if this invocation caused this CompletableFuture 2009 * to transition to a completed state, else {@code false} 2010 */ 2011 public boolean complete(T value) { 2012 boolean triggered = completeValue(value); 2013 postComplete(); 2014 return triggered; 2015 } 2016 2017 /** 2018 * If not already completed, causes invocations of {@link #get()} 2019 * and related methods to throw the given exception. 
2020 * 2021 * @param ex the exception 2022 * @return {@code true} if this invocation caused this CompletableFuture 2023 * to transition to a completed state, else {@code false} 2024 */ 2025 public boolean completeExceptionally(Throwable ex) { 2026 if (ex == null) throw new NullPointerException(); 2027 boolean triggered = internalComplete(new AltResult(ex)); 2028 postComplete(); 2029 return triggered; 2030 } 2031 2032 public <U> CompletableFuture<U> thenApply( 2033 Function<? super T,? extends U> fn) { 2034 return uniApplyStage(null, fn); 2035 } 2036 2037 public <U> CompletableFuture<U> thenApplyAsync( 2038 Function<? super T,? extends U> fn) { 2039 return uniApplyStage(defaultExecutor(), fn); 2040 } 2041 2042 public <U> CompletableFuture<U> thenApplyAsync( 2043 Function<? super T,? extends U> fn, Executor executor) { 2044 return uniApplyStage(screenExecutor(executor), fn); 2045 } 2046 2047 public CompletableFuture<Void> thenAccept(Consumer<? super T> action) { 2048 return uniAcceptStage(null, action); 2049 } 2050 2051 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) { 2052 return uniAcceptStage(defaultExecutor(), action); 2053 } 2054 2055 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, 2056 Executor executor) { 2057 return uniAcceptStage(screenExecutor(executor), action); 2058 } 2059 2060 public CompletableFuture<Void> thenRun(Runnable action) { 2061 return uniRunStage(null, action); 2062 } 2063 2064 public CompletableFuture<Void> thenRunAsync(Runnable action) { 2065 return uniRunStage(defaultExecutor(), action); 2066 } 2067 2068 public CompletableFuture<Void> thenRunAsync(Runnable action, 2069 Executor executor) { 2070 return uniRunStage(screenExecutor(executor), action); 2071 } 2072 2073 public <U,V> CompletableFuture<V> thenCombine( 2074 CompletionStage<? extends U> other, 2075 BiFunction<? super T,? super U,? 
extends V> fn) { 2076 return biApplyStage(null, other, fn); 2077 } 2078 2079 public <U,V> CompletableFuture<V> thenCombineAsync( 2080 CompletionStage<? extends U> other, 2081 BiFunction<? super T,? super U,? extends V> fn) { 2082 return biApplyStage(defaultExecutor(), other, fn); 2083 } 2084 2085 public <U,V> CompletableFuture<V> thenCombineAsync( 2086 CompletionStage<? extends U> other, 2087 BiFunction<? super T,? super U,? extends V> fn, Executor executor) { 2088 return biApplyStage(screenExecutor(executor), other, fn); 2089 } 2090 2091 public <U> CompletableFuture<Void> thenAcceptBoth( 2092 CompletionStage<? extends U> other, 2093 BiConsumer<? super T, ? super U> action) { 2094 return biAcceptStage(null, other, action); 2095 } 2096 2097 public <U> CompletableFuture<Void> thenAcceptBothAsync( 2098 CompletionStage<? extends U> other, 2099 BiConsumer<? super T, ? super U> action) { 2100 return biAcceptStage(defaultExecutor(), other, action); 2101 } 2102 2103 public <U> CompletableFuture<Void> thenAcceptBothAsync( 2104 CompletionStage<? extends U> other, 2105 BiConsumer<? super T, ? super U> action, Executor executor) { 2106 return biAcceptStage(screenExecutor(executor), other, action); 2107 } 2108 2109 public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, 2110 Runnable action) { 2111 return biRunStage(null, other, action); 2112 } 2113 2114 public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, 2115 Runnable action) { 2116 return biRunStage(defaultExecutor(), other, action); 2117 } 2118 2119 public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, 2120 Runnable action, 2121 Executor executor) { 2122 return biRunStage(screenExecutor(executor), other, action); 2123 } 2124 2125 public <U> CompletableFuture<U> applyToEither( 2126 CompletionStage<? extends T> other, Function<? 
super T, U> fn) { 2127 return orApplyStage(null, other, fn); 2128 } 2129 2130 public <U> CompletableFuture<U> applyToEitherAsync( 2131 CompletionStage<? extends T> other, Function<? super T, U> fn) { 2132 return orApplyStage(defaultExecutor(), other, fn); 2133 } 2134 2135 public <U> CompletableFuture<U> applyToEitherAsync( 2136 CompletionStage<? extends T> other, Function<? super T, U> fn, 2137 Executor executor) { 2138 return orApplyStage(screenExecutor(executor), other, fn); 2139 } 2140 2141 public CompletableFuture<Void> acceptEither( 2142 CompletionStage<? extends T> other, Consumer<? super T> action) { 2143 return orAcceptStage(null, other, action); 2144 } 2145 2146 public CompletableFuture<Void> acceptEitherAsync( 2147 CompletionStage<? extends T> other, Consumer<? super T> action) { 2148 return orAcceptStage(defaultExecutor(), other, action); 2149 } 2150 2151 public CompletableFuture<Void> acceptEitherAsync( 2152 CompletionStage<? extends T> other, Consumer<? super T> action, 2153 Executor executor) { 2154 return orAcceptStage(screenExecutor(executor), other, action); 2155 } 2156 2157 public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, 2158 Runnable action) { 2159 return orRunStage(null, other, action); 2160 } 2161 2162 public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, 2163 Runnable action) { 2164 return orRunStage(defaultExecutor(), other, action); 2165 } 2166 2167 public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, 2168 Runnable action, 2169 Executor executor) { 2170 return orRunStage(screenExecutor(executor), other, action); 2171 } 2172 2173 public <U> CompletableFuture<U> thenCompose( 2174 Function<? super T, ? extends CompletionStage<U>> fn) { 2175 return uniComposeStage(null, fn); 2176 } 2177 2178 public <U> CompletableFuture<U> thenComposeAsync( 2179 Function<? super T, ? 
extends CompletionStage<U>> fn) { 2180 return uniComposeStage(defaultExecutor(), fn); 2181 } 2182 2183 public <U> CompletableFuture<U> thenComposeAsync( 2184 Function<? super T, ? extends CompletionStage<U>> fn, 2185 Executor executor) { 2186 return uniComposeStage(screenExecutor(executor), fn); 2187 } 2188 2189 public CompletableFuture<T> whenComplete( 2190 BiConsumer<? super T, ? super Throwable> action) { 2191 return uniWhenCompleteStage(null, action); 2192 } 2193 2194 public CompletableFuture<T> whenCompleteAsync( 2195 BiConsumer<? super T, ? super Throwable> action) { 2196 return uniWhenCompleteStage(defaultExecutor(), action); 2197 } 2198 2199 public CompletableFuture<T> whenCompleteAsync( 2200 BiConsumer<? super T, ? super Throwable> action, Executor executor) { 2201 return uniWhenCompleteStage(screenExecutor(executor), action); 2202 } 2203 2204 public <U> CompletableFuture<U> handle( 2205 BiFunction<? super T, Throwable, ? extends U> fn) { 2206 return uniHandleStage(null, fn); 2207 } 2208 2209 public <U> CompletableFuture<U> handleAsync( 2210 BiFunction<? super T, Throwable, ? extends U> fn) { 2211 return uniHandleStage(defaultExecutor(), fn); 2212 } 2213 2214 public <U> CompletableFuture<U> handleAsync( 2215 BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) { 2216 return uniHandleStage(screenExecutor(executor), fn); 2217 } 2218 2219 /** 2220 * Returns this CompletableFuture. 
2221 * 2222 * @return this CompletableFuture 2223 */ 2224 public CompletableFuture<T> toCompletableFuture() { 2225 return this; 2226 } 2227 2228 // not in interface CompletionStage 2229 2230 /** 2231 * Returns a new CompletableFuture that is completed when this 2232 * CompletableFuture completes, with the result of the given 2233 * function of the exception triggering this CompletableFuture's 2234 * completion when it completes exceptionally; otherwise, if this 2235 * CompletableFuture completes normally, then the returned 2236 * CompletableFuture also completes normally with the same value. 2237 * Note: More flexible versions of this functionality are 2238 * available using methods {@code whenComplete} and {@code handle}. 2239 * 2240 * @param fn the function to use to compute the value of the 2241 * returned CompletableFuture if this CompletableFuture completed 2242 * exceptionally 2243 * @return the new CompletableFuture 2244 */ 2245 public CompletableFuture<T> exceptionally( 2246 Function<Throwable, ? extends T> fn) { 2247 return uniExceptionallyStage(fn); 2248 } 2249 2250 2251 /* ------------- Arbitrary-arity constructions -------------- */ 2252 2253 /** 2254 * Returns a new CompletableFuture that is completed when all of 2255 * the given CompletableFutures complete. If any of the given 2256 * CompletableFutures complete exceptionally, then the returned 2257 * CompletableFuture also does so, with a CompletionException 2258 * holding this exception as its cause. Otherwise, the results, 2259 * if any, of the given CompletableFutures are not reflected in 2260 * the returned CompletableFuture, but may be obtained by 2261 * inspecting them individually. If no CompletableFutures are 2262 * provided, returns a CompletableFuture completed with the value 2263 * {@code null}. 
2264 * 2265 * <p>Among the applications of this method is to await completion 2266 * of a set of independent CompletableFutures before continuing a 2267 * program, as in: {@code CompletableFuture.allOf(c1, c2, 2268 * c3).join();}. 2269 * 2270 * @param cfs the CompletableFutures 2271 * @return a new CompletableFuture that is completed when all of the 2272 * given CompletableFutures complete 2273 * @throws NullPointerException if the array or any of its elements are 2274 * {@code null} 2275 */ 2276 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) { 2277 return andTree(cfs, 0, cfs.length - 1); 2278 } 2279 2280 /** 2281 * Returns a new CompletableFuture that is completed when any of 2282 * the given CompletableFutures complete, with the same result. 2283 * Otherwise, if it completed exceptionally, the returned 2284 * CompletableFuture also does so, with a CompletionException 2285 * holding this exception as its cause. If no CompletableFutures 2286 * are provided, returns an incomplete CompletableFuture. 2287 * 2288 * @param cfs the CompletableFutures 2289 * @return a new CompletableFuture that is completed with the 2290 * result or exception of any of the given CompletableFutures when 2291 * one completes 2292 * @throws NullPointerException if the array or any of its elements are 2293 * {@code null} 2294 */ 2295 public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) { 2296 return orTree(cfs, 0, cfs.length - 1); 2297 } 2298 2299 /* ------------- Control and status methods -------------- */ 2300 2301 /** 2302 * If not already completed, completes this CompletableFuture with 2303 * a {@link CancellationException}. Dependent CompletableFutures 2304 * that have not already completed will also complete 2305 * exceptionally, with a {@link CompletionException} caused by 2306 * this {@code CancellationException}. 
2307 * 2308 * @param mayInterruptIfRunning this value has no effect in this 2309 * implementation because interrupts are not used to control 2310 * processing. 2311 * 2312 * @return {@code true} if this task is now cancelled 2313 */ 2314 public boolean cancel(boolean mayInterruptIfRunning) { 2315 boolean cancelled = (result == null) && 2316 internalComplete(new AltResult(new CancellationException())); 2317 postComplete(); 2318 return cancelled || isCancelled(); 2319 } 2320 2321 /** 2322 * Returns {@code true} if this CompletableFuture was cancelled 2323 * before it completed normally. 2324 * 2325 * @return {@code true} if this CompletableFuture was cancelled 2326 * before it completed normally 2327 */ 2328 public boolean isCancelled() { 2329 Object r; 2330 return ((r = result) instanceof AltResult) && 2331 (((AltResult)r).ex instanceof CancellationException); 2332 } 2333 2334 /** 2335 * Returns {@code true} if this CompletableFuture completed 2336 * exceptionally, in any way. Possible causes include 2337 * cancellation, explicit invocation of {@code 2338 * completeExceptionally}, and abrupt termination of a 2339 * CompletionStage action. 2340 * 2341 * @return {@code true} if this CompletableFuture completed 2342 * exceptionally 2343 */ 2344 public boolean isCompletedExceptionally() { 2345 Object r; 2346 return ((r = result) instanceof AltResult) && r != NIL; 2347 } 2348 2349 /** 2350 * Forcibly sets or resets the value subsequently returned by 2351 * method {@link #get()} and related methods, whether or not 2352 * already completed. This method is designed for use only in 2353 * error recovery actions, and even in such situations may result 2354 * in ongoing dependent completions using established versus 2355 * overwritten outcomes. 2356 * 2357 * @param value the completion value 2358 */ 2359 public void obtrudeValue(T value) { 2360 result = (value == null) ? 
NIL : value; 2361 postComplete(); 2362 } 2363 2364 /** 2365 * Forcibly causes subsequent invocations of method {@link #get()} 2366 * and related methods to throw the given exception, whether or 2367 * not already completed. This method is designed for use only in 2368 * error recovery actions, and even in such situations may result 2369 * in ongoing dependent completions using established versus 2370 * overwritten outcomes. 2371 * 2372 * @param ex the exception 2373 * @throws NullPointerException if the exception is null 2374 */ 2375 public void obtrudeException(Throwable ex) { 2376 if (ex == null) throw new NullPointerException(); 2377 result = new AltResult(ex); 2378 postComplete(); 2379 } 2380 2381 /** 2382 * Returns the estimated number of CompletableFutures whose 2383 * completions are awaiting completion of this CompletableFuture. 2384 * This method is designed for use in monitoring system state, not 2385 * for synchronization control. 2386 * 2387 * @return the number of dependent CompletableFutures 2388 */ 2389 public int getNumberOfDependents() { 2390 int count = 0; 2391 for (Completion p = stack; p != null; p = p.next) 2392 ++count; 2393 return count; 2394 } 2395 2396 /** 2397 * Returns a string identifying this CompletableFuture, as well as 2398 * its completion state. The state, in brackets, contains the 2399 * String {@code "Completed Normally"} or the String {@code 2400 * "Completed Exceptionally"}, or the String {@code "Not 2401 * completed"} followed by the number of CompletableFutures 2402 * dependent upon its completion, if any. 2403 * 2404 * @return a string identifying this CompletableFuture, as well as its state 2405 */ 2406 public String toString() { 2407 Object r = result; 2408 int count = 0; // avoid call to getNumberOfDependents in case disabled 2409 for (Completion p = stack; p != null; p = p.next) 2410 ++count; 2411 return super.toString() + 2412 ((r == null) ? 2413 ((count == 0) ? 
2414 "[Not completed]" : 2415 "[Not completed, " + count + " dependents]") : 2416 (((r instanceof AltResult) && ((AltResult)r).ex != null) ? 2417 "[Completed exceptionally]" : 2418 "[Completed normally]")); 2419 } 2420 2421 // jdk9 additions 2422 2423 /** 2424 * Returns a new incomplete CompletableFuture of the type to be 2425 * returned by a CompletionStage method. Subclasses should 2426 * normally override this method to return an instance of the same 2427 * class as this CompletableFuture. The default implementation 2428 * returns an instance of class CompletableFuture. 2429 * 2430 * @param <U> the type of the value 2431 * @return a new CompletableFuture 2432 * @since 1.9 2433 */ 2434 public <U> CompletableFuture<U> newIncompleteFuture() { 2435 return new CompletableFuture<U>(); 2436 } 2437 2438 /** 2439 * Returns the default Executor used for async methods that do not 2440 * specify an Executor. This class uses the {@link 2441 * ForkJoinPool#commonPool()} if it supports more than one 2442 * parallel thread, or else an Executor using one thread per async 2443 * task. This method may be overridden in subclasses to return 2444 * an Executor that provides at least one independent thread. 2445 * 2446 * @return the executor 2447 * @since 1.9 2448 */ 2449 public Executor defaultExecutor() { 2450 return ASYNC_POOL; 2451 } 2452 2453 /** 2454 * Returns a new CompletableFuture that is completed normally with 2455 * the same value as this CompletableFuture when it completes 2456 * normally. If this CompletableFuture completes exceptionally, 2457 * then the returned CompletableFuture completes exceptionally 2458 * with a CompletionException with this exception as cause. The 2459 * behavior is equivalent to {@code thenApply(x -> x)}. This 2460 * method may be useful as a form of "defensive copying", to 2461 * prevent clients from completing, while still being able to 2462 * arrange dependent actions. 
2463 * 2464 * @return the new CompletableFuture 2465 * @since 1.9 2466 */ 2467 public CompletableFuture<T> copy() { 2468 return uniCopyStage(); 2469 } 2470 2471 /** 2472 * Returns a new CompletionStage that is completed normally with 2473 * the same value as this CompletableFuture when it completes 2474 * normally, and cannot be independently completed or otherwise 2475 * used in ways not defined by the methods of interface {@link 2476 * CompletionStage}. If this CompletableFuture completes 2477 * exceptionally, then the returned CompletionStage completes 2478 * exceptionally with a CompletionException with this exception as 2479 * cause. 2480 * 2481 * @return the new CompletionStage 2482 * @since 1.9 2483 */ 2484 public CompletionStage<T> minimalCompletionStage() { 2485 return uniAsMinimalStage(); 2486 } 2487 2488 /** 2489 * Completes this CompletableFuture with the result of 2490 * the given Supplier function invoked from an asynchronous 2491 * task using the given executor. 2492 * 2493 * @param supplier a function returning the value to be used 2494 * to complete this CompletableFuture 2495 * @param executor the executor to use for asynchronous execution 2496 * @return this CompletableFuture 2497 * @since 1.9 2498 */ 2499 public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier, 2500 Executor executor) { 2501 if (supplier == null || executor == null) 2502 throw new NullPointerException(); 2503 executor.execute(new AsyncSupply<T>(this, supplier)); 2504 return this; 2505 } 2506 2507 /** 2508 * Completes this CompletableFuture with the result of the given 2509 * Supplier function invoked from an asynchronous task using the 2510 * default executor. 2511 * 2512 * @param supplier a function returning the value to be used 2513 * to complete this CompletableFuture 2514 * @return this CompletableFuture 2515 * @since 1.9 2516 */ 2517 public CompletableFuture<T> completeAsync(Supplier<? 
extends T> supplier) { 2518 return completeAsync(supplier, defaultExecutor()); 2519 } 2520 2521 /** 2522 * Exceptionally completes this CompletableFuture with 2523 * a {@link TimeoutException} if not otherwise completed 2524 * before the given timeout. 2525 * 2526 * @param timeout how long to wait before completing exceptionally 2527 * with a TimeoutException, in units of {@code unit} 2528 * @param unit a {@code TimeUnit} determining how to interpret the 2529 * {@code timeout} parameter 2530 * @return this CompletableFuture 2531 * @since 1.9 2532 */ 2533 public CompletableFuture<T> orTimeout(long timeout, TimeUnit unit) { 2534 if (unit == null) 2535 throw new NullPointerException(); 2536 if (result == null) 2537 whenComplete(new Canceller(Delayer.delay(new Timeout(this), 2538 timeout, unit))); 2539 return this; 2540 } 2541 2542 /** 2543 * Completes this CompletableFuture with the given value if not 2544 * otherwise completed before the given timeout. 2545 * 2546 * @param value the value to use upon timeout 2547 * @param timeout how long to wait before completing normally 2548 * with the given value, in units of {@code unit} 2549 * @param unit a {@code TimeUnit} determining how to interpret the 2550 * {@code timeout} parameter 2551 * @return this CompletableFuture 2552 * @since 1.9 2553 */ 2554 public CompletableFuture<T> completeOnTimeout(T value, long timeout, 2555 TimeUnit unit) { 2556 if (unit == null) 2557 throw new NullPointerException(); 2558 if (result == null) 2559 whenComplete(new Canceller(Delayer.delay( 2560 new DelayedCompleter<T>(this, value), 2561 timeout, unit))); 2562 return this; 2563 } 2564 2565 /** 2566 * Returns a new Executor that submits a task to the given base 2567 * executor after the given delay (or no delay if non-positive). 2568 * Each delay commences upon invocation of the returned executor's 2569 * {@code execute} method. 
2570 * 2571 * @param delay how long to delay, in units of {@code unit} 2572 * @param unit a {@code TimeUnit} determining how to interpret the 2573 * {@code delay} parameter 2574 * @param executor the base executor 2575 * @return the new delayed executor 2576 * @since 1.9 2577 */ 2578 public static Executor delayedExecutor(long delay, TimeUnit unit, 2579 Executor executor) { 2580 if (unit == null || executor == null) 2581 throw new NullPointerException(); 2582 return new DelayedExecutor(delay, unit, executor); 2583 } 2584 2585 /** 2586 * Returns a new Executor that submits a task to the default 2587 * executor after the given delay (or no delay if non-positive). 2588 * Each delay commences upon invocation of the returned executor's 2589 * {@code execute} method. 2590 * 2591 * @param delay how long to delay, in units of {@code unit} 2592 * @param unit a {@code TimeUnit} determining how to interpret the 2593 * {@code delay} parameter 2594 * @return the new delayed executor 2595 * @since 1.9 2596 */ 2597 public static Executor delayedExecutor(long delay, TimeUnit unit) { 2598 if (unit == null) 2599 throw new NullPointerException(); 2600 return new DelayedExecutor(delay, unit, ASYNC_POOL); 2601 } 2602 2603 /** 2604 * Returns a new CompletionStage that is already completed with 2605 * the given value and supports only those methods in 2606 * interface {@link CompletionStage}. 2607 * 2608 * @param value the value 2609 * @param <U> the type of the value 2610 * @return the completed CompletionStage 2611 * @since 1.9 2612 */ 2613 public static <U> CompletionStage<U> completedStage(U value) { 2614 return new MinimalStage<U>((value == null) ? NIL : value); 2615 } 2616 2617 /** 2618 * Returns a new CompletableFuture that is already completed 2619 * exceptionally with the given exception. 
2620 * 2621 * @param ex the exception 2622 * @param <U> the type of the value 2623 * @return the exceptionally completed CompletableFuture 2624 * @since 1.9 2625 */ 2626 public static <U> CompletableFuture<U> failedFuture(Throwable ex) { 2627 if (ex == null) throw new NullPointerException(); 2628 return new CompletableFuture<U>(new AltResult(ex)); 2629 } 2630 2631 /** 2632 * Returns a new CompletionStage that is already completed 2633 * exceptionally with the given exception and supports only those 2634 * methods in interface {@link CompletionStage}. 2635 * 2636 * @param ex the exception 2637 * @param <U> the type of the value 2638 * @return the exceptionally completed CompletionStage 2639 * @since 1.9 2640 */ 2641 public static <U> CompletionStage<U> failedStage(Throwable ex) { 2642 if (ex == null) throw new NullPointerException(); 2643 return new MinimalStage<U>(new AltResult(ex)); 2644 } 2645 2646 /** 2647 * Singleton delay scheduler, used only for starting and 2648 * cancelling tasks. 2649 */ 2650 static final class Delayer { 2651 static ScheduledFuture<?> delay(Runnable command, long delay, 2652 TimeUnit unit) { 2653 return delayer.schedule(command, delay, unit); 2654 } 2655 2656 static final class DaemonThreadFactory implements ThreadFactory { 2657 public Thread newThread(Runnable r) { 2658 Thread t = new Thread(r); 2659 t.setDaemon(true); 2660 t.setName("CompletableFutureDelayScheduler"); 2661 return t; 2662 } 2663 } 2664 2665 static final ScheduledThreadPoolExecutor delayer; 2666 static { 2667 (delayer = new ScheduledThreadPoolExecutor( 2668 1, new DaemonThreadFactory())). 
2669 setRemoveOnCancelPolicy(true); 2670 } 2671 } 2672 2673 // Little class-ified lambdas to better support monitoring 2674 2675 static final class DelayedExecutor implements Executor { 2676 final long delay; 2677 final TimeUnit unit; 2678 final Executor executor; 2679 DelayedExecutor(long delay, TimeUnit unit, Executor executor) { 2680 this.delay = delay; this.unit = unit; this.executor = executor; 2681 } 2682 public void execute(Runnable r) { 2683 Delayer.delay(new TaskSubmitter(executor, r), delay, unit); 2684 } 2685 } 2686 2687 /** Action to submit user task */ 2688 static final class TaskSubmitter implements Runnable { 2689 final Executor executor; 2690 final Runnable action; 2691 TaskSubmitter(Executor executor, Runnable action) { 2692 this.executor = executor; 2693 this.action = action; 2694 } 2695 public void run() { executor.execute(action); } 2696 } 2697 2698 /** Action to completeExceptionally on timeout */ 2699 static final class Timeout implements Runnable { 2700 final CompletableFuture<?> f; 2701 Timeout(CompletableFuture<?> f) { this.f = f; } 2702 public void run() { 2703 if (f != null && !f.isDone()) 2704 f.completeExceptionally(new TimeoutException()); 2705 } 2706 } 2707 2708 /** Action to complete on timeout */ 2709 static final class DelayedCompleter<U> implements Runnable { 2710 final CompletableFuture<U> f; 2711 final U u; 2712 DelayedCompleter(CompletableFuture<U> f, U u) { this.f = f; this.u = u; } 2713 public void run() { 2714 if (f != null) 2715 f.complete(u); 2716 } 2717 } 2718 2719 /** Action to cancel unneeded timeouts */ 2720 static final class Canceller implements BiConsumer<Object, Throwable> { 2721 final Future<?> f; 2722 Canceller(Future<?> f) { this.f = f; } 2723 public void accept(Object ignore, Throwable ex) { 2724 if (ex == null && f != null && !f.isDone()) 2725 f.cancel(false); 2726 } 2727 } 2728 2729 /** 2730 * A subclass that just throws UOE for most non-CompletionStage methods. 
2731 */ 2732 static final class MinimalStage<T> extends CompletableFuture<T> { 2733 MinimalStage() { } 2734 MinimalStage(Object r) { super(r); } 2735 @Override public <U> CompletableFuture<U> newIncompleteFuture() { 2736 return new MinimalStage<U>(); } 2737 @Override public T get() { 2738 throw new UnsupportedOperationException(); } 2739 @Override public T get(long timeout, TimeUnit unit) { 2740 throw new UnsupportedOperationException(); } 2741 @Override public T getNow(T valueIfAbsent) { 2742 throw new UnsupportedOperationException(); } 2743 @Override public T join() { 2744 throw new UnsupportedOperationException(); } 2745 @Override public boolean complete(T value) { 2746 throw new UnsupportedOperationException(); } 2747 @Override public boolean completeExceptionally(Throwable ex) { 2748 throw new UnsupportedOperationException(); } 2749 @Override public boolean cancel(boolean mayInterruptIfRunning) { 2750 throw new UnsupportedOperationException(); } 2751 @Override public void obtrudeValue(T value) { 2752 throw new UnsupportedOperationException(); } 2753 @Override public void obtrudeException(Throwable ex) { 2754 throw new UnsupportedOperationException(); } 2755 @Override public boolean isDone() { 2756 throw new UnsupportedOperationException(); } 2757 @Override public boolean isCancelled() { 2758 throw new UnsupportedOperationException(); } 2759 @Override public boolean isCompletedExceptionally() { 2760 throw new UnsupportedOperationException(); } 2761 @Override public int getNumberOfDependents() { 2762 throw new UnsupportedOperationException(); } 2763 @Override public CompletableFuture<T> completeAsync 2764 (Supplier<? extends T> supplier, Executor executor) { 2765 throw new UnsupportedOperationException(); } 2766 @Override public CompletableFuture<T> completeAsync 2767 (Supplier<? 
extends T> supplier) { 2768 throw new UnsupportedOperationException(); } 2769 @Override public CompletableFuture<T> orTimeout 2770 (long timeout, TimeUnit unit) { 2771 throw new UnsupportedOperationException(); } 2772 @Override public CompletableFuture<T> completeOnTimeout 2773 (T value, long timeout, TimeUnit unit) { 2774 throw new UnsupportedOperationException(); } 2775 } 2776 2777 // Unsafe mechanics 2778 private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe(); 2779 private static final long RESULT; 2780 private static final long STACK; 2781 private static final long NEXT; 2782 static { 2783 try { 2784 RESULT = U.objectFieldOffset 2785 (CompletableFuture.class.getDeclaredField("result")); 2786 STACK = U.objectFieldOffset 2787 (CompletableFuture.class.getDeclaredField("stack")); 2788 NEXT = U.objectFieldOffset 2789 (Completion.class.getDeclaredField("next")); 2790 } catch (ReflectiveOperationException e) { 2791 throw new Error(e); 2792 } 2793 2794 // Reduce the risk of rare disastrous classloading in first call to 2795 // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773 2796 Class<?> ensureLoaded = LockSupport.class; 2797 } 2798 }