1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 
28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent; 37 import java.util.function.Supplier; 38 import java.util.function.Consumer; 39 import java.util.function.BiConsumer; 40 import java.util.function.Function; 41 import java.util.function.BiFunction; 42 import java.util.concurrent.Future; 43 import java.util.concurrent.TimeUnit; 44 import java.util.concurrent.ForkJoinPool; 45 import java.util.concurrent.ForkJoinTask; 46 import java.util.concurrent.Executor; 47 import java.util.concurrent.ThreadLocalRandom; 48 import java.util.concurrent.ExecutionException; 49 import java.util.concurrent.TimeoutException; 50 import java.util.concurrent.CancellationException; 51 import java.util.concurrent.atomic.AtomicInteger; 52 import java.util.concurrent.locks.LockSupport; 53 54 /** 55 * A {@link Future} that may be explicitly completed (setting its 56 * value and status), and may include dependent functions and actions 57 * that trigger upon its completion. 58 * 59 * <p>When two or more threads attempt to 60 * {@link #complete complete}, 61 * {@link #completeExceptionally completeExceptionally}, or 62 * {@link #cancel cancel} 63 * a CompletableFuture, only one of them succeeds. 64 * 65 * <p>Methods are available for adding dependents based on Functions, 66 * Consumers, and Runnables. The appropriate form to use depends on 67 * whether actions require arguments and/or produce results. Actions 68 * may also be triggered after either or both the current and another 69 * CompletableFuture complete. Multiple CompletableFutures may also 70 * be grouped as one using {@link #anyOf(CompletableFuture...)} and 71 * {@link #allOf(CompletableFuture...)}. 
72 * 73 * <p>Actions supplied for dependent completions (mainly using methods 74 * with prefix {@code then}) may be performed by the thread that 75 * completes the current CompletableFuture, or by any other caller of 76 * these methods. There are no guarantees about the order of 77 * processing completions unless constrained by these methods. 78 * 79 * <p>Since (unlike {@link FutureTask}) this class has no direct 80 * control over the computation that causes it to be completed, 81 * cancellation is treated as just another form of exceptional completion. 82 * Method {@link #cancel cancel} has the same effect as 83 * {@code completeExceptionally(new CancellationException())}. 84 * 85 * <p>Upon exceptional completion (including cancellation), or when a 86 * completion entails an additional computation which terminates 87 * abruptly with an (unchecked) exception or error, then all of their 88 * dependent completions (and their dependents in turn) generally act 89 * as {@code completeExceptionally} with a {@link CompletionException} 90 * holding that exception as its cause. However, the {@link 91 * #exceptionally exceptionally} and {@link #handle handle} 92 * completions <em>are</em> able to handle exceptional completions of 93 * the CompletableFutures they depend on. 94 * 95 * <p>In case of exceptional completion with a CompletionException, 96 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an 97 * {@link ExecutionException} with the same cause as held in the 98 * corresponding CompletionException. However, in these cases, 99 * methods {@link #join()} and {@link #getNow} throw the 100 * CompletionException, which simplifies usage. 101 * 102 * <p>CompletableFutures themselves do not execute asynchronously. 
103 * However, the {@code async} methods provide commonly useful ways to 104 * commence asynchronous processing, using either a given {@link 105 * Executor} or by default the {@link ForkJoinPool#commonPool()}, of a 106 * function or action that will result in the completion of a new 107 * CompletableFuture. To simplify monitoring, debugging, and tracking, 108 * all generated asynchronous tasks are instances of the tagging 109 * interface {@link AsynchronousCompletionTask}. 110 * 111 * @author Doug Lea 112 * @since 1.8 113 */ 114 public class CompletableFuture<T> implements Future<T> { 115 116 /* 117 * Overview: 118 * 119 * 1. Non-nullness of field result (set via CAS) indicates done. 120 * An AltResult is used to box null as a result, as well as to 121 * hold exceptions. Using a single field makes completion fast 122 * and simple to detect and trigger, at the expense of a lot of 123 * encoding and decoding that infiltrates many methods. One minor 124 * simplification relies on the (static) NIL (to box null results) 125 * being the only AltResult with a null exception field, so we 126 * don't usually need explicit comparisons with NIL. The CF 127 * exception propagation mechanics surrounding decoding rely on 128 * unchecked casts of decoded results really being unchecked, 129 * where user type errors are caught at point of use, as is 130 * currently the case in Java. These are highlighted by using 131 * SuppressWarnings-annotated temporaries. 132 * 133 * 2. Waiters are held in a Treiber stack similar to the one used 134 * in FutureTask, Phaser, and SynchronousQueue. See their 135 * internal documentation for algorithmic details. 136 * 137 * 3. Completions are also kept in a list/stack, and pulled off 138 * and run when completion is triggered. (We could even use the 139 * same stack as for waiters, but would give up the potential 140 * parallelism obtained because woken waiters help release/run 141 * others -- see method postComplete). 
Because post-processing
     * may race with direct calls, class Completion opportunistically
     * extends AtomicInteger so callers can claim the action via
     * compareAndSet(0, 1). The Completion.run methods are all
     * written in a boringly similar uniform way (that sometimes includes
     * unnecessary-looking checks, kept to maintain uniformity).
     * There are enough dimensions upon which they differ that
     * attempts to factor commonalities while maintaining efficiency
     * require more lines of code than they would save.
     *
     * 4. The exported then/and/or methods do support a bit of
     * factoring (see doThenApply etc). They must cope with the
     * intrinsic races surrounding addition of a dependent action
     * versus performing the action directly because the task is
     * already complete. For example, a CF may not be complete upon
     * entry, so a dependent completion is added, but by the time it
     * is added, the target CF is complete, so must be directly
     * executed. This is all done while avoiding unnecessary object
     * construction in safe-bypass cases.
     */

    // preliminaries

    /**
     * Wrapper stored in field {@code result} in place of a plain value:
     * it carries the exception of an exceptional completion, or (as the
     * shared {@code NIL} instance) stands for a null result.
     */
    static final class AltResult {
        final Throwable ex; // null only for NIL
        AltResult(Throwable ex) { this.ex = ex; }
    }

    /**
     * The single AltResult with a null exception; used to box a null
     * result so that a non-null {@code result} field always means "done".
     */
    static final AltResult NIL = new AltResult(null);

    // Fields
    // NOTE(review): these fields are updated through UNSAFE compare-and-swap
    // using the RESULT / WAITERS / COMPLETIONS offsets; those offsets are
    // presumably derived from the field names elsewhere in the file, so the
    // names should not be changed -- confirm against the static initializer.

    volatile Object result;    // Either the result or boxed AltResult
    volatile WaitNode waiters; // Treiber stack of threads blocked on get()
    volatile CompletionNode completions; // list (Treiber stack) of completions

    // Basic utilities for triggering and processing completions

    /**
     * Removes and signals all waiting threads and runs all completions.
181 */ 182 final void postComplete() { 183 WaitNode q; Thread t; 184 while ((q = waiters) != null) { 185 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) && 186 (t = q.thread) != null) { 187 q.thread = null; 188 LockSupport.unpark(t); 189 } 190 } 191 192 CompletionNode h; Completion c; 193 while ((h = completions) != null) { 194 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) && 195 (c = h.completion) != null) 196 c.run(); 197 } 198 } 199 200 /** 201 * Triggers completion with the encoding of the given arguments: 202 * if the exception is non-null, encodes it as a wrapped 203 * CompletionException unless it is one already. Otherwise uses 204 * the given result, boxed as NIL if null. 205 */ 206 final void internalComplete(Object v, Throwable ex) { 207 if (result == null) 208 UNSAFE.compareAndSwapObject 209 (this, RESULT, null, 210 (ex == null) ? (v == null) ? NIL : v : 211 new AltResult((ex instanceof CompletionException) ? ex : 212 new CompletionException(ex))); 213 postComplete(); // help out even if not triggered 214 } 215 216 /** 217 * If triggered, helps release and/or process completions. 218 */ 219 final void helpPostComplete() { 220 if (result != null) 221 postComplete(); 222 } 223 224 /* ------------- waiting for completions -------------- */ 225 226 /** Number of processors, for spin control */ 227 static final int NCPU = Runtime.getRuntime().availableProcessors(); 228 229 /** 230 * Heuristic spin value for waitingGet() before blocking on 231 * multiprocessors 232 */ 233 static final int SPINS = (NCPU > 1) ? 1 << 8 : 0; 234 235 /** 236 * Linked nodes to record waiting threads in a Treiber stack. See 237 * other classes such as Phaser and SynchronousQueue for more 238 * detailed explanation. This class implements ManagedBlocker to 239 * avoid starvation when blocking actions pile up in 240 * ForkJoinPools. 
*/
    static final class WaitNode implements ForkJoinPool.ManagedBlocker {
        long nanos;          // wait time if timed
        final long deadline; // non-zero if timed
        volatile int interruptControl; // > 0: interruptible, < 0: interrupted
        volatile Thread thread;        // waiting thread; nulled once released/cancelled
        volatile WaitNode next;        // next node down the stack

        /**
         * Creates a node for the calling thread.
         *
         * @param interruptible whether an interrupt should release the waiter
         * @param nanos initial wait time, used only when deadline is non-zero
         * @param deadline System.nanoTime()-based deadline, or 0 if untimed
         */
        WaitNode(boolean interruptible, long nanos, long deadline) {
            this.thread = Thread.currentThread();
            this.interruptControl = interruptible ? 1 : 0;
            this.nanos = nanos;
            this.deadline = deadline;
        }

        /**
         * Reports whether blocking is unnecessary: the node was already
         * released (thread cleared), an interruptible waiter has been
         * interrupted, or a timed wait has run out.
         */
        public boolean isReleasable() {
            if (thread == null)
                return true;
            // Thread.interrupted() clears the interrupt status, so the event
            // is recorded in interruptControl for the caller to act on.
            if (Thread.interrupted()) {
                int i = interruptControl;
                interruptControl = -1;
                if (i > 0)          // only interruptible waits are released
                    return true;
            }
            // Timed wait: refresh the remaining time; on expiry clear the
            // thread so that releasers will not unpark it.
            if (deadline != 0L &&
                (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
                thread = null;
                return true;
            }
            return false;
        }

        /**
         * Parks the current thread (untimed, or for the remaining nanos)
         * unless already releasable, then re-evaluates isReleasable().
         */
        public boolean block() {
            if (isReleasable())
                return true;
            else if (deadline == 0L)
                LockSupport.park(this);
            else if (nanos > 0L)
                LockSupport.parkNanos(this, nanos);
            return isReleasable();
        }
    }

    /**
     * Returns raw result after waiting, or null if interruptible and
     * interrupted.
*/
    private Object waitingGet(boolean interruptible) {
        WaitNode q = null;
        boolean queued = false;
        int spins = SPINS;
        // Each pass through the loop takes exactly one step: finish, spin,
        // allocate a node, enqueue it, bail out on interrupt, or block.
        for (Object r;;) {
            if ((r = result) != null) {
                if (q != null) { // suppress unpark
                    q.thread = null;
                    if (q.interruptControl < 0) {
                        if (interruptible) {
                            removeWaiter(q);
                            return null;
                        }
                        // Non-interruptible wait: restore the interrupt
                        // status that isReleasable() cleared.
                        Thread.currentThread().interrupt();
                    }
                }
                postComplete(); // help release others
                return r;
            }
            else if (spins > 0) {
                // Randomized spinning: the counter is decremented only when
                // the random value is non-negative.
                int rnd = ThreadLocalRandom.nextSecondarySeed();
                if (rnd == 0)
                    rnd = ThreadLocalRandom.current().nextInt();
                if (rnd >= 0)
                    --spins;
            }
            else if (q == null)
                q = new WaitNode(interruptible, 0L, 0L);
            else if (!queued)
                // Push onto the waiters stack; retried on the next pass if
                // the CAS fails.
                queued = UNSAFE.compareAndSwapObject(this, WAITERS,
                                                     q.next = waiters, q);
            else if (interruptible && q.interruptControl < 0) {
                removeWaiter(q);
                return null;
            }
            else if (q.thread != null && result == null) {
                try {
                    ForkJoinPool.managedBlock(q);
                } catch (InterruptedException ex) {
                    q.interruptControl = -1;
                }
            }
        }
    }

    /**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * @param nanos time to wait
     * @return raw result
     * @throws InterruptedException if the waiting thread was interrupted
     * @throws TimeoutException if nanos is not positive on entry, or the
     *         wait time elapses before a result is set
     */
    private Object timedAwaitDone(long nanos)
        throws InterruptedException, TimeoutException {
        WaitNode q = null;
        boolean queued = false;
        for (Object r;;) {
            if ((r = result) != null) {
                if (q != null) {
                    q.thread = null;
                    // An interrupt recorded while waiting is reported even
                    // though a result is now available.
                    if (q.interruptControl < 0) {
                        removeWaiter(q);
                        throw new InterruptedException();
                    }
                }
                postComplete();
                return r;
            }
            else if (q == null) {
                if (nanos <= 0L)
                    throw new TimeoutException();
                long d = System.nanoTime() + nanos;
                // A deadline of 0 means "untimed" to WaitNode, so nudge it.
                q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
            }
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, WAITERS,
                                                     q.next = waiters, q);
            else if (q.interruptControl < 0) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            else if (q.nanos <= 0L) {
                // Time is up; re-check result so a completion that raced
                // with the timeout is picked up on the next pass.
                if (result == null) {
                    removeWaiter(q);
                    throw new TimeoutException();
                }
            }
            else if (q.thread != null && result == null) {
                try {
                    ForkJoinPool.managedBlock(q);
                } catch (InterruptedException ex) {
                    q.interruptControl = -1;
                }
            }
        }
    }

    /**
     * Tries to unlink a timed-out or interrupted wait node to avoid
     * accumulating garbage.  Internal nodes are simply unspliced
     * without CAS since it is harmless if they are traversed anyway
     * by releasers.  To avoid effects of unsplicing from already
     * removed nodes, the list is retraversed in case of an apparent
     * race.  This is slow when there are a lot of nodes, but we don't
     * expect lists to be long enough to outweigh higher-overhead
     * schemes.
     *
     * @param node the node to remove; ignored if null
     */
    private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null; // a null thread marks the node as cancelled
            retry:
            for (;;) {          // restart on removeWaiter race
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        pred = q;            // live node: becomes predecessor
                    else if (pred != null) {
                        pred.next = s;       // unsplice cancelled interior node
                        if (pred.thread == null) // check for race
                            continue retry;
                    }
                    // Cancelled node at the head: must be removed by CAS.
                    else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
                        continue retry;
                }
                break;
            }
        }
    }

    /* ------------- Async tasks -------------- */

    /**
     * A tagging interface identifying asynchronous tasks produced by
     * {@code async} methods. This may be useful for monitoring,
     * debugging, and tracking asynchronous activities.
*/
    public static interface AsynchronousCompletionTask {
    }

    /**
     * Base class can act as either FJ or plain Runnable.
     * {@code run()} simply delegates to {@code exec()}, and the task
     * itself carries no result (the raw result is always null).
     */
    abstract static class Async extends ForkJoinTask<Void>
        implements Runnable, AsynchronousCompletionTask {
        public final Void getRawResult() { return null; }
        public final void setRawResult(Void v) { }
        public final void run() { exec(); }
    }

    /**
     * Task that runs a Runnable and completes {@code dst} with null, or
     * with whatever the Runnable threw.
     */
    static final class AsyncRun extends Async {
        final Runnable fn;
        final CompletableFuture<Void> dst;
        AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
            this.fn = fn; this.dst = dst;
        }
        public final boolean exec() {
            CompletableFuture<Void> d; Throwable ex;
            // Skip the work entirely if dst has already been completed.
            if ((d = this.dst) != null && d.result == null) {
                try {
                    fn.run();
                    ex = null;
                } catch (Throwable rex) {
                    ex = rex;
                }
                d.internalComplete(null, ex);
            }
            return true;
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    /**
     * Task that calls a Supplier and completes {@code dst} with the
     * supplied value, or with whatever the Supplier threw.
     */
    static final class AsyncSupply<U> extends Async {
        final Supplier<U> fn;
        final CompletableFuture<U> dst;
        AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) {
            this.fn = fn; this.dst = dst;
        }
        public final boolean exec() {
            CompletableFuture<U> d; U u; Throwable ex;
            // Skip the work entirely if dst has already been completed.
            if ((d = this.dst) != null && d.result == null) {
                try {
                    u = fn.get();
                    ex = null;
                } catch (Throwable rex) {
                    ex = rex;
                    u = null;
                }
                d.internalComplete(u, ex);
            }
            return true;
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    /**
     * Task that applies a Function to a fixed argument and completes
     * {@code dst} with the outcome.
     */
    static final class AsyncApply<T,U> extends Async {
        final Function<? super T,? extends U> fn;
        final T arg;
        final CompletableFuture<U> dst;
        AsyncApply(T arg, Function<? super T,? extends U> fn,
                   CompletableFuture<U> dst) {
            this.arg = arg; this.fn = fn; this.dst = dst;
        }
        public final boolean exec() {
            CompletableFuture<U> d; U u; Throwable ex;
            // Skip the work entirely if dst has already been completed.
            if ((d = this.dst) != null && d.result == null) {
                try {
                    u = fn.apply(arg);
                    ex = null;
                } catch (Throwable rex) {
                    ex = rex;
                    u = null;
                }
                d.internalComplete(u, ex);
            }
            return true;
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    /**
     * Task that applies a BiFunction to two fixed arguments and
     * completes {@code dst} with the outcome.
     */
    static final class AsyncBiApply<T,U,V> extends Async {
        final BiFunction<? super T,? super U,? extends V> fn;
        final T arg1;
        final U arg2;
        final CompletableFuture<V> dst;
        AsyncBiApply(T arg1, U arg2,
                     BiFunction<? super T,? super U,? extends V> fn,
                     CompletableFuture<V> dst) {
            this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
        }
        public final boolean exec() {
            CompletableFuture<V> d; V v; Throwable ex;
            // Skip the work entirely if dst has already been completed.
            if ((d = this.dst) != null && d.result == null) {
                try {
                    v = fn.apply(arg1, arg2);
                    ex = null;
                } catch (Throwable rex) {
                    ex = rex;
                    v = null;
                }
                d.internalComplete(v, ex);
            }
            return true;
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    /**
     * Task that passes a fixed argument to a Consumer and completes
     * {@code dst} with null, or with whatever the Consumer threw.
     */
    static final class AsyncAccept<T> extends Async {
        final Consumer<? super T> fn;
        final T arg;
        final CompletableFuture<Void> dst;
        AsyncAccept(T arg, Consumer<?
super T> fn, 533 CompletableFuture<Void> dst) { 534 this.arg = arg; this.fn = fn; this.dst = dst; 535 } 536 public final boolean exec() { 537 CompletableFuture<Void> d; Throwable ex; 538 if ((d = this.dst) != null && d.result == null) { 539 try { 540 fn.accept(arg); 541 ex = null; 542 } catch (Throwable rex) { 543 ex = rex; 544 } 545 d.internalComplete(null, ex); 546 } 547 return true; 548 } 549 private static final long serialVersionUID = 5232453952276885070L; 550 } 551 552 static final class AsyncBiAccept<T,U> extends Async { 553 final BiConsumer<? super T,? super U> fn; 554 final T arg1; 555 final U arg2; 556 final CompletableFuture<Void> dst; 557 AsyncBiAccept(T arg1, U arg2, 558 BiConsumer<? super T,? super U> fn, 559 CompletableFuture<Void> dst) { 560 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst; 561 } 562 public final boolean exec() { 563 CompletableFuture<Void> d; Throwable ex; 564 if ((d = this.dst) != null && d.result == null) { 565 try { 566 fn.accept(arg1, arg2); 567 ex = null; 568 } catch (Throwable rex) { 569 ex = rex; 570 } 571 d.internalComplete(null, ex); 572 } 573 return true; 574 } 575 private static final long serialVersionUID = 5232453952276885070L; 576 } 577 578 static final class AsyncCompose<T,U> extends Async { 579 final Function<? super T, CompletableFuture<U>> fn; 580 final T arg; 581 final CompletableFuture<U> dst; 582 AsyncCompose(T arg, 583 Function<? 
super T, CompletableFuture<U>> fn, 584 CompletableFuture<U> dst) { 585 this.arg = arg; this.fn = fn; this.dst = dst; 586 } 587 public final boolean exec() { 588 CompletableFuture<U> d, fr; U u; Throwable ex; 589 if ((d = this.dst) != null && d.result == null) { 590 try { 591 fr = fn.apply(arg); 592 ex = null; 593 } catch (Throwable rex) { 594 ex = rex; 595 fr = null; 596 } 597 if (ex != null) 598 u = null; 599 else if (fr == null) { 600 ex = new NullPointerException(); 601 u = null; 602 } 603 else { 604 Object r = fr.result; 605 if (r instanceof AltResult) { 606 ex = ((AltResult)r).ex; 607 u = null; 608 } 609 else { 610 @SuppressWarnings("unchecked") U ur = (U) r; 611 u = ur; 612 } 613 } 614 d.internalComplete(u, ex); 615 } 616 return true; 617 } 618 private static final long serialVersionUID = 5232453952276885070L; 619 } 620 621 /* ------------- Completions -------------- */ 622 623 /** 624 * Simple linked list nodes to record completions, used in 625 * basically the same way as WaitNodes. (We separate nodes from 626 * the Completions themselves mainly because for the And and Or 627 * methods, the same Completion object resides in two lists.) 628 */ 629 static final class CompletionNode { 630 final Completion completion; 631 volatile CompletionNode next; 632 CompletionNode(Completion completion) { this.completion = completion; } 633 } 634 635 // Opportunistically subclass AtomicInteger to use compareAndSet to claim. 636 abstract static class Completion extends AtomicInteger implements Runnable { 637 } 638 639 static final class ApplyCompletion<T,U> extends Completion { 640 final CompletableFuture<? extends T> src; 641 final Function<? super T,? extends U> fn; 642 final CompletableFuture<U> dst; 643 final Executor executor; 644 ApplyCompletion(CompletableFuture<? extends T> src, 645 Function<? super T,? 
extends U> fn,
                        CompletableFuture<U> dst, Executor executor) {
            this.src = src; this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final Function<? super T,? extends U> fn;
            final CompletableFuture<U> dst;
            Object r; T t; Throwable ex;
            // Proceed only if the source has a result and this caller wins
            // the claim; compareAndSet(0, 1) succeeds at most once.
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                compareAndSet(0, 1)) {
                // Decode the raw result into a value or an exception.
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                Executor e = executor;
                U u = null;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncApply<T,U>(t, fn, dst));
                        else
                            u = fn.apply(t);
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                // Complete dst here unless the work was handed to the
                // executor without an exception (the async task does it).
                if (e == null || ex != null)
                    dst.internalComplete(u, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    /**
     * Completion that, once {@code src} is done, passes its value to the
     * Consumer {@code fn} (inline, or through {@code executor} when one
     * is given) and completes {@code dst}.
     */
    static final class AcceptCompletion<T> extends Completion {
        final CompletableFuture<? extends T> src;
        final Consumer<? super T> fn;
        final CompletableFuture<Void> dst;
        final Executor executor;
        AcceptCompletion(CompletableFuture<? extends T> src,
                         Consumer<? super T> fn,
                         CompletableFuture<Void> dst, Executor executor) {
            this.src = src; this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final Consumer<? super T> fn;
            final CompletableFuture<Void> dst;
            Object r; T t; Throwable ex;
            // Proceed only if the source has a result and this caller wins
            // the claim; compareAndSet(0, 1) succeeds at most once.
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                compareAndSet(0, 1)) {
                // Decode the raw result into a value or an exception.
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                Executor e = executor;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncAccept<T>(t, fn, dst));
                        else
                            fn.accept(t);
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                // Complete dst here unless the work was handed to the
                // executor without an exception (the async task does it).
                if (e == null || ex != null)
                    dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    /**
     * Completion that, once {@code src} is done, runs the Runnable
     * {@code fn} (inline, or through {@code executor} when one is given)
     * and completes {@code dst}.
     */
    static final class RunCompletion<T> extends Completion {
        final CompletableFuture<? extends T> src;
        final Runnable fn;
        final CompletableFuture<Void> dst;
        final Executor executor;
        RunCompletion(CompletableFuture<? extends T> src,
                      Runnable fn,
                      CompletableFuture<Void> dst,
                      Executor executor) {
            this.src = src; this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<?
extends T> a;
            final Runnable fn;
            final CompletableFuture<Void> dst;
            Object r; Throwable ex;
            // Proceed only if the source has a result and this caller wins
            // the claim; compareAndSet(0, 1) succeeds at most once.
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                compareAndSet(0, 1)) {
                // Only the exception (if any) matters; the value is unused.
                if (r instanceof AltResult)
                    ex = ((AltResult)r).ex;
                else
                    ex = null;
                Executor e = executor;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncRun(fn, dst));
                        else
                            fn.run();
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                // Complete dst here unless the work was handed to the
                // executor without an exception (the async task does it).
                if (e == null || ex != null)
                    dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    /**
     * Completion that, once both {@code src} and {@code snd} are done,
     * applies the BiFunction {@code fn} to their values (inline, or
     * through {@code executor} when one is given) and completes
     * {@code dst}.
     */
    static final class BiApplyCompletion<T,U,V> extends Completion {
        final CompletableFuture<? extends T> src;
        final CompletableFuture<? extends U> snd;
        final BiFunction<? super T,? super U,? extends V> fn;
        final CompletableFuture<V> dst;
        final Executor executor;
        BiApplyCompletion(CompletableFuture<? extends T> src,
                          CompletableFuture<? extends U> snd,
                          BiFunction<? super T,? super U,? extends V> fn,
                          CompletableFuture<V> dst, Executor executor) {
            this.src = src; this.snd = snd;
            this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final CompletableFuture<? extends U> b;
            final BiFunction<? super T,? super U,? extends V> fn;
            final CompletableFuture<V> dst;
            Object r, s; T t; U u; Throwable ex;
            // Proceed only if BOTH sources have results and this caller
            // wins the claim; compareAndSet(0, 1) succeeds at most once.
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                (b = this.snd) != null &&
                (s = b.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                // The first source's exception takes precedence; the second
                // result is decoded only if the first was normal.
                if (ex != null)
                    u = null;
                else if (s instanceof AltResult) {
                    ex = ((AltResult)s).ex;
                    u = null;
                }
                else {
                    @SuppressWarnings("unchecked") U us = (U) s;
                    u = us;
                }
                Executor e = executor;
                V v = null;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
                        else
                            v = fn.apply(t, u);
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                // Complete dst here unless the work was handed to the
                // executor without an exception (the async task does it).
                if (e == null || ex != null)
                    dst.internalComplete(v, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    /**
     * Completion that, once both {@code src} and {@code snd} are done,
     * passes their values to the BiConsumer {@code fn} (inline, or
     * through {@code executor} when one is given) and completes
     * {@code dst}.
     */
    static final class BiAcceptCompletion<T,U> extends Completion {
        final CompletableFuture<? extends T> src;
        final CompletableFuture<? extends U> snd;
        final BiConsumer<? super T,? super U> fn;
        final CompletableFuture<Void> dst;
        final Executor executor;
        BiAcceptCompletion(CompletableFuture<? extends T> src,
                           CompletableFuture<? extends U> snd,
                           BiConsumer<? super T,? super U> fn,
                           CompletableFuture<Void> dst, Executor executor) {
            this.src = src; this.snd = snd;
            this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final CompletableFuture<? extends U> b;
            final BiConsumer<? super T,?
super U> fn;
            final CompletableFuture<Void> dst;
            Object r, s; T t; U u; Throwable ex;
            // Proceed only if BOTH sources have results and this caller
            // wins the claim; compareAndSet(0, 1) succeeds at most once.
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                (b = this.snd) != null &&
                (s = b.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                // The first source's exception takes precedence; the second
                // result is decoded only if the first was normal.
                if (ex != null)
                    u = null;
                else if (s instanceof AltResult) {
                    ex = ((AltResult)s).ex;
                    u = null;
                }
                else {
                    @SuppressWarnings("unchecked") U us = (U) s;
                    u = us;
                }
                Executor e = executor;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
                        else
                            fn.accept(t, u);
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                // Complete dst here unless the work was handed to the
                // executor without an exception (the async task does it).
                if (e == null || ex != null)
                    dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    /**
     * Completion that, once both {@code src} and {@code snd} are done,
     * runs the Runnable {@code fn} (inline, or through {@code executor}
     * when one is given) and completes {@code dst}.
     */
    static final class BiRunCompletion<T> extends Completion {
        final CompletableFuture<? extends T> src;
        final CompletableFuture<?> snd;
        final Runnable fn;
        final CompletableFuture<Void> dst;
        final Executor executor;
        BiRunCompletion(CompletableFuture<? extends T> src,
                        CompletableFuture<?> snd,
                        Runnable fn,
                        CompletableFuture<Void> dst, Executor executor) {
            this.src = src; this.snd = snd;
            this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final CompletableFuture<?> b;
            final Runnable fn;
            final CompletableFuture<Void> dst;
            Object r, s; Throwable ex;
            // Proceed only if BOTH sources have results and this caller
            // wins the claim; compareAndSet(0, 1) succeeds at most once.
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                (b = this.snd) != null &&
                (s = b.result) != null &&
                compareAndSet(0, 1)) {
                // Only exceptions matter here; the first source's wins.
                if (r instanceof AltResult)
                    ex = ((AltResult)r).ex;
                else
                    ex = null;
                if (ex == null && (s instanceof AltResult))
                    ex = ((AltResult)s).ex;
                Executor e = executor;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncRun(fn, dst));
                        else
                            fn.run();
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                // Complete dst here unless the work was handed to the
                // executor without an exception (the async task does it).
                if (e == null || ex != null)
                    dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    /**
     * Completion that completes {@code dst} (with null, or with the
     * first exception found) once both {@code src} and {@code snd} are
     * done. No user function is involved.
     */
    static final class AndCompletion extends Completion {
        final CompletableFuture<?> src;
        final CompletableFuture<?> snd;
        final CompletableFuture<Void> dst;
        AndCompletion(CompletableFuture<?> src,
                      CompletableFuture<?> snd,
                      CompletableFuture<Void> dst) {
            this.src = src; this.snd = snd; this.dst = dst;
        }
        public final void run() {
            final CompletableFuture<?> a;
            final CompletableFuture<?> b;
            final CompletableFuture<Void> dst;
            Object r, s; Throwable ex;
            // Proceed only if BOTH sources have results and this caller
            // wins the claim; compareAndSet(0, 1) succeeds at most once.
            if ((dst = this.dst) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                (b = this.snd) != null &&
                (s = b.result) != null &&
                compareAndSet(0, 1)) {
                // Only exceptions matter here; the first source's wins.
                if (r instanceof AltResult)
                    ex = ((AltResult)r).ex;
                else
                    ex = null;
                if (ex == null && (s instanceof AltResult))
                    ex = ((AltResult)s).ex;
                dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    /**
     * Completion that, once either {@code src} or {@code snd} is done,
     * applies {@code fn} to that result (inline, or through
     * {@code executor} when one is given) and completes {@code dst}.
     */
    static final class OrApplyCompletion<T,U> extends Completion {
        final CompletableFuture<? extends T> src;
        final CompletableFuture<?
extends T> snd;
        final Function<? super T,? extends U> fn;
        final CompletableFuture<U> dst;
        final Executor executor;
        OrApplyCompletion(CompletableFuture<? extends T> src,
                          CompletableFuture<? extends T> snd,
                          Function<? super T,? extends U> fn,
                          CompletableFuture<U> dst, Executor executor) {
            this.src = src; this.snd = snd;
            this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final CompletableFuture<? extends T> b;
            final Function<? super T,? extends U> fn;
            final CompletableFuture<U> dst;
            Object r; T t; Throwable ex;
            // Proceed if EITHER source has a result (src is checked first)
            // and this caller wins the claim via compareAndSet(0, 1).
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (((a = this.src) != null && (r = a.result) != null) ||
                 ((b = this.snd) != null && (r = b.result) != null)) &&
                compareAndSet(0, 1)) {
                // Decode the raw result into a value or an exception.
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                Executor e = executor;
                U u = null;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncApply<T,U>(t, fn, dst));
                        else
                            u = fn.apply(t);
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                // Complete dst here unless the work was handed to the
                // executor without an exception (the async task does it).
                if (e == null || ex != null)
                    dst.internalComplete(u, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    /**
     * Completion that, once either {@code src} or {@code snd} is done,
     * passes that result to the Consumer {@code fn} (inline, or through
     * {@code executor} when one is given) and completes {@code dst}.
     */
    static final class OrAcceptCompletion<T> extends Completion {
        final CompletableFuture<? extends T> src;
        final CompletableFuture<? extends T> snd;
        final Consumer<? super T> fn;
        final CompletableFuture<Void> dst;
        final Executor executor;
        OrAcceptCompletion(CompletableFuture<? extends T> src,
                           CompletableFuture<? extends T> snd,
                           Consumer<? super T> fn,
                           CompletableFuture<Void> dst, Executor executor) {
            this.src = src; this.snd = snd;
            this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final CompletableFuture<? extends T> b;
            final Consumer<? super T> fn;
            final CompletableFuture<Void> dst;
            Object r; T t; Throwable ex;
            // Proceed if EITHER source has a result (src is checked first)
            // and this caller wins the claim via compareAndSet(0, 1).
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (((a = this.src) != null && (r = a.result) != null) ||
                 ((b = this.snd) != null && (r = b.result) != null)) &&
                compareAndSet(0, 1)) {
                // Decode the raw result into a value or an exception.
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                Executor e = executor;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncAccept<T>(t, fn, dst));
                        else
                            fn.accept(t);
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                // Complete dst here unless the work was handed to the
                // executor without an exception (the async task does it).
                if (e == null || ex != null)
                    dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    /**
     * Completion that, once either {@code src} or {@code snd} is done,
     * runs the Runnable {@code fn} (inline, or through {@code executor}
     * when one is given) and completes {@code dst}.
     */
    static final class OrRunCompletion<T> extends Completion {
        final CompletableFuture<? extends T> src;
        final CompletableFuture<?> snd;
        final Runnable fn;
        final CompletableFuture<Void> dst;
        final Executor executor;
        OrRunCompletion(CompletableFuture<? extends T> src,
                        CompletableFuture<?> snd,
                        Runnable fn,
                        CompletableFuture<Void> dst, Executor executor) {
            this.src = src; this.snd = snd;
            this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<?
extends T> a; 1113 final CompletableFuture<?> b; 1114 final Runnable fn; 1115 final CompletableFuture<Void> dst; 1116 Object r; Throwable ex; 1117 if ((dst = this.dst) != null && 1118 (fn = this.fn) != null && 1119 (((a = this.src) != null && (r = a.result) != null) || 1120 ((b = this.snd) != null && (r = b.result) != null)) && 1121 compareAndSet(0, 1)) { 1122 if (r instanceof AltResult) 1123 ex = ((AltResult)r).ex; 1124 else 1125 ex = null; 1126 Executor e = executor; 1127 if (ex == null) { 1128 try { 1129 if (e != null) 1130 e.execute(new AsyncRun(fn, dst)); 1131 else 1132 fn.run(); 1133 } catch (Throwable rex) { 1134 ex = rex; 1135 } 1136 } 1137 if (e == null || ex != null) 1138 dst.internalComplete(null, ex); 1139 } 1140 } 1141 private static final long serialVersionUID = 5232453952276885070L; 1142 } 1143 1144 static final class OrCompletion extends Completion { 1145 final CompletableFuture<?> src; 1146 final CompletableFuture<?> snd; 1147 final CompletableFuture<?> dst; 1148 OrCompletion(CompletableFuture<?> src, 1149 CompletableFuture<?> snd, 1150 CompletableFuture<?> dst) { 1151 this.src = src; this.snd = snd; this.dst = dst; 1152 } 1153 public final void run() { 1154 final CompletableFuture<?> a; 1155 final CompletableFuture<?> b; 1156 final CompletableFuture<?> dst; 1157 Object r, t; Throwable ex; 1158 if ((dst = this.dst) != null && 1159 (((a = this.src) != null && (r = a.result) != null) || 1160 ((b = this.snd) != null && (r = b.result) != null)) && 1161 compareAndSet(0, 1)) { 1162 if (r instanceof AltResult) { 1163 ex = ((AltResult)r).ex; 1164 t = null; 1165 } 1166 else { 1167 ex = null; 1168 t = r; 1169 } 1170 dst.internalComplete(t, ex); 1171 } 1172 } 1173 private static final long serialVersionUID = 5232453952276885070L; 1174 } 1175 1176 static final class ExceptionCompletion<T> extends Completion { 1177 final CompletableFuture<? extends T> src; 1178 final Function<? super Throwable, ? 
extends T> fn; 1179 final CompletableFuture<T> dst; 1180 ExceptionCompletion(CompletableFuture<? extends T> src, 1181 Function<? super Throwable, ? extends T> fn, 1182 CompletableFuture<T> dst) { 1183 this.src = src; this.fn = fn; this.dst = dst; 1184 } 1185 public final void run() { 1186 final CompletableFuture<? extends T> a; 1187 final Function<? super Throwable, ? extends T> fn; 1188 final CompletableFuture<T> dst; 1189 Object r; T t = null; Throwable ex, dx = null; 1190 if ((dst = this.dst) != null && 1191 (fn = this.fn) != null && 1192 (a = this.src) != null && 1193 (r = a.result) != null && 1194 compareAndSet(0, 1)) { 1195 if ((r instanceof AltResult) && 1196 (ex = ((AltResult)r).ex) != null) { 1197 try { 1198 t = fn.apply(ex); 1199 } catch (Throwable rex) { 1200 dx = rex; 1201 } 1202 } 1203 else { 1204 @SuppressWarnings("unchecked") T tr = (T) r; 1205 t = tr; 1206 } 1207 dst.internalComplete(t, dx); 1208 } 1209 } 1210 private static final long serialVersionUID = 5232453952276885070L; 1211 } 1212 1213 static final class ThenCopy extends Completion { 1214 final CompletableFuture<?> src; 1215 final CompletableFuture<?> dst; 1216 ThenCopy(CompletableFuture<?> src, 1217 CompletableFuture<?> dst) { 1218 this.src = src; this.dst = dst; 1219 } 1220 public final void run() { 1221 final CompletableFuture<?> a; 1222 final CompletableFuture<?> dst; 1223 Object r; Object t; Throwable ex; 1224 if ((dst = this.dst) != null && 1225 (a = this.src) != null && 1226 (r = a.result) != null && 1227 compareAndSet(0, 1)) { 1228 if (r instanceof AltResult) { 1229 ex = ((AltResult)r).ex; 1230 t = null; 1231 } 1232 else { 1233 ex = null; 1234 t = r; 1235 } 1236 dst.internalComplete(t, ex); 1237 } 1238 } 1239 private static final long serialVersionUID = 5232453952276885070L; 1240 } 1241 1242 static final class HandleCompletion<T,U> extends Completion { 1243 final CompletableFuture<? extends T> src; 1244 final BiFunction<? super T, Throwable, ? 
extends U> fn; 1245 final CompletableFuture<U> dst; 1246 HandleCompletion(CompletableFuture<? extends T> src, 1247 BiFunction<? super T, Throwable, ? extends U> fn, 1248 final CompletableFuture<U> dst) { 1249 this.src = src; this.fn = fn; this.dst = dst; 1250 } 1251 public final void run() { 1252 final CompletableFuture<? extends T> a; 1253 final BiFunction<? super T, Throwable, ? extends U> fn; 1254 final CompletableFuture<U> dst; 1255 Object r; T t; Throwable ex; 1256 if ((dst = this.dst) != null && 1257 (fn = this.fn) != null && 1258 (a = this.src) != null && 1259 (r = a.result) != null && 1260 compareAndSet(0, 1)) { 1261 if (r instanceof AltResult) { 1262 ex = ((AltResult)r).ex; 1263 t = null; 1264 } 1265 else { 1266 ex = null; 1267 @SuppressWarnings("unchecked") T tr = (T) r; 1268 t = tr; 1269 } 1270 U u = null; Throwable dx = null; 1271 try { 1272 u = fn.apply(t, ex); 1273 } catch (Throwable rex) { 1274 dx = rex; 1275 } 1276 dst.internalComplete(u, dx); 1277 } 1278 } 1279 private static final long serialVersionUID = 5232453952276885070L; 1280 } 1281 1282 static final class ComposeCompletion<T,U> extends Completion { 1283 final CompletableFuture<? extends T> src; 1284 final Function<? super T, CompletableFuture<U>> fn; 1285 final CompletableFuture<U> dst; 1286 final Executor executor; 1287 ComposeCompletion(CompletableFuture<? extends T> src, 1288 Function<? super T, CompletableFuture<U>> fn, 1289 final CompletableFuture<U> dst, Executor executor) { 1290 this.src = src; this.fn = fn; this.dst = dst; 1291 this.executor = executor; 1292 } 1293 public final void run() { 1294 final CompletableFuture<? extends T> a; 1295 final Function<? 
super T, CompletableFuture<U>> fn; 1296 final CompletableFuture<U> dst; 1297 Object r; T t; Throwable ex; Executor e; 1298 if ((dst = this.dst) != null && 1299 (fn = this.fn) != null && 1300 (a = this.src) != null && 1301 (r = a.result) != null && 1302 compareAndSet(0, 1)) { 1303 if (r instanceof AltResult) { 1304 ex = ((AltResult)r).ex; 1305 t = null; 1306 } 1307 else { 1308 ex = null; 1309 @SuppressWarnings("unchecked") T tr = (T) r; 1310 t = tr; 1311 } 1312 CompletableFuture<U> c = null; 1313 U u = null; 1314 boolean complete = false; 1315 if (ex == null) { 1316 if ((e = executor) != null) 1317 e.execute(new AsyncCompose<T,U>(t, fn, dst)); 1318 else { 1319 try { 1320 if ((c = fn.apply(t)) == null) 1321 ex = new NullPointerException(); 1322 } catch (Throwable rex) { 1323 ex = rex; 1324 } 1325 } 1326 } 1327 if (c != null) { 1328 ThenCopy d = null; 1329 Object s; 1330 if ((s = c.result) == null) { 1331 CompletionNode p = new CompletionNode 1332 (d = new ThenCopy(c, dst)); 1333 while ((s = c.result) == null) { 1334 if (UNSAFE.compareAndSwapObject 1335 (c, COMPLETIONS, p.next = c.completions, p)) 1336 break; 1337 } 1338 } 1339 if (s != null && (d == null || d.compareAndSet(0, 1))) { 1340 complete = true; 1341 if (s instanceof AltResult) { 1342 ex = ((AltResult)s).ex; // no rewrap 1343 u = null; 1344 } 1345 else { 1346 @SuppressWarnings("unchecked") U us = (U) s; 1347 u = us; 1348 } 1349 } 1350 } 1351 if (complete || ex != null) 1352 dst.internalComplete(u, ex); 1353 if (c != null) 1354 c.helpPostComplete(); 1355 } 1356 } 1357 private static final long serialVersionUID = 5232453952276885070L; 1358 } 1359 1360 // public methods 1361 1362 /** 1363 * Creates a new incomplete CompletableFuture. 1364 */ 1365 public CompletableFuture() { 1366 } 1367 1368 /** 1369 * Asynchronously executes in the {@link 1370 * ForkJoinPool#commonPool()}, a task that completes the returned 1371 * CompletableFuture with the result of the given Supplier. 
1372 * 1373 * @param supplier a function returning the value to be used 1374 * to complete the returned CompletableFuture 1375 * @return the CompletableFuture 1376 */ 1377 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { 1378 if (supplier == null) throw new NullPointerException(); 1379 CompletableFuture<U> f = new CompletableFuture<U>(); 1380 ForkJoinPool.commonPool(). 1381 execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f)); 1382 return f; 1383 } 1384 1385 /** 1386 * Asynchronously executes using the given executor, a task that 1387 * completes the returned CompletableFuture with the result of the 1388 * given Supplier. 1389 * 1390 * @param supplier a function returning the value to be used 1391 * to complete the returned CompletableFuture 1392 * @param executor the executor to use for asynchronous execution 1393 * @return the CompletableFuture 1394 */ 1395 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, 1396 Executor executor) { 1397 if (executor == null || supplier == null) 1398 throw new NullPointerException(); 1399 CompletableFuture<U> f = new CompletableFuture<U>(); 1400 executor.execute(new AsyncSupply<U>(supplier, f)); 1401 return f; 1402 } 1403 1404 /** 1405 * Asynchronously executes in the {@link 1406 * ForkJoinPool#commonPool()} a task that runs the given action, 1407 * and then completes the returned CompletableFuture. 1408 * 1409 * @param runnable the action to run before completing the 1410 * returned CompletableFuture 1411 * @return the CompletableFuture 1412 */ 1413 public static CompletableFuture<Void> runAsync(Runnable runnable) { 1414 if (runnable == null) throw new NullPointerException(); 1415 CompletableFuture<Void> f = new CompletableFuture<Void>(); 1416 ForkJoinPool.commonPool(). 
1417 execute((ForkJoinTask<?>)new AsyncRun(runnable, f)); 1418 return f; 1419 } 1420 1421 /** 1422 * Asynchronously executes using the given executor, a task that 1423 * runs the given action, and then completes the returned 1424 * CompletableFuture. 1425 * 1426 * @param runnable the action to run before completing the 1427 * returned CompletableFuture 1428 * @param executor the executor to use for asynchronous execution 1429 * @return the CompletableFuture 1430 */ 1431 public static CompletableFuture<Void> runAsync(Runnable runnable, 1432 Executor executor) { 1433 if (executor == null || runnable == null) 1434 throw new NullPointerException(); 1435 CompletableFuture<Void> f = new CompletableFuture<Void>(); 1436 executor.execute(new AsyncRun(runnable, f)); 1437 return f; 1438 } 1439 1440 /** 1441 * Returns {@code true} if completed in any fashion: normally, 1442 * exceptionally, or via cancellation. 1443 * 1444 * @return {@code true} if completed 1445 */ 1446 public boolean isDone() { 1447 return result != null; 1448 } 1449 1450 /** 1451 * Waits if necessary for this future to complete, and then 1452 * returns its result. 
1453 * 1454 * @return the result value 1455 * @throws CancellationException if this future was cancelled 1456 * @throws ExecutionException if this future completed exceptionally 1457 * @throws InterruptedException if the current thread was interrupted 1458 * while waiting 1459 */ 1460 public T get() throws InterruptedException, ExecutionException { 1461 Object r; Throwable ex, cause; 1462 if ((r = result) == null && (r = waitingGet(true)) == null) 1463 throw new InterruptedException(); 1464 if (!(r instanceof AltResult)) { 1465 @SuppressWarnings("unchecked") T tr = (T) r; 1466 return tr; 1467 } 1468 if ((ex = ((AltResult)r).ex) == null) 1469 return null; 1470 if (ex instanceof CancellationException) 1471 throw (CancellationException)ex; 1472 if ((ex instanceof CompletionException) && 1473 (cause = ex.getCause()) != null) 1474 ex = cause; 1475 throw new ExecutionException(ex); 1476 } 1477 1478 /** 1479 * Waits if necessary for at most the given time for this future 1480 * to complete, and then returns its result, if available. 
1481 * 1482 * @param timeout the maximum time to wait 1483 * @param unit the time unit of the timeout argument 1484 * @return the result value 1485 * @throws CancellationException if this future was cancelled 1486 * @throws ExecutionException if this future completed exceptionally 1487 * @throws InterruptedException if the current thread was interrupted 1488 * while waiting 1489 * @throws TimeoutException if the wait timed out 1490 */ 1491 public T get(long timeout, TimeUnit unit) 1492 throws InterruptedException, ExecutionException, TimeoutException { 1493 Object r; Throwable ex, cause; 1494 long nanos = unit.toNanos(timeout); 1495 if (Thread.interrupted()) 1496 throw new InterruptedException(); 1497 if ((r = result) == null) 1498 r = timedAwaitDone(nanos); 1499 if (!(r instanceof AltResult)) { 1500 @SuppressWarnings("unchecked") T tr = (T) r; 1501 return tr; 1502 } 1503 if ((ex = ((AltResult)r).ex) == null) 1504 return null; 1505 if (ex instanceof CancellationException) 1506 throw (CancellationException)ex; 1507 if ((ex instanceof CompletionException) && 1508 (cause = ex.getCause()) != null) 1509 ex = cause; 1510 throw new ExecutionException(ex); 1511 } 1512 1513 /** 1514 * Returns the result value when complete, or throws an 1515 * (unchecked) exception if completed exceptionally. To better 1516 * conform with the use of common functional forms, if a 1517 * computation involved in the completion of this 1518 * CompletableFuture threw an exception, this method throws an 1519 * (unchecked) {@link CompletionException} with the underlying 1520 * exception as its cause. 
1521 * 1522 * @return the result value 1523 * @throws CancellationException if the computation was cancelled 1524 * @throws CompletionException if this future completed 1525 * exceptionally or a completion computation threw an exception 1526 */ 1527 public T join() { 1528 Object r; Throwable ex; 1529 if ((r = result) == null) 1530 r = waitingGet(false); 1531 if (!(r instanceof AltResult)) { 1532 @SuppressWarnings("unchecked") T tr = (T) r; 1533 return tr; 1534 } 1535 if ((ex = ((AltResult)r).ex) == null) 1536 return null; 1537 if (ex instanceof CancellationException) 1538 throw (CancellationException)ex; 1539 if (ex instanceof CompletionException) 1540 throw (CompletionException)ex; 1541 throw new CompletionException(ex); 1542 } 1543 1544 /** 1545 * Returns the result value (or throws any encountered exception) 1546 * if completed, else returns the given valueIfAbsent. 1547 * 1548 * @param valueIfAbsent the value to return if not completed 1549 * @return the result value, if completed, else the given valueIfAbsent 1550 * @throws CancellationException if the computation was cancelled 1551 * @throws CompletionException if this future completed 1552 * exceptionally or a completion computation threw an exception 1553 */ 1554 public T getNow(T valueIfAbsent) { 1555 Object r; Throwable ex; 1556 if ((r = result) == null) 1557 return valueIfAbsent; 1558 if (!(r instanceof AltResult)) { 1559 @SuppressWarnings("unchecked") T tr = (T) r; 1560 return tr; 1561 } 1562 if ((ex = ((AltResult)r).ex) == null) 1563 return null; 1564 if (ex instanceof CancellationException) 1565 throw (CancellationException)ex; 1566 if (ex instanceof CompletionException) 1567 throw (CompletionException)ex; 1568 throw new CompletionException(ex); 1569 } 1570 1571 /** 1572 * If not already completed, sets the value returned by {@link 1573 * #get()} and related methods to the given value. 
1574 * 1575 * @param value the result value 1576 * @return {@code true} if this invocation caused this CompletableFuture 1577 * to transition to a completed state, else {@code false} 1578 */ 1579 public boolean complete(T value) { 1580 boolean triggered = result == null && 1581 UNSAFE.compareAndSwapObject(this, RESULT, null, 1582 value == null ? NIL : value); 1583 postComplete(); 1584 return triggered; 1585 } 1586 1587 /** 1588 * If not already completed, causes invocations of {@link #get()} 1589 * and related methods to throw the given exception. 1590 * 1591 * @param ex the exception 1592 * @return {@code true} if this invocation caused this CompletableFuture 1593 * to transition to a completed state, else {@code false} 1594 */ 1595 public boolean completeExceptionally(Throwable ex) { 1596 if (ex == null) throw new NullPointerException(); 1597 boolean triggered = result == null && 1598 UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex)); 1599 postComplete(); 1600 return triggered; 1601 } 1602 1603 /** 1604 * Creates and returns a CompletableFuture that is completed with 1605 * the result of the given function of this CompletableFuture. 1606 * If this CompletableFuture completes exceptionally, 1607 * then the returned CompletableFuture also does so, 1608 * with a CompletionException holding this exception as 1609 * its cause. 1610 * 1611 * @param fn the function to use to compute the value of 1612 * the returned CompletableFuture 1613 * @return the new CompletableFuture 1614 */ 1615 public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) { 1616 return doThenApply(fn, null); 1617 } 1618 1619 /** 1620 * Creates and returns a CompletableFuture that is asynchronously 1621 * completed using the {@link ForkJoinPool#commonPool()} with the 1622 * result of the given function of this CompletableFuture. 
If 1623 * this CompletableFuture completes exceptionally, then the 1624 * returned CompletableFuture also does so, with a 1625 * CompletionException holding this exception as its cause. 1626 * 1627 * @param fn the function to use to compute the value of 1628 * the returned CompletableFuture 1629 * @return the new CompletableFuture 1630 */ 1631 public <U> CompletableFuture<U> thenApplyAsync 1632 (Function<? super T,? extends U> fn) { 1633 return doThenApply(fn, ForkJoinPool.commonPool()); 1634 } 1635 1636 /** 1637 * Creates and returns a CompletableFuture that is asynchronously 1638 * completed using the given executor with the result of the given 1639 * function of this CompletableFuture. If this CompletableFuture 1640 * completes exceptionally, then the returned CompletableFuture 1641 * also does so, with a CompletionException holding this exception as 1642 * its cause. 1643 * 1644 * @param fn the function to use to compute the value of 1645 * the returned CompletableFuture 1646 * @param executor the executor to use for asynchronous execution 1647 * @return the new CompletableFuture 1648 */ 1649 public <U> CompletableFuture<U> thenApplyAsync 1650 (Function<? super T,? extends U> fn, 1651 Executor executor) { 1652 if (executor == null) throw new NullPointerException(); 1653 return doThenApply(fn, executor); 1654 } 1655 1656 private <U> CompletableFuture<U> doThenApply 1657 (Function<? super T,? 
extends U> fn, 1658 Executor e) { 1659 if (fn == null) throw new NullPointerException(); 1660 CompletableFuture<U> dst = new CompletableFuture<U>(); 1661 ApplyCompletion<T,U> d = null; 1662 Object r; 1663 if ((r = result) == null) { 1664 CompletionNode p = new CompletionNode 1665 (d = new ApplyCompletion<T,U>(this, fn, dst, e)); 1666 while ((r = result) == null) { 1667 if (UNSAFE.compareAndSwapObject 1668 (this, COMPLETIONS, p.next = completions, p)) 1669 break; 1670 } 1671 } 1672 if (r != null && (d == null || d.compareAndSet(0, 1))) { 1673 T t; Throwable ex; 1674 if (r instanceof AltResult) { 1675 ex = ((AltResult)r).ex; 1676 t = null; 1677 } 1678 else { 1679 ex = null; 1680 @SuppressWarnings("unchecked") T tr = (T) r; 1681 t = tr; 1682 } 1683 U u = null; 1684 if (ex == null) { 1685 try { 1686 if (e != null) 1687 e.execute(new AsyncApply<T,U>(t, fn, dst)); 1688 else 1689 u = fn.apply(t); 1690 } catch (Throwable rex) { 1691 ex = rex; 1692 } 1693 } 1694 if (e == null || ex != null) 1695 dst.internalComplete(u, ex); 1696 } 1697 helpPostComplete(); 1698 return dst; 1699 } 1700 1701 /** 1702 * Creates and returns a CompletableFuture that is completed after 1703 * performing the given action with this CompletableFuture's 1704 * result when it completes. If this CompletableFuture 1705 * completes exceptionally, then the returned CompletableFuture 1706 * also does so, with a CompletionException holding this exception as 1707 * its cause. 1708 * 1709 * @param block the action to perform before completing the 1710 * returned CompletableFuture 1711 * @return the new CompletableFuture 1712 */ 1713 public CompletableFuture<Void> thenAccept(Consumer<? super T> block) { 1714 return doThenAccept(block, null); 1715 } 1716 1717 /** 1718 * Creates and returns a CompletableFuture that is asynchronously 1719 * completed using the {@link ForkJoinPool#commonPool()} with this 1720 * CompletableFuture's result when it completes. 
If this 1721 * CompletableFuture completes exceptionally, then the returned 1722 * CompletableFuture also does so, with a CompletionException holding 1723 * this exception as its cause. 1724 * 1725 * @param block the action to perform before completing the 1726 * returned CompletableFuture 1727 * @return the new CompletableFuture 1728 */ 1729 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block) { 1730 return doThenAccept(block, ForkJoinPool.commonPool()); 1731 } 1732 1733 /** 1734 * Creates and returns a CompletableFuture that is asynchronously 1735 * completed using the given executor with this 1736 * CompletableFuture's result when it completes. If this 1737 * CompletableFuture completes exceptionally, then the returned 1738 * CompletableFuture also does so, with a CompletionException holding 1739 * this exception as its cause. 1740 * 1741 * @param block the action to perform before completing the 1742 * returned CompletableFuture 1743 * @param executor the executor to use for asynchronous execution 1744 * @return the new CompletableFuture 1745 */ 1746 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block, 1747 Executor executor) { 1748 if (executor == null) throw new NullPointerException(); 1749 return doThenAccept(block, executor); 1750 } 1751 1752 private CompletableFuture<Void> doThenAccept(Consumer<? 
super T> fn, 1753 Executor e) { 1754 if (fn == null) throw new NullPointerException(); 1755 CompletableFuture<Void> dst = new CompletableFuture<Void>(); 1756 AcceptCompletion<T> d = null; 1757 Object r; 1758 if ((r = result) == null) { 1759 CompletionNode p = new CompletionNode 1760 (d = new AcceptCompletion<T>(this, fn, dst, e)); 1761 while ((r = result) == null) { 1762 if (UNSAFE.compareAndSwapObject 1763 (this, COMPLETIONS, p.next = completions, p)) 1764 break; 1765 } 1766 } 1767 if (r != null && (d == null || d.compareAndSet(0, 1))) { 1768 T t; Throwable ex; 1769 if (r instanceof AltResult) { 1770 ex = ((AltResult)r).ex; 1771 t = null; 1772 } 1773 else { 1774 ex = null; 1775 @SuppressWarnings("unchecked") T tr = (T) r; 1776 t = tr; 1777 } 1778 if (ex == null) { 1779 try { 1780 if (e != null) 1781 e.execute(new AsyncAccept<T>(t, fn, dst)); 1782 else 1783 fn.accept(t); 1784 } catch (Throwable rex) { 1785 ex = rex; 1786 } 1787 } 1788 if (e == null || ex != null) 1789 dst.internalComplete(null, ex); 1790 } 1791 helpPostComplete(); 1792 return dst; 1793 } 1794 1795 /** 1796 * Creates and returns a CompletableFuture that is completed after 1797 * performing the given action when this CompletableFuture 1798 * completes. If this CompletableFuture completes exceptionally, 1799 * then the returned CompletableFuture also does so, with a 1800 * CompletionException holding this exception as its cause. 1801 * 1802 * @param action the action to perform before completing the 1803 * returned CompletableFuture 1804 * @return the new CompletableFuture 1805 */ 1806 public CompletableFuture<Void> thenRun(Runnable action) { 1807 return doThenRun(action, null); 1808 } 1809 1810 /** 1811 * Creates and returns a CompletableFuture that is asynchronously 1812 * completed using the {@link ForkJoinPool#commonPool()} after 1813 * performing the given action when this CompletableFuture 1814 * completes. 
If this CompletableFuture completes exceptionally, 1815 * then the returned CompletableFuture also does so, with a 1816 * CompletionException holding this exception as its cause. 1817 * 1818 * @param action the action to perform before completing the 1819 * returned CompletableFuture 1820 * @return the new CompletableFuture 1821 */ 1822 public CompletableFuture<Void> thenRunAsync(Runnable action) { 1823 return doThenRun(action, ForkJoinPool.commonPool()); 1824 } 1825 1826 /** 1827 * Creates and returns a CompletableFuture that is asynchronously 1828 * completed using the given executor after performing the given 1829 * action when this CompletableFuture completes. If this 1830 * CompletableFuture completes exceptionally, then the returned 1831 * CompletableFuture also does so, with a CompletionException holding 1832 * this exception as its cause. 1833 * 1834 * @param action the action to perform before completing the 1835 * returned CompletableFuture 1836 * @param executor the executor to use for asynchronous execution 1837 * @return the new CompletableFuture 1838 */ 1839 public CompletableFuture<Void> thenRunAsync(Runnable action, 1840 Executor executor) { 1841 if (executor == null) throw new NullPointerException(); 1842 return doThenRun(action, executor); 1843 } 1844 1845 private CompletableFuture<Void> doThenRun(Runnable action, 1846 Executor e) { 1847 if (action == null) throw new NullPointerException(); 1848 CompletableFuture<Void> dst = new CompletableFuture<Void>(); 1849 RunCompletion<T> d = null; 1850 Object r; 1851 if ((r = result) == null) { 1852 CompletionNode p = new CompletionNode 1853 (d = new RunCompletion<T>(this, action, dst, e)); 1854 while ((r = result) == null) { 1855 if (UNSAFE.compareAndSwapObject 1856 (this, COMPLETIONS, p.next = completions, p)) 1857 break; 1858 } 1859 } 1860 if (r != null && (d == null || d.compareAndSet(0, 1))) { 1861 Throwable ex; 1862 if (r instanceof AltResult) 1863 ex = ((AltResult)r).ex; 1864 else 1865 ex = null; 1866 
if (ex == null) { 1867 try { 1868 if (e != null) 1869 e.execute(new AsyncRun(action, dst)); 1870 else 1871 action.run(); 1872 } catch (Throwable rex) { 1873 ex = rex; 1874 } 1875 } 1876 if (e == null || ex != null) 1877 dst.internalComplete(null, ex); 1878 } 1879 helpPostComplete(); 1880 return dst; 1881 } 1882 1883 /** 1884 * Creates and returns a CompletableFuture that is completed with 1885 * the result of the given function of this and the other given 1886 * CompletableFuture's results when both complete. If this or 1887 * the other CompletableFuture complete exceptionally, then the 1888 * returned CompletableFuture also does so, with a 1889 * CompletionException holding the exception as its cause. 1890 * 1891 * @param other the other CompletableFuture 1892 * @param fn the function to use to compute the value of 1893 * the returned CompletableFuture 1894 * @return the new CompletableFuture 1895 */ 1896 public <U,V> CompletableFuture<V> thenCombine 1897 (CompletableFuture<? extends U> other, 1898 BiFunction<? super T,? super U,? extends V> fn) { 1899 return doThenBiApply(other, fn, null); 1900 } 1901 1902 /** 1903 * Creates and returns a CompletableFuture that is asynchronously 1904 * completed using the {@link ForkJoinPool#commonPool()} with 1905 * the result of the given function of this and the other given 1906 * CompletableFuture's results when both complete. If this or 1907 * the other CompletableFuture complete exceptionally, then the 1908 * returned CompletableFuture also does so, with a 1909 * CompletionException holding the exception as its cause. 1910 * 1911 * @param other the other CompletableFuture 1912 * @param fn the function to use to compute the value of 1913 * the returned CompletableFuture 1914 * @return the new CompletableFuture 1915 */ 1916 public <U,V> CompletableFuture<V> thenCombineAsync 1917 (CompletableFuture<? extends U> other, 1918 BiFunction<? super T,? super U,? 
extends V> fn) { 1919 return doThenBiApply(other, fn, ForkJoinPool.commonPool()); 1920 } 1921 1922 /** 1923 * Creates and returns a CompletableFuture that is 1924 * asynchronously completed using the given executor with the 1925 * result of the given function of this and the other given 1926 * CompletableFuture's results when both complete. If this or 1927 * the other CompletableFuture complete exceptionally, then the 1928 * returned CompletableFuture also does so, with a 1929 * CompletionException holding the exception as its cause. 1930 * 1931 * @param other the other CompletableFuture 1932 * @param fn the function to use to compute the value of 1933 * the returned CompletableFuture 1934 * @param executor the executor to use for asynchronous execution 1935 * @return the new CompletableFuture 1936 */ 1937 public <U,V> CompletableFuture<V> thenCombineAsync 1938 (CompletableFuture<? extends U> other, 1939 BiFunction<? super T,? super U,? extends V> fn, 1940 Executor executor) { 1941 if (executor == null) throw new NullPointerException(); 1942 return doThenBiApply(other, fn, executor); 1943 } 1944 1945 private <U,V> CompletableFuture<V> doThenBiApply 1946 (CompletableFuture<? extends U> other, 1947 BiFunction<? super T,? super U,? 
extends V> fn, 1948 Executor e) { 1949 if (other == null || fn == null) throw new NullPointerException(); 1950 CompletableFuture<V> dst = new CompletableFuture<V>(); 1951 BiApplyCompletion<T,U,V> d = null; 1952 Object r, s = null; 1953 if ((r = result) == null || (s = other.result) == null) { 1954 d = new BiApplyCompletion<T,U,V>(this, other, fn, dst, e); 1955 CompletionNode q = null, p = new CompletionNode(d); 1956 while ((r == null && (r = result) == null) || 1957 (s == null && (s = other.result) == null)) { 1958 if (q != null) { 1959 if (s != null || 1960 UNSAFE.compareAndSwapObject 1961 (other, COMPLETIONS, q.next = other.completions, q)) 1962 break; 1963 } 1964 else if (r != null || 1965 UNSAFE.compareAndSwapObject 1966 (this, COMPLETIONS, p.next = completions, p)) { 1967 if (s != null) 1968 break; 1969 q = new CompletionNode(d); 1970 } 1971 } 1972 } 1973 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) { 1974 T t; U u; Throwable ex; 1975 if (r instanceof AltResult) { 1976 ex = ((AltResult)r).ex; 1977 t = null; 1978 } 1979 else { 1980 ex = null; 1981 @SuppressWarnings("unchecked") T tr = (T) r; 1982 t = tr; 1983 } 1984 if (ex != null) 1985 u = null; 1986 else if (s instanceof AltResult) { 1987 ex = ((AltResult)s).ex; 1988 u = null; 1989 } 1990 else { 1991 @SuppressWarnings("unchecked") U us = (U) s; 1992 u = us; 1993 } 1994 V v = null; 1995 if (ex == null) { 1996 try { 1997 if (e != null) 1998 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst)); 1999 else 2000 v = fn.apply(t, u); 2001 } catch (Throwable rex) { 2002 ex = rex; 2003 } 2004 } 2005 if (e == null || ex != null) 2006 dst.internalComplete(v, ex); 2007 } 2008 helpPostComplete(); 2009 other.helpPostComplete(); 2010 return dst; 2011 } 2012 2013 /** 2014 * Creates and returns a CompletableFuture that is completed with 2015 * the results of this and the other given CompletableFuture if 2016 * both complete. 
If this and/or the other CompletableFuture 2017 * complete exceptionally, then the returned CompletableFuture 2018 * also does so, with a CompletionException holding one of these 2019 * exceptions as its cause. 2020 * 2021 * @param other the other CompletableFuture 2022 * @param block the action to perform before completing the 2023 * returned CompletableFuture 2024 * @return the new CompletableFuture 2025 */ 2026 public <U> CompletableFuture<Void> thenAcceptBoth 2027 (CompletableFuture<? extends U> other, 2028 BiConsumer<? super T, ? super U> block) { 2029 return doThenBiAccept(other, block, null); 2030 } 2031 2032 /** 2033 * Creates and returns a CompletableFuture that is completed 2034 * asynchronously using the {@link ForkJoinPool#commonPool()} with 2035 * the results of this and the other given CompletableFuture when 2036 * both complete. If this and/or the other CompletableFuture 2037 * complete exceptionally, then the returned CompletableFuture 2038 * also does so, with a CompletionException holding one of these 2039 * exceptions as its cause. 2040 * 2041 * @param other the other CompletableFuture 2042 * @param block the action to perform before completing the 2043 * returned CompletableFuture 2044 * @return the new CompletableFuture 2045 */ 2046 public <U> CompletableFuture<Void> thenAcceptBothAsync 2047 (CompletableFuture<? extends U> other, 2048 BiConsumer<? super T, ? super U> block) { 2049 return doThenBiAccept(other, block, ForkJoinPool.commonPool()); 2050 } 2051 2052 /** 2053 * Creates and returns a CompletableFuture that is completed 2054 * asynchronously using the given executor with the results of 2055 * this and the other given CompletableFuture when both complete. 2056 * If this and/or the other CompletableFuture complete exceptionally, 2057 * then the returned CompletableFuture also does so, with a 2058 * CompletionException holding one of these exceptions as its cause. 
2059 * 2060 * @param other the other CompletableFuture 2061 * @param block the action to perform before completing the 2062 * returned CompletableFuture 2063 * @param executor the executor to use for asynchronous execution 2064 * @return the new CompletableFuture 2065 */ 2066 public <U> CompletableFuture<Void> thenAcceptBothAsync 2067 (CompletableFuture<? extends U> other, 2068 BiConsumer<? super T, ? super U> block, 2069 Executor executor) { 2070 if (executor == null) throw new NullPointerException(); 2071 return doThenBiAccept(other, block, executor); 2072 } 2073 2074 private <U> CompletableFuture<Void> doThenBiAccept 2075 (CompletableFuture<? extends U> other, 2076 BiConsumer<? super T,? super U> fn, 2077 Executor e) { 2078 if (other == null || fn == null) throw new NullPointerException(); 2079 CompletableFuture<Void> dst = new CompletableFuture<Void>(); 2080 BiAcceptCompletion<T,U> d = null; 2081 Object r, s = null; 2082 if ((r = result) == null || (s = other.result) == null) { 2083 d = new BiAcceptCompletion<T,U>(this, other, fn, dst, e); 2084 CompletionNode q = null, p = new CompletionNode(d); 2085 while ((r == null && (r = result) == null) || 2086 (s == null && (s = other.result) == null)) { 2087 if (q != null) { 2088 if (s != null || 2089 UNSAFE.compareAndSwapObject 2090 (other, COMPLETIONS, q.next = other.completions, q)) 2091 break; 2092 } 2093 else if (r != null || 2094 UNSAFE.compareAndSwapObject 2095 (this, COMPLETIONS, p.next = completions, p)) { 2096 if (s != null) 2097 break; 2098 q = new CompletionNode(d); 2099 } 2100 } 2101 } 2102 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) { 2103 T t; U u; Throwable ex; 2104 if (r instanceof AltResult) { 2105 ex = ((AltResult)r).ex; 2106 t = null; 2107 } 2108 else { 2109 ex = null; 2110 @SuppressWarnings("unchecked") T tr = (T) r; 2111 t = tr; 2112 } 2113 if (ex != null) 2114 u = null; 2115 else if (s instanceof AltResult) { 2116 ex = ((AltResult)s).ex; 2117 u = null; 2118 } 2119 else { 
2120 @SuppressWarnings("unchecked") U us = (U) s; 2121 u = us; 2122 } 2123 if (ex == null) { 2124 try { 2125 if (e != null) 2126 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst)); 2127 else 2128 fn.accept(t, u); 2129 } catch (Throwable rex) { 2130 ex = rex; 2131 } 2132 } 2133 if (e == null || ex != null) 2134 dst.internalComplete(null, ex); 2135 } 2136 helpPostComplete(); 2137 other.helpPostComplete(); 2138 return dst; 2139 } 2140 2141 /** 2142 * Creates and returns a CompletableFuture that is completed when 2143 * this and the other given CompletableFuture both complete. 2144 * If this and/or the other CompletableFuture complete exceptionally, 2145 * then the returned CompletableFuture also does so, with a 2146 * CompletionException holding one of these exceptions as its cause. 2147 * 2148 * @param other the other CompletableFuture 2149 * @param action the action to perform before completing the 2150 * returned CompletableFuture 2151 * @return the new CompletableFuture 2152 */ 2153 public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other, 2154 Runnable action) { 2155 return doThenBiRun(other, action, null); 2156 } 2157 2158 /** 2159 * Creates and returns a CompletableFuture that is completed 2160 * asynchronously using the {@link ForkJoinPool#commonPool()} 2161 * when this and the other given CompletableFuture both complete. 2162 * If this and/or the other CompletableFuture complete exceptionally, 2163 * then the returned CompletableFuture also does so, with a 2164 * CompletionException holding one of these exceptions as its cause. 
2165 * 2166 * @param other the other CompletableFuture 2167 * @param action the action to perform before completing the 2168 * returned CompletableFuture 2169 * @return the new CompletableFuture 2170 */ 2171 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other, 2172 Runnable action) { 2173 return doThenBiRun(other, action, ForkJoinPool.commonPool()); 2174 } 2175 2176 /** 2177 * Creates and returns a CompletableFuture that is completed 2178 * asynchronously using the given executor when this and the 2179 * other given CompletableFuture both complete. 2180 * If this and/or the other CompletableFuture complete exceptionally, 2181 * then the returned CompletableFuture also does so, with a 2182 * CompletionException holding one of these exceptions as its cause. 2183 * 2184 * @param other the other CompletableFuture 2185 * @param action the action to perform before completing the 2186 * returned CompletableFuture 2187 * @param executor the executor to use for asynchronous execution 2188 * @return the new CompletableFuture 2189 */ 2190 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other, 2191 Runnable action, 2192 Executor executor) { 2193 if (executor == null) throw new NullPointerException(); 2194 return doThenBiRun(other, action, executor); 2195 } 2196 2197 private CompletableFuture<Void> doThenBiRun(CompletableFuture<?> other, 2198 Runnable action, 2199 Executor e) { 2200 if (other == null || action == null) throw new NullPointerException(); 2201 CompletableFuture<Void> dst = new CompletableFuture<Void>(); 2202 BiRunCompletion<T> d = null; 2203 Object r, s = null; 2204 if ((r = result) == null || (s = other.result) == null) { 2205 d = new BiRunCompletion<T>(this, other, action, dst, e); 2206 CompletionNode q = null, p = new CompletionNode(d); 2207 while ((r == null && (r = result) == null) || 2208 (s == null && (s = other.result) == null)) { 2209 if (q != null) { 2210 if (s != null || 2211 UNSAFE.compareAndSwapObject 2212 
(other, COMPLETIONS, q.next = other.completions, q)) 2213 break; 2214 } 2215 else if (r != null || 2216 UNSAFE.compareAndSwapObject 2217 (this, COMPLETIONS, p.next = completions, p)) { 2218 if (s != null) 2219 break; 2220 q = new CompletionNode(d); 2221 } 2222 } 2223 } 2224 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) { 2225 Throwable ex; 2226 if (r instanceof AltResult) 2227 ex = ((AltResult)r).ex; 2228 else 2229 ex = null; 2230 if (ex == null && (s instanceof AltResult)) 2231 ex = ((AltResult)s).ex; 2232 if (ex == null) { 2233 try { 2234 if (e != null) 2235 e.execute(new AsyncRun(action, dst)); 2236 else 2237 action.run(); 2238 } catch (Throwable rex) { 2239 ex = rex; 2240 } 2241 } 2242 if (e == null || ex != null) 2243 dst.internalComplete(null, ex); 2244 } 2245 helpPostComplete(); 2246 other.helpPostComplete(); 2247 return dst; 2248 } 2249 2250 /** 2251 * Creates and returns a CompletableFuture that is completed with 2252 * the result of the given function of either this or the other 2253 * given CompletableFuture's results when either complete. 2254 * If this and/or the other CompletableFuture complete exceptionally, 2255 * then the returned CompletableFuture may also do so, with a 2256 * CompletionException holding one of these exceptions as its cause. 2257 * No guarantees are made about which result or exception is used 2258 * in the returned CompletableFuture. 2259 * 2260 * @param other the other CompletableFuture 2261 * @param fn the function to use to compute the value of 2262 * the returned CompletableFuture 2263 * @return the new CompletableFuture 2264 */ 2265 public <U> CompletableFuture<U> applyToEither 2266 (CompletableFuture<? extends T> other, 2267 Function<? 
super T, U> fn) { 2268 return doOrApply(other, fn, null); 2269 } 2270 2271 /** 2272 * Creates and returns a CompletableFuture that is completed 2273 * asynchronously using the {@link ForkJoinPool#commonPool()} with 2274 * the result of the given function of either this or the other 2275 * given CompletableFuture's results when either complete. 2276 * If this and/or the other CompletableFuture complete exceptionally, 2277 * then the returned CompletableFuture may also do so, with a 2278 * CompletionException holding one of these exceptions as its cause. 2279 * No guarantees are made about which result or exception is used 2280 * in the returned CompletableFuture. 2281 * 2282 * @param other the other CompletableFuture 2283 * @param fn the function to use to compute the value of 2284 * the returned CompletableFuture 2285 * @return the new CompletableFuture 2286 */ 2287 public <U> CompletableFuture<U> applyToEitherAsync 2288 (CompletableFuture<? extends T> other, 2289 Function<? super T, U> fn) { 2290 return doOrApply(other, fn, ForkJoinPool.commonPool()); 2291 } 2292 2293 /** 2294 * Creates and returns a CompletableFuture that is completed 2295 * asynchronously using the given executor with the result of the 2296 * given function of either this or the other given 2297 * CompletableFuture's results when either complete. If this 2298 * and/or the other CompletableFuture complete exceptionally, then 2299 * the returned CompletableFuture may also do so, with a 2300 * CompletionException holding one of these exceptions as its cause. 2301 * No guarantees are made about which result or exception is used 2302 * in the returned CompletableFuture. 
2303 * 2304 * @param other the other CompletableFuture 2305 * @param fn the function to use to compute the value of 2306 * the returned CompletableFuture 2307 * @param executor the executor to use for asynchronous execution 2308 * @return the new CompletableFuture 2309 */ 2310 public <U> CompletableFuture<U> applyToEitherAsync 2311 (CompletableFuture<? extends T> other, 2312 Function<? super T, U> fn, 2313 Executor executor) { 2314 if (executor == null) throw new NullPointerException(); 2315 return doOrApply(other, fn, executor); 2316 } 2317 2318 private <U> CompletableFuture<U> doOrApply 2319 (CompletableFuture<? extends T> other, 2320 Function<? super T, U> fn, 2321 Executor e) { 2322 if (other == null || fn == null) throw new NullPointerException(); 2323 CompletableFuture<U> dst = new CompletableFuture<U>(); 2324 OrApplyCompletion<T,U> d = null; 2325 Object r; 2326 if ((r = result) == null && (r = other.result) == null) { 2327 d = new OrApplyCompletion<T,U>(this, other, fn, dst, e); 2328 CompletionNode q = null, p = new CompletionNode(d); 2329 while ((r = result) == null && (r = other.result) == null) { 2330 if (q != null) { 2331 if (UNSAFE.compareAndSwapObject 2332 (other, COMPLETIONS, q.next = other.completions, q)) 2333 break; 2334 } 2335 else if (UNSAFE.compareAndSwapObject 2336 (this, COMPLETIONS, p.next = completions, p)) 2337 q = new CompletionNode(d); 2338 } 2339 } 2340 if (r != null && (d == null || d.compareAndSet(0, 1))) { 2341 T t; Throwable ex; 2342 if (r instanceof AltResult) { 2343 ex = ((AltResult)r).ex; 2344 t = null; 2345 } 2346 else { 2347 ex = null; 2348 @SuppressWarnings("unchecked") T tr = (T) r; 2349 t = tr; 2350 } 2351 U u = null; 2352 if (ex == null) { 2353 try { 2354 if (e != null) 2355 e.execute(new AsyncApply<T,U>(t, fn, dst)); 2356 else 2357 u = fn.apply(t); 2358 } catch (Throwable rex) { 2359 ex = rex; 2360 } 2361 } 2362 if (e == null || ex != null) 2363 dst.internalComplete(u, ex); 2364 } 2365 helpPostComplete(); 2366 
other.helpPostComplete(); 2367 return dst; 2368 } 2369 2370 /** 2371 * Creates and returns a CompletableFuture that is completed after 2372 * performing the given action with the result of either this or the 2373 * other given CompletableFuture's result, when either complete. 2374 * If this and/or the other CompletableFuture complete exceptionally, 2375 * then the returned CompletableFuture may also do so, with a 2376 * CompletionException holding one of these exceptions as its cause. 2377 * No guarantees are made about which exception is used in the 2378 * returned CompletableFuture. 2379 * 2380 * @param other the other CompletableFuture 2381 * @param block the action to perform before completing the 2382 * returned CompletableFuture 2383 * @return the new CompletableFuture 2384 */ 2385 public CompletableFuture<Void> acceptEither 2386 (CompletableFuture<? extends T> other, 2387 Consumer<? super T> block) { 2388 return doOrAccept(other, block, null); 2389 } 2390 2391 /** 2392 * Creates and returns a CompletableFuture that is completed 2393 * asynchronously using the {@link ForkJoinPool#commonPool()}, 2394 * performing the given action with the result of either this or 2395 * the other given CompletableFuture's result, when either complete. 2396 * If this and/or the other CompletableFuture complete exceptionally, 2397 * then the returned CompletableFuture may also do so, with a 2398 * CompletionException holding one of these exceptions as its cause. 2399 * No guarantees are made about which exception is used in the 2400 * returned CompletableFuture. 2401 * 2402 * @param other the other CompletableFuture 2403 * @param block the action to perform before completing the 2404 * returned CompletableFuture 2405 * @return the new CompletableFuture 2406 */ 2407 public CompletableFuture<Void> acceptEitherAsync 2408 (CompletableFuture<? extends T> other, 2409 Consumer<? 
super T> block) { 2410 return doOrAccept(other, block, ForkJoinPool.commonPool()); 2411 } 2412 2413 /** 2414 * Creates and returns a CompletableFuture that is completed 2415 * asynchronously using the given executor, performing the given 2416 * action with the result of either this or the other given 2417 * CompletableFuture's result, when either complete. 2418 * If this and/or the other CompletableFuture complete exceptionally, 2419 * then the returned CompletableFuture may also do so, with a 2420 * CompletionException holding one of these exceptions as its cause. 2421 * No guarantees are made about which exception is used in the 2422 * returned CompletableFuture. 2423 * 2424 * @param other the other CompletableFuture 2425 * @param block the action to perform before completing the 2426 * returned CompletableFuture 2427 * @param executor the executor to use for asynchronous execution 2428 * @return the new CompletableFuture 2429 */ 2430 public CompletableFuture<Void> acceptEitherAsync 2431 (CompletableFuture<? extends T> other, 2432 Consumer<? super T> block, 2433 Executor executor) { 2434 if (executor == null) throw new NullPointerException(); 2435 return doOrAccept(other, block, executor); 2436 } 2437 2438 private CompletableFuture<Void> doOrAccept 2439 (CompletableFuture<? extends T> other, 2440 Consumer<? 
super T> fn, 2441 Executor e) { 2442 if (other == null || fn == null) throw new NullPointerException(); 2443 CompletableFuture<Void> dst = new CompletableFuture<Void>(); 2444 OrAcceptCompletion<T> d = null; 2445 Object r; 2446 if ((r = result) == null && (r = other.result) == null) { 2447 d = new OrAcceptCompletion<T>(this, other, fn, dst, e); 2448 CompletionNode q = null, p = new CompletionNode(d); 2449 while ((r = result) == null && (r = other.result) == null) { 2450 if (q != null) { 2451 if (UNSAFE.compareAndSwapObject 2452 (other, COMPLETIONS, q.next = other.completions, q)) 2453 break; 2454 } 2455 else if (UNSAFE.compareAndSwapObject 2456 (this, COMPLETIONS, p.next = completions, p)) 2457 q = new CompletionNode(d); 2458 } 2459 } 2460 if (r != null && (d == null || d.compareAndSet(0, 1))) { 2461 T t; Throwable ex; 2462 if (r instanceof AltResult) { 2463 ex = ((AltResult)r).ex; 2464 t = null; 2465 } 2466 else { 2467 ex = null; 2468 @SuppressWarnings("unchecked") T tr = (T) r; 2469 t = tr; 2470 } 2471 if (ex == null) { 2472 try { 2473 if (e != null) 2474 e.execute(new AsyncAccept<T>(t, fn, dst)); 2475 else 2476 fn.accept(t); 2477 } catch (Throwable rex) { 2478 ex = rex; 2479 } 2480 } 2481 if (e == null || ex != null) 2482 dst.internalComplete(null, ex); 2483 } 2484 helpPostComplete(); 2485 other.helpPostComplete(); 2486 return dst; 2487 } 2488 2489 /** 2490 * Creates and returns a CompletableFuture that is completed 2491 * after this or the other given CompletableFuture complete. 2492 * If this and/or the other CompletableFuture complete exceptionally, 2493 * then the returned CompletableFuture may also do so, with a 2494 * CompletionException holding one of these exceptions as its cause. 2495 * No guarantees are made about which exception is used in the 2496 * returned CompletableFuture. 
2497 * 2498 * @param other the other CompletableFuture 2499 * @param action the action to perform before completing the 2500 * returned CompletableFuture 2501 * @return the new CompletableFuture 2502 */ 2503 public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other, 2504 Runnable action) { 2505 return doOrRun(other, action, null); 2506 } 2507 2508 /** 2509 * Creates and returns a CompletableFuture that is completed 2510 * asynchronously using the {@link ForkJoinPool#commonPool()} 2511 * after this or the other given CompletableFuture complete. 2512 * If this and/or the other CompletableFuture complete exceptionally, 2513 * then the returned CompletableFuture may also do so, with a 2514 * CompletionException holding one of these exceptions as its cause. 2515 * No guarantees are made about which exception is used in the 2516 * returned CompletableFuture. 2517 * 2518 * @param other the other CompletableFuture 2519 * @param action the action to perform before completing the 2520 * returned CompletableFuture 2521 * @return the new CompletableFuture 2522 */ 2523 public CompletableFuture<Void> runAfterEitherAsync 2524 (CompletableFuture<?> other, 2525 Runnable action) { 2526 return doOrRun(other, action, ForkJoinPool.commonPool()); 2527 } 2528 2529 /** 2530 * Creates and returns a CompletableFuture that is completed 2531 * asynchronously using the given executor after this or the other 2532 * given CompletableFuture complete. 2533 * If this and/or the other CompletableFuture complete exceptionally, 2534 * then the returned CompletableFuture may also do so, with a 2535 * CompletionException holding one of these exceptions as its cause. 2536 * No guarantees are made about which exception is used in the 2537 * returned CompletableFuture. 
2538 * 2539 * @param other the other CompletableFuture 2540 * @param action the action to perform before completing the 2541 * returned CompletableFuture 2542 * @param executor the executor to use for asynchronous execution 2543 * @return the new CompletableFuture 2544 */ 2545 public CompletableFuture<Void> runAfterEitherAsync 2546 (CompletableFuture<?> other, 2547 Runnable action, 2548 Executor executor) { 2549 if (executor == null) throw new NullPointerException(); 2550 return doOrRun(other, action, executor); 2551 } 2552 2553 private CompletableFuture<Void> doOrRun(CompletableFuture<?> other, 2554 Runnable action, 2555 Executor e) { 2556 if (other == null || action == null) throw new NullPointerException(); 2557 CompletableFuture<Void> dst = new CompletableFuture<Void>(); 2558 OrRunCompletion<T> d = null; 2559 Object r; 2560 if ((r = result) == null && (r = other.result) == null) { 2561 d = new OrRunCompletion<T>(this, other, action, dst, e); 2562 CompletionNode q = null, p = new CompletionNode(d); 2563 while ((r = result) == null && (r = other.result) == null) { 2564 if (q != null) { 2565 if (UNSAFE.compareAndSwapObject 2566 (other, COMPLETIONS, q.next = other.completions, q)) 2567 break; 2568 } 2569 else if (UNSAFE.compareAndSwapObject 2570 (this, COMPLETIONS, p.next = completions, p)) 2571 q = new CompletionNode(d); 2572 } 2573 } 2574 if (r != null && (d == null || d.compareAndSet(0, 1))) { 2575 Throwable ex; 2576 if (r instanceof AltResult) 2577 ex = ((AltResult)r).ex; 2578 else 2579 ex = null; 2580 if (ex == null) { 2581 try { 2582 if (e != null) 2583 e.execute(new AsyncRun(action, dst)); 2584 else 2585 action.run(); 2586 } catch (Throwable rex) { 2587 ex = rex; 2588 } 2589 } 2590 if (e == null || ex != null) 2591 dst.internalComplete(null, ex); 2592 } 2593 helpPostComplete(); 2594 other.helpPostComplete(); 2595 return dst; 2596 } 2597 2598 /** 2599 * Returns a CompletableFuture (or an equivalent one) produced by 2600 * the given function of the result of 
this CompletableFuture when 2601 * completed. If this CompletableFuture completes exceptionally, 2602 * then the returned CompletableFuture also does so, with a 2603 * CompletionException holding this exception as its cause. 2604 * 2605 * @param fn the function returning a new CompletableFuture 2606 * @return the CompletableFuture, that {@code isDone()} upon 2607 * return if completed by the given function, or an exception 2608 * occurs 2609 */ 2610 public <U> CompletableFuture<U> thenCompose 2611 (Function<? super T, CompletableFuture<U>> fn) { 2612 return doCompose(fn, null); 2613 } 2614 2615 /** 2616 * Returns a CompletableFuture (or an equivalent one) produced 2617 * asynchronously using the {@link ForkJoinPool#commonPool()} by 2618 * the given function of the result of this CompletableFuture when 2619 * completed. If this CompletableFuture completes exceptionally, 2620 * then the returned CompletableFuture also does so, with a 2621 * CompletionException holding this exception as its cause. 2622 * 2623 * @param fn the function returning a new CompletableFuture 2624 * @return the CompletableFuture, that {@code isDone()} upon 2625 * return if completed by the given function, or an exception 2626 * occurs 2627 */ 2628 public <U> CompletableFuture<U> thenComposeAsync 2629 (Function<? super T, CompletableFuture<U>> fn) { 2630 return doCompose(fn, ForkJoinPool.commonPool()); 2631 } 2632 2633 /** 2634 * Returns a CompletableFuture (or an equivalent one) produced 2635 * asynchronously using the given executor by the given function 2636 * of the result of this CompletableFuture when completed. 2637 * If this CompletableFuture completes exceptionally, then the 2638 * returned CompletableFuture also does so, with a 2639 * CompletionException holding this exception as its cause. 
2640 * 2641 * @param fn the function returning a new CompletableFuture 2642 * @param executor the executor to use for asynchronous execution 2643 * @return the CompletableFuture, that {@code isDone()} upon 2644 * return if completed by the given function, or an exception 2645 * occurs 2646 */ 2647 public <U> CompletableFuture<U> thenComposeAsync 2648 (Function<? super T, CompletableFuture<U>> fn, 2649 Executor executor) { 2650 if (executor == null) throw new NullPointerException(); 2651 return doCompose(fn, executor); 2652 } 2653 2654 private <U> CompletableFuture<U> doCompose 2655 (Function<? super T, CompletableFuture<U>> fn, 2656 Executor e) { 2657 if (fn == null) throw new NullPointerException(); 2658 CompletableFuture<U> dst = null; 2659 ComposeCompletion<T,U> d = null; 2660 Object r; 2661 if ((r = result) == null) { 2662 dst = new CompletableFuture<U>(); 2663 CompletionNode p = new CompletionNode 2664 (d = new ComposeCompletion<T,U>(this, fn, dst, e)); 2665 while ((r = result) == null) { 2666 if (UNSAFE.compareAndSwapObject 2667 (this, COMPLETIONS, p.next = completions, p)) 2668 break; 2669 } 2670 } 2671 if (r != null && (d == null || d.compareAndSet(0, 1))) { 2672 T t; Throwable ex; 2673 if (r instanceof AltResult) { 2674 ex = ((AltResult)r).ex; 2675 t = null; 2676 } 2677 else { 2678 ex = null; 2679 @SuppressWarnings("unchecked") T tr = (T) r; 2680 t = tr; 2681 } 2682 if (ex == null) { 2683 if (e != null) { 2684 if (dst == null) 2685 dst = new CompletableFuture<U>(); 2686 e.execute(new AsyncCompose<T,U>(t, fn, dst)); 2687 } 2688 else { 2689 try { 2690 dst = fn.apply(t); 2691 } catch (Throwable rex) { 2692 ex = rex; 2693 } 2694 if (dst == null) { 2695 dst = new CompletableFuture<U>(); 2696 if (ex == null) 2697 ex = new NullPointerException(); 2698 } 2699 } 2700 } 2701 if (e == null && ex != null) 2702 dst.internalComplete(null, ex); 2703 } 2704 helpPostComplete(); 2705 dst.helpPostComplete(); 2706 return dst; 2707 } 2708 2709 /** 2710 * Creates and returns a 
CompletableFuture that is completed with 2711 * the result of the given function of the exception triggering 2712 * this CompletableFuture's completion when it completes 2713 * exceptionally; Otherwise, if this CompletableFuture completes 2714 * normally, then the returned CompletableFuture also completes 2715 * normally with the same value. 2716 * 2717 * @param fn the function to use to compute the value of the 2718 * returned CompletableFuture if this CompletableFuture completed 2719 * exceptionally 2720 * @return the new CompletableFuture 2721 */ 2722 public CompletableFuture<T> exceptionally 2723 (Function<Throwable, ? extends T> fn) { 2724 if (fn == null) throw new NullPointerException(); 2725 CompletableFuture<T> dst = new CompletableFuture<T>(); 2726 ExceptionCompletion<T> d = null; 2727 Object r; 2728 if ((r = result) == null) { 2729 CompletionNode p = 2730 new CompletionNode(d = new ExceptionCompletion<T>(this, fn, dst)); 2731 while ((r = result) == null) { 2732 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, 2733 p.next = completions, p)) 2734 break; 2735 } 2736 } 2737 if (r != null && (d == null || d.compareAndSet(0, 1))) { 2738 T t = null; Throwable ex, dx = null; 2739 if (r instanceof AltResult) { 2740 if ((ex = ((AltResult)r).ex) != null) { 2741 try { 2742 t = fn.apply(ex); 2743 } catch (Throwable rex) { 2744 dx = rex; 2745 } 2746 } 2747 } 2748 else { 2749 @SuppressWarnings("unchecked") T tr = (T) r; 2750 t = tr; 2751 } 2752 dst.internalComplete(t, dx); 2753 } 2754 helpPostComplete(); 2755 return dst; 2756 } 2757 2758 /** 2759 * Creates and returns a CompletableFuture that is completed with 2760 * the result of the given function of the result and exception of 2761 * this CompletableFuture's completion when it completes. The 2762 * given function is invoked with the result (or {@code null} if 2763 * none) and the exception (or {@code null} if none) of this 2764 * CompletableFuture when complete. 
2765 * 2766 * @param fn the function to use to compute the value of the 2767 * returned CompletableFuture 2768 * @return the new CompletableFuture 2769 */ 2770 public <U> CompletableFuture<U> handle 2771 (BiFunction<? super T, Throwable, ? extends U> fn) { 2772 if (fn == null) throw new NullPointerException(); 2773 CompletableFuture<U> dst = new CompletableFuture<U>(); 2774 HandleCompletion<T,U> d = null; 2775 Object r; 2776 if ((r = result) == null) { 2777 CompletionNode p = 2778 new CompletionNode(d = new HandleCompletion<T,U>(this, fn, dst)); 2779 while ((r = result) == null) { 2780 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, 2781 p.next = completions, p)) 2782 break; 2783 } 2784 } 2785 if (r != null && (d == null || d.compareAndSet(0, 1))) { 2786 T t; Throwable ex; 2787 if (r instanceof AltResult) { 2788 ex = ((AltResult)r).ex; 2789 t = null; 2790 } 2791 else { 2792 ex = null; 2793 @SuppressWarnings("unchecked") T tr = (T) r; 2794 t = tr; 2795 } 2796 U u; Throwable dx; 2797 try { 2798 u = fn.apply(t, ex); 2799 dx = null; 2800 } catch (Throwable rex) { 2801 dx = rex; 2802 u = null; 2803 } 2804 dst.internalComplete(u, dx); 2805 } 2806 helpPostComplete(); 2807 return dst; 2808 } 2809 2810 2811 /* ------------- Arbitrary-arity constructions -------------- */ 2812 2813 /* 2814 * The basic plan of attack is to recursively form binary 2815 * completion trees of elements. This can be overkill for small 2816 * sets, but scales nicely. The And/All vs Or/Any forms use the 2817 * same idea, but details differ. 2818 */ 2819 2820 /** 2821 * Returns a new CompletableFuture that is completed when all of 2822 * the given CompletableFutures complete. If any of the component 2823 * CompletableFutures complete exceptionally, then so does the 2824 * returned CompletableFuture. Otherwise, the results, if any, of 2825 * the component CompletableFutures are not reflected in the 2826 * returned CompletableFuture, but may be obtained by inspecting 2827 * them individually. 
If the number of components is zero, returns 2828 * a CompletableFuture completed with the value {@code null}. 2829 * 2830 * <p>Among the applications of this method is to await completion 2831 * of a set of independent CompletableFutures before continuing a 2832 * program, as in: {@code CompletableFuture.allOf(c1, c2, 2833 * c3).join();}. 2834 * 2835 * @param cfs the CompletableFutures 2836 * @return a CompletableFuture that is complete when all of the 2837 * given CompletableFutures complete 2838 * @throws NullPointerException if the array or any of its elements are 2839 * {@code null} 2840 */ 2841 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) { 2842 int len = cfs.length; // Directly handle empty and singleton cases 2843 if (len > 1) 2844 return allTree(cfs, 0, len - 1); 2845 else { 2846 CompletableFuture<Void> dst = new CompletableFuture<Void>(); 2847 CompletableFuture<?> f; 2848 if (len == 0) 2849 dst.result = NIL; 2850 else if ((f = cfs[0]) == null) 2851 throw new NullPointerException(); 2852 else { 2853 ThenCopy d = null; 2854 CompletionNode p = null; 2855 Object r; 2856 while ((r = f.result) == null) { 2857 if (d == null) 2858 d = new ThenCopy(f, dst); 2859 else if (p == null) 2860 p = new CompletionNode(d); 2861 else if (UNSAFE.compareAndSwapObject 2862 (f, COMPLETIONS, p.next = f.completions, p)) 2863 break; 2864 } 2865 if (r != null && (d == null || d.compareAndSet(0, 1))) 2866 dst.internalComplete(null, (r instanceof AltResult) ? 2867 ((AltResult)r).ex : null); 2868 f.helpPostComplete(); 2869 } 2870 return dst; 2871 } 2872 } 2873 2874 /** 2875 * Recursively constructs an And'ed tree of CompletableFutures. 2876 * Called only when array known to have at least two elements. 2877 */ 2878 private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs, 2879 int lo, int hi) { 2880 CompletableFuture<?> fst, snd; 2881 int mid = (lo + hi) >>> 1; 2882 if ((fst = (lo == mid ? 
cfs[lo] : allTree(cfs, lo, mid))) == null || 2883 (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null) 2884 throw new NullPointerException(); 2885 CompletableFuture<Void> dst = new CompletableFuture<Void>(); 2886 AndCompletion d = null; 2887 CompletionNode p = null, q = null; 2888 Object r = null, s = null; 2889 while ((r = fst.result) == null || (s = snd.result) == null) { 2890 if (d == null) 2891 d = new AndCompletion(fst, snd, dst); 2892 else if (p == null) 2893 p = new CompletionNode(d); 2894 else if (q == null) { 2895 if (UNSAFE.compareAndSwapObject 2896 (fst, COMPLETIONS, p.next = fst.completions, p)) 2897 q = new CompletionNode(d); 2898 } 2899 else if (UNSAFE.compareAndSwapObject 2900 (snd, COMPLETIONS, q.next = snd.completions, q)) 2901 break; 2902 } 2903 if ((r != null || (r = fst.result) != null) && 2904 (s != null || (s = snd.result) != null) && 2905 (d == null || d.compareAndSet(0, 1))) { 2906 Throwable ex; 2907 if (r instanceof AltResult) 2908 ex = ((AltResult)r).ex; 2909 else 2910 ex = null; 2911 if (ex == null && (s instanceof AltResult)) 2912 ex = ((AltResult)s).ex; 2913 dst.internalComplete(null, ex); 2914 } 2915 fst.helpPostComplete(); 2916 snd.helpPostComplete(); 2917 return dst; 2918 } 2919 2920 /** 2921 * Returns a new CompletableFuture that is completed when any of 2922 * the component CompletableFutures complete; with the same result if 2923 * it completed normally, otherwise exceptionally. If the number 2924 * of components is zero, returns an incomplete CompletableFuture. 2925 * 2926 * @param cfs the CompletableFutures 2927 * @return a CompletableFuture that is complete when any of the 2928 * given CompletableFutures complete 2929 * @throws NullPointerException if the array or any of its elements are 2930 * {@code null} 2931 */ 2932 public static CompletableFuture<?> anyOf(CompletableFuture<?>... 
cfs) { 2933 int len = cfs.length; // Same idea as allOf 2934 if (len > 1) 2935 return anyTree(cfs, 0, len - 1); 2936 else { 2937 CompletableFuture<?> dst = new CompletableFuture<Object>(); 2938 CompletableFuture<?> f; 2939 if (len == 0) 2940 ; // skip 2941 else if ((f = cfs[0]) == null) 2942 throw new NullPointerException(); 2943 else { 2944 ThenCopy d = null; 2945 CompletionNode p = null; 2946 Object r; 2947 while ((r = f.result) == null) { 2948 if (d == null) 2949 d = new ThenCopy(f, dst); 2950 else if (p == null) 2951 p = new CompletionNode(d); 2952 else if (UNSAFE.compareAndSwapObject 2953 (f, COMPLETIONS, p.next = f.completions, p)) 2954 break; 2955 } 2956 if (r != null && (d == null || d.compareAndSet(0, 1))) { 2957 Throwable ex; Object t; 2958 if (r instanceof AltResult) { 2959 ex = ((AltResult)r).ex; 2960 t = null; 2961 } 2962 else { 2963 ex = null; 2964 t = r; 2965 } 2966 dst.internalComplete(t, ex); 2967 } 2968 f.helpPostComplete(); 2969 } 2970 return dst; 2971 } 2972 } 2973 2974 /** 2975 * Recursively constructs an Or'ed tree of CompletableFutures. 2976 */ 2977 private static CompletableFuture<?> anyTree(CompletableFuture<?>[] cfs, 2978 int lo, int hi) { 2979 CompletableFuture<?> fst, snd; 2980 int mid = (lo + hi) >>> 1; 2981 if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null || 2982 (snd = (hi == mid+1 ? 
cfs[hi] : anyTree(cfs, mid+1, hi))) == null) 2983 throw new NullPointerException(); 2984 CompletableFuture<?> dst = new CompletableFuture<Object>(); 2985 OrCompletion d = null; 2986 CompletionNode p = null, q = null; 2987 Object r; 2988 while ((r = fst.result) == null && (r = snd.result) == null) { 2989 if (d == null) 2990 d = new OrCompletion(fst, snd, dst); 2991 else if (p == null) 2992 p = new CompletionNode(d); 2993 else if (q == null) { 2994 if (UNSAFE.compareAndSwapObject 2995 (fst, COMPLETIONS, p.next = fst.completions, p)) 2996 q = new CompletionNode(d); 2997 } 2998 else if (UNSAFE.compareAndSwapObject 2999 (snd, COMPLETIONS, q.next = snd.completions, q)) 3000 break; 3001 } 3002 if ((r != null || (r = fst.result) != null || 3003 (r = snd.result) != null) && 3004 (d == null || d.compareAndSet(0, 1))) { 3005 Throwable ex; Object t; 3006 if (r instanceof AltResult) { 3007 ex = ((AltResult)r).ex; 3008 t = null; 3009 } 3010 else { 3011 ex = null; 3012 t = r; 3013 } 3014 dst.internalComplete(t, ex); 3015 } 3016 fst.helpPostComplete(); 3017 snd.helpPostComplete(); 3018 return dst; 3019 } 3020 3021 3022 /* ------------- Control and status methods -------------- */ 3023 3024 /** 3025 * If not already completed, completes this CompletableFuture with 3026 * a {@link CancellationException}. Dependent CompletableFutures 3027 * that have not already completed will also complete 3028 * exceptionally, with a {@link CompletionException} caused by 3029 * this {@code CancellationException}. 3030 * 3031 * @param mayInterruptIfRunning this value has no effect in this 3032 * implementation because interrupts are not used to control 3033 * processing. 
3034 * 3035 * @return {@code true} if this task is now cancelled 3036 */ 3037 public boolean cancel(boolean mayInterruptIfRunning) { 3038 boolean cancelled = (result == null) && 3039 UNSAFE.compareAndSwapObject 3040 (this, RESULT, null, new AltResult(new CancellationException())); 3041 postComplete(); 3042 return cancelled || isCancelled(); 3043 } 3044 3045 /** 3046 * Returns {@code true} if this CompletableFuture was cancelled 3047 * before it completed normally. 3048 * 3049 * @return {@code true} if this CompletableFuture was cancelled 3050 * before it completed normally 3051 */ 3052 public boolean isCancelled() { 3053 Object r; 3054 return ((r = result) instanceof AltResult) && 3055 (((AltResult)r).ex instanceof CancellationException); 3056 } 3057 3058 /** 3059 * Forcibly sets or resets the value subsequently returned by 3060 * method {@link #get()} and related methods, whether or not 3061 * already completed. This method is designed for use only in 3062 * error recovery actions, and even in such situations may result 3063 * in ongoing dependent completions using established versus 3064 * overwritten outcomes. 3065 * 3066 * @param value the completion value 3067 */ 3068 public void obtrudeValue(T value) { 3069 result = (value == null) ? NIL : value; 3070 postComplete(); 3071 } 3072 3073 /** 3074 * Forcibly causes subsequent invocations of method {@link #get()} 3075 * and related methods to throw the given exception, whether or 3076 * not already completed. This method is designed for use only in 3077 * recovery actions, and even in such situations may result in 3078 * ongoing dependent completions using established versus 3079 * overwritten outcomes. 
3080 * 3081 * @param ex the exception 3082 */ 3083 public void obtrudeException(Throwable ex) { 3084 if (ex == null) throw new NullPointerException(); 3085 result = new AltResult(ex); 3086 postComplete(); 3087 } 3088 3089 /** 3090 * Returns the estimated number of CompletableFutures whose 3091 * completions are awaiting completion of this CompletableFuture. 3092 * This method is designed for use in monitoring system state, not 3093 * for synchronization control. 3094 * 3095 * @return the number of dependent CompletableFutures 3096 */ 3097 public int getNumberOfDependents() { 3098 int count = 0; 3099 for (CompletionNode p = completions; p != null; p = p.next) 3100 ++count; 3101 return count; 3102 } 3103 3104 /** 3105 * Returns a string identifying this CompletableFuture, as well as 3106 * its completion state. The state, in brackets, contains the 3107 * String {@code "Completed Normally"} or the String {@code 3108 * "Completed Exceptionally"}, or the String {@code "Not 3109 * completed"} followed by the number of CompletableFutures 3110 * dependent upon its completion, if any. 3111 * 3112 * @return a string identifying this CompletableFuture, as well as its state 3113 */ 3114 public String toString() { 3115 Object r = result; 3116 int count; 3117 return super.toString() + 3118 ((r == null) ? 3119 (((count = getNumberOfDependents()) == 0) ? 3120 "[Not completed]" : 3121 "[Not completed, " + count + " dependents]") : 3122 (((r instanceof AltResult) && ((AltResult)r).ex != null) ? 
3123 "[Completed exceptionally]" : 3124 "[Completed normally]")); 3125 } 3126 3127 // Unsafe mechanics 3128 private static final sun.misc.Unsafe UNSAFE; 3129 private static final long RESULT; 3130 private static final long WAITERS; 3131 private static final long COMPLETIONS; 3132 static { 3133 try { 3134 UNSAFE = sun.misc.Unsafe.getUnsafe(); 3135 Class<?> k = CompletableFuture.class; 3136 RESULT = UNSAFE.objectFieldOffset 3137 (k.getDeclaredField("result")); 3138 WAITERS = UNSAFE.objectFieldOffset 3139 (k.getDeclaredField("waiters")); 3140 COMPLETIONS = UNSAFE.objectFieldOffset 3141 (k.getDeclaredField("completions")); 3142 } catch (Exception e) { 3143 throw new Error(e); 3144 } 3145 } 3146 }