1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 
28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent; 37 import java.util.function.Supplier; 38 import java.util.function.Consumer; 39 import java.util.function.BiConsumer; 40 import java.util.function.Function; 41 import java.util.function.BiFunction; 42 import java.util.concurrent.Future; 43 import java.util.concurrent.TimeUnit; 44 import java.util.concurrent.ForkJoinPool; 45 import java.util.concurrent.ForkJoinTask; 46 import java.util.concurrent.Executor; 47 import java.util.concurrent.ThreadLocalRandom; 48 import java.util.concurrent.ExecutionException; 49 import java.util.concurrent.TimeoutException; 50 import java.util.concurrent.CancellationException; 51 import java.util.concurrent.atomic.AtomicInteger; 52 import java.util.concurrent.locks.LockSupport; 53 54 /** 55 * A {@link Future} that may be explicitly completed (setting its 56 * value and status), and may include dependent functions and actions 57 * that trigger upon its completion. 58 * 59 * <p>When two or more threads attempt to 60 * {@link #complete complete}, 61 * {@link #completeExceptionally completeExceptionally}, or 62 * {@link #cancel cancel} 63 * a CompletableFuture, only one of them succeeds. 64 * 65 * <p>Methods are available for adding dependents based on 66 * user-provided Functions, Consumers, or Runnables. The appropriate 67 * form to use depends on whether actions require arguments and/or 68 * produce results. Completion of a dependent action will trigger the 69 * completion of another CompletableFuture. Actions may also be 70 * triggered after either or both the current and another 71 * CompletableFuture complete. 
Multiple CompletableFutures may also 72 * be grouped as one using {@link #anyOf(CompletableFuture...)} and 73 * {@link #allOf(CompletableFuture...)}. 74 * 75 * <p>CompletableFutures themselves do not execute asynchronously. 76 * However, actions supplied for dependent completions of another 77 * CompletableFuture may do so, depending on whether they are provided 78 * via one of the <em>async</em> methods (that is, methods with names 79 * of the form <tt><var>xxx</var>Async</tt>). The <em>async</em> 80 * methods provide a way to commence asynchronous processing of an 81 * action using either a given {@link Executor} or by default the 82 * {@link ForkJoinPool#commonPool()}. To simplify monitoring, 83 * debugging, and tracking, all generated asynchronous tasks are 84 * instances of the marker interface {@link AsynchronousCompletionTask}. 85 * 86 * <p>Actions supplied for dependent completions of <em>non-async</em> 87 * methods may be performed by the thread that completes the current 88 * CompletableFuture, or by any other caller of these methods. There 89 * are no guarantees about the order of processing completions unless 90 * constrained by these methods. 91 * 92 * <p>Since (unlike {@link FutureTask}) this class has no direct 93 * control over the computation that causes it to be completed, 94 * cancellation is treated as just another form of exceptional completion. 95 * Method {@link #cancel cancel} has the same effect as 96 * {@code completeExceptionally(new CancellationException())}. 97 * 98 * <p>Upon exceptional completion (including cancellation), or when a 99 * completion entails an additional computation which terminates 100 * abruptly with an (unchecked) exception or error, then all of their 101 * dependent completions (and their dependents in turn) generally act 102 * as {@code completeExceptionally} with a {@link CompletionException} 103 * holding that exception as its cause. 
However, the {@link 104 * #exceptionally exceptionally} and {@link #handle handle} 105 * completions <em>are</em> able to handle exceptional completions of 106 * the CompletableFutures they depend on. 107 * 108 * <p>In case of exceptional completion with a CompletionException, 109 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an 110 * {@link ExecutionException} with the same cause as held in the 111 * corresponding CompletionException. However, in these cases, 112 * methods {@link #join()} and {@link #getNow} throw the 113 * CompletionException, which simplifies usage. 114 * 115 * <p>Arguments used to pass a completion result (that is, for parameters 116 * of type {@code T}) may be null, but passing a null value for any other 117 * parameter will result in a {@link NullPointerException} being thrown. 118 * 119 * @author Doug Lea 120 * @since 1.8 121 */ 122 public class CompletableFuture<T> implements Future<T> { 123 124 /* 125 * Overview: 126 * 127 * 1. Non-nullness of field result (set via CAS) indicates done. 128 * An AltResult is used to box null as a result, as well as to 129 * hold exceptions. Using a single field makes completion fast 130 * and simple to detect and trigger, at the expense of a lot of 131 * encoding and decoding that infiltrates many methods. One minor 132 * simplification relies on the (static) NIL (to box null results) 133 * being the only AltResult with a null exception field, so we 134 * don't usually need explicit comparisons with NIL. The CF 135 * exception propagation mechanics surrounding decoding rely on 136 * unchecked casts of decoded results really being unchecked, 137 * where user type errors are caught at point of use, as is 138 * currently the case in Java. These are highlighted by using 139 * SuppressWarnings-annotated temporaries. 140 * 141 * 2. Waiters are held in a Treiber stack similar to the one used 142 * in FutureTask, Phaser, and SynchronousQueue. 
     * See their internal documentation for algorithmic details.
     *
     * 3. Completions are also kept in a list/stack, and pulled off
     * and run when completion is triggered. (We could even use the
     * same stack as for waiters, but would give up the potential
     * parallelism obtained because woken waiters help release/run
     * others -- see method postComplete).  Because post-processing
     * may race with direct calls, class Completion opportunistically
     * extends AtomicInteger so callers can claim the action via
     * compareAndSet(0, 1).  The Completion.run methods are all
     * written in a boringly similar, uniform way (that sometimes
     * includes unnecessary-looking checks, kept to maintain
     * uniformity).  There are enough dimensions upon which they
     * differ that attempts to factor commonalities while maintaining
     * efficiency require more lines of code than they would save.
     *
     * 4. The exported then/and/or methods do support a bit of
     * factoring (see doThenApply etc). They must cope with the
     * intrinsic races surrounding addition of a dependent action
     * versus performing the action directly because the task is
     * already complete.  For example, a CF may not be complete upon
     * entry, so a dependent completion is added, but by the time it
     * is added, the target CF is complete, so must be directly
     * executed. This is all done while avoiding unnecessary object
     * construction in safe-bypass cases.
168 */ 169 170 // preliminaries 171 172 static final class AltResult { 173 final Throwable ex; // null only for NIL 174 AltResult(Throwable ex) { this.ex = ex; } 175 } 176 177 static final AltResult NIL = new AltResult(null); 178 179 // Fields 180 181 volatile Object result; // Either the result or boxed AltResult 182 volatile WaitNode waiters; // Treiber stack of threads blocked on get() 183 volatile CompletionNode completions; // list (Treiber stack) of completions 184 185 // Basic utilities for triggering and processing completions 186 187 /** 188 * Removes and signals all waiting threads and runs all completions. 189 */ 190 final void postComplete() { 191 WaitNode q; Thread t; 192 while ((q = waiters) != null) { 193 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) && 194 (t = q.thread) != null) { 195 q.thread = null; 196 LockSupport.unpark(t); 197 } 198 } 199 200 CompletionNode h; Completion c; 201 while ((h = completions) != null) { 202 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) && 203 (c = h.completion) != null) 204 c.run(); 205 } 206 } 207 208 /** 209 * Triggers completion with the encoding of the given arguments: 210 * if the exception is non-null, encodes it as a wrapped 211 * CompletionException unless it is one already. Otherwise uses 212 * the given result, boxed as NIL if null. 213 */ 214 final void internalComplete(T v, Throwable ex) { 215 if (result == null) 216 UNSAFE.compareAndSwapObject 217 (this, RESULT, null, 218 (ex == null) ? (v == null) ? NIL : v : 219 new AltResult((ex instanceof CompletionException) ? ex : 220 new CompletionException(ex))); 221 postComplete(); // help out even if not triggered 222 } 223 224 /** 225 * If triggered, helps release and/or process completions. 
226 */ 227 final void helpPostComplete() { 228 if (result != null) 229 postComplete(); 230 } 231 232 /* ------------- waiting for completions -------------- */ 233 234 /** Number of processors, for spin control */ 235 static final int NCPU = Runtime.getRuntime().availableProcessors(); 236 237 /** 238 * Heuristic spin value for waitingGet() before blocking on 239 * multiprocessors 240 */ 241 static final int SPINS = (NCPU > 1) ? 1 << 8 : 0; 242 243 /** 244 * Linked nodes to record waiting threads in a Treiber stack. See 245 * other classes such as Phaser and SynchronousQueue for more 246 * detailed explanation. This class implements ManagedBlocker to 247 * avoid starvation when blocking actions pile up in 248 * ForkJoinPools. 249 */ 250 static final class WaitNode implements ForkJoinPool.ManagedBlocker { 251 long nanos; // wait time if timed 252 final long deadline; // non-zero if timed 253 volatile int interruptControl; // > 0: interruptible, < 0: interrupted 254 volatile Thread thread; 255 volatile WaitNode next; 256 WaitNode(boolean interruptible, long nanos, long deadline) { 257 this.thread = Thread.currentThread(); 258 this.interruptControl = interruptible ? 1 : 0; 259 this.nanos = nanos; 260 this.deadline = deadline; 261 } 262 public boolean isReleasable() { 263 if (thread == null) 264 return true; 265 if (Thread.interrupted()) { 266 int i = interruptControl; 267 interruptControl = -1; 268 if (i > 0) 269 return true; 270 } 271 if (deadline != 0L && 272 (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) { 273 thread = null; 274 return true; 275 } 276 return false; 277 } 278 public boolean block() { 279 if (isReleasable()) 280 return true; 281 else if (deadline == 0L) 282 LockSupport.park(this); 283 else if (nanos > 0L) 284 LockSupport.parkNanos(this, nanos); 285 return isReleasable(); 286 } 287 } 288 289 /** 290 * Returns raw result after waiting, or null if interruptible and 291 * interrupted. 
292 */ 293 private Object waitingGet(boolean interruptible) { 294 WaitNode q = null; 295 boolean queued = false; 296 int spins = SPINS; 297 for (Object r;;) { 298 if ((r = result) != null) { 299 if (q != null) { // suppress unpark 300 q.thread = null; 301 if (q.interruptControl < 0) { 302 if (interruptible) { 303 removeWaiter(q); 304 return null; 305 } 306 Thread.currentThread().interrupt(); 307 } 308 } 309 postComplete(); // help release others 310 return r; 311 } 312 else if (spins > 0) { 313 int rnd = ThreadLocalRandom.nextSecondarySeed(); 314 if (rnd == 0) 315 rnd = ThreadLocalRandom.current().nextInt(); 316 if (rnd >= 0) 317 --spins; 318 } 319 else if (q == null) 320 q = new WaitNode(interruptible, 0L, 0L); 321 else if (!queued) 322 queued = UNSAFE.compareAndSwapObject(this, WAITERS, 323 q.next = waiters, q); 324 else if (interruptible && q.interruptControl < 0) { 325 removeWaiter(q); 326 return null; 327 } 328 else if (q.thread != null && result == null) { 329 try { 330 ForkJoinPool.managedBlock(q); 331 } catch (InterruptedException ex) { 332 q.interruptControl = -1; 333 } 334 } 335 } 336 } 337 338 /** 339 * Awaits completion or aborts on interrupt or timeout. 340 * 341 * @param nanos time to wait 342 * @return raw result 343 */ 344 private Object timedAwaitDone(long nanos) 345 throws InterruptedException, TimeoutException { 346 WaitNode q = null; 347 boolean queued = false; 348 for (Object r;;) { 349 if ((r = result) != null) { 350 if (q != null) { 351 q.thread = null; 352 if (q.interruptControl < 0) { 353 removeWaiter(q); 354 throw new InterruptedException(); 355 } 356 } 357 postComplete(); 358 return r; 359 } 360 else if (q == null) { 361 if (nanos <= 0L) 362 throw new TimeoutException(); 363 long d = System.nanoTime() + nanos; 364 q = new WaitNode(true, nanos, d == 0L ? 
1L : d); // avoid 0 365 } 366 else if (!queued) 367 queued = UNSAFE.compareAndSwapObject(this, WAITERS, 368 q.next = waiters, q); 369 else if (q.interruptControl < 0) { 370 removeWaiter(q); 371 throw new InterruptedException(); 372 } 373 else if (q.nanos <= 0L) { 374 if (result == null) { 375 removeWaiter(q); 376 throw new TimeoutException(); 377 } 378 } 379 else if (q.thread != null && result == null) { 380 try { 381 ForkJoinPool.managedBlock(q); 382 } catch (InterruptedException ex) { 383 q.interruptControl = -1; 384 } 385 } 386 } 387 } 388 389 /** 390 * Tries to unlink a timed-out or interrupted wait node to avoid 391 * accumulating garbage. Internal nodes are simply unspliced 392 * without CAS since it is harmless if they are traversed anyway 393 * by releasers. To avoid effects of unsplicing from already 394 * removed nodes, the list is retraversed in case of an apparent 395 * race. This is slow when there are a lot of nodes, but we don't 396 * expect lists to be long enough to outweigh higher-overhead 397 * schemes. 398 */ 399 private void removeWaiter(WaitNode node) { 400 if (node != null) { 401 node.thread = null; 402 retry: 403 for (;;) { // restart on removeWaiter race 404 for (WaitNode pred = null, q = waiters, s; q != null; q = s) { 405 s = q.next; 406 if (q.thread != null) 407 pred = q; 408 else if (pred != null) { 409 pred.next = s; 410 if (pred.thread == null) // check for race 411 continue retry; 412 } 413 else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s)) 414 continue retry; 415 } 416 break; 417 } 418 } 419 } 420 421 /* ------------- Async tasks -------------- */ 422 423 /** 424 * A marker interface identifying asynchronous tasks produced by 425 * {@code async} methods. This may be useful for monitoring, 426 * debugging, and tracking asynchronous activities. 
427 * 428 * @since 1.8 429 */ 430 public static interface AsynchronousCompletionTask { 431 } 432 433 /** Base class can act as either FJ or plain Runnable */ 434 abstract static class Async extends ForkJoinTask<Void> 435 implements Runnable, AsynchronousCompletionTask { 436 public final Void getRawResult() { return null; } 437 public final void setRawResult(Void v) { } 438 public final void run() { exec(); } 439 } 440 441 static final class AsyncRun extends Async { 442 final Runnable fn; 443 final CompletableFuture<Void> dst; 444 AsyncRun(Runnable fn, CompletableFuture<Void> dst) { 445 this.fn = fn; this.dst = dst; 446 } 447 public final boolean exec() { 448 CompletableFuture<Void> d; Throwable ex; 449 if ((d = this.dst) != null && d.result == null) { 450 try { 451 fn.run(); 452 ex = null; 453 } catch (Throwable rex) { 454 ex = rex; 455 } 456 d.internalComplete(null, ex); 457 } 458 return true; 459 } 460 private static final long serialVersionUID = 5232453952276885070L; 461 } 462 463 static final class AsyncSupply<U> extends Async { 464 final Supplier<U> fn; 465 final CompletableFuture<U> dst; 466 AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) { 467 this.fn = fn; this.dst = dst; 468 } 469 public final boolean exec() { 470 CompletableFuture<U> d; U u; Throwable ex; 471 if ((d = this.dst) != null && d.result == null) { 472 try { 473 u = fn.get(); 474 ex = null; 475 } catch (Throwable rex) { 476 ex = rex; 477 u = null; 478 } 479 d.internalComplete(u, ex); 480 } 481 return true; 482 } 483 private static final long serialVersionUID = 5232453952276885070L; 484 } 485 486 static final class AsyncApply<T,U> extends Async { 487 final T arg; 488 final Function<? super T,? extends U> fn; 489 final CompletableFuture<U> dst; 490 AsyncApply(T arg, Function<? super T,? 
extends U> fn, 491 CompletableFuture<U> dst) { 492 this.arg = arg; this.fn = fn; this.dst = dst; 493 } 494 public final boolean exec() { 495 CompletableFuture<U> d; U u; Throwable ex; 496 if ((d = this.dst) != null && d.result == null) { 497 try { 498 u = fn.apply(arg); 499 ex = null; 500 } catch (Throwable rex) { 501 ex = rex; 502 u = null; 503 } 504 d.internalComplete(u, ex); 505 } 506 return true; 507 } 508 private static final long serialVersionUID = 5232453952276885070L; 509 } 510 511 static final class AsyncBiApply<T,U,V> extends Async { 512 final T arg1; 513 final U arg2; 514 final BiFunction<? super T,? super U,? extends V> fn; 515 final CompletableFuture<V> dst; 516 AsyncBiApply(T arg1, U arg2, 517 BiFunction<? super T,? super U,? extends V> fn, 518 CompletableFuture<V> dst) { 519 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst; 520 } 521 public final boolean exec() { 522 CompletableFuture<V> d; V v; Throwable ex; 523 if ((d = this.dst) != null && d.result == null) { 524 try { 525 v = fn.apply(arg1, arg2); 526 ex = null; 527 } catch (Throwable rex) { 528 ex = rex; 529 v = null; 530 } 531 d.internalComplete(v, ex); 532 } 533 return true; 534 } 535 private static final long serialVersionUID = 5232453952276885070L; 536 } 537 538 static final class AsyncAccept<T> extends Async { 539 final T arg; 540 final Consumer<? super T> fn; 541 final CompletableFuture<Void> dst; 542 AsyncAccept(T arg, Consumer<? 
super T> fn, 543 CompletableFuture<Void> dst) { 544 this.arg = arg; this.fn = fn; this.dst = dst; 545 } 546 public final boolean exec() { 547 CompletableFuture<Void> d; Throwable ex; 548 if ((d = this.dst) != null && d.result == null) { 549 try { 550 fn.accept(arg); 551 ex = null; 552 } catch (Throwable rex) { 553 ex = rex; 554 } 555 d.internalComplete(null, ex); 556 } 557 return true; 558 } 559 private static final long serialVersionUID = 5232453952276885070L; 560 } 561 562 static final class AsyncBiAccept<T,U> extends Async { 563 final T arg1; 564 final U arg2; 565 final BiConsumer<? super T,? super U> fn; 566 final CompletableFuture<Void> dst; 567 AsyncBiAccept(T arg1, U arg2, 568 BiConsumer<? super T,? super U> fn, 569 CompletableFuture<Void> dst) { 570 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst; 571 } 572 public final boolean exec() { 573 CompletableFuture<Void> d; Throwable ex; 574 if ((d = this.dst) != null && d.result == null) { 575 try { 576 fn.accept(arg1, arg2); 577 ex = null; 578 } catch (Throwable rex) { 579 ex = rex; 580 } 581 d.internalComplete(null, ex); 582 } 583 return true; 584 } 585 private static final long serialVersionUID = 5232453952276885070L; 586 } 587 588 static final class AsyncCompose<T,U> extends Async { 589 final T arg; 590 final Function<? super T, CompletableFuture<U>> fn; 591 final CompletableFuture<U> dst; 592 AsyncCompose(T arg, 593 Function<? super T, CompletableFuture<U>> fn, 594 CompletableFuture<U> dst) { 595 this.arg = arg; this.fn = fn; this.dst = dst; 596 } 597 public final boolean exec() { 598 CompletableFuture<U> d, fr; U u; Throwable ex; 599 if ((d = this.dst) != null && d.result == null) { 600 try { 601 fr = fn.apply(arg); 602 ex = (fr == null) ? 
new NullPointerException() : null; 603 } catch (Throwable rex) { 604 ex = rex; 605 fr = null; 606 } 607 if (ex != null) 608 u = null; 609 else { 610 Object r = fr.result; 611 if (r == null) 612 r = fr.waitingGet(false); 613 if (r instanceof AltResult) { 614 ex = ((AltResult)r).ex; 615 u = null; 616 } 617 else { 618 @SuppressWarnings("unchecked") U ur = (U) r; 619 u = ur; 620 } 621 } 622 d.internalComplete(u, ex); 623 } 624 return true; 625 } 626 private static final long serialVersionUID = 5232453952276885070L; 627 } 628 629 /* ------------- Completions -------------- */ 630 631 /** 632 * Simple linked list nodes to record completions, used in 633 * basically the same way as WaitNodes. (We separate nodes from 634 * the Completions themselves mainly because for the And and Or 635 * methods, the same Completion object resides in two lists.) 636 */ 637 static final class CompletionNode { 638 final Completion completion; 639 volatile CompletionNode next; 640 CompletionNode(Completion completion) { this.completion = completion; } 641 } 642 643 // Opportunistically subclass AtomicInteger to use compareAndSet to claim. 644 abstract static class Completion extends AtomicInteger implements Runnable { 645 } 646 647 static final class ApplyCompletion<T,U> extends Completion { 648 final CompletableFuture<? extends T> src; 649 final Function<? super T,? extends U> fn; 650 final CompletableFuture<U> dst; 651 final Executor executor; 652 ApplyCompletion(CompletableFuture<? extends T> src, 653 Function<? super T,? extends U> fn, 654 CompletableFuture<U> dst, Executor executor) { 655 this.src = src; this.fn = fn; this.dst = dst; 656 this.executor = executor; 657 } 658 public final void run() { 659 final CompletableFuture<? extends T> a; 660 final Function<? super T,? 
extends U> fn; 661 final CompletableFuture<U> dst; 662 Object r; T t; Throwable ex; 663 if ((dst = this.dst) != null && 664 (fn = this.fn) != null && 665 (a = this.src) != null && 666 (r = a.result) != null && 667 compareAndSet(0, 1)) { 668 if (r instanceof AltResult) { 669 ex = ((AltResult)r).ex; 670 t = null; 671 } 672 else { 673 ex = null; 674 @SuppressWarnings("unchecked") T tr = (T) r; 675 t = tr; 676 } 677 Executor e = executor; 678 U u = null; 679 if (ex == null) { 680 try { 681 if (e != null) 682 e.execute(new AsyncApply<T,U>(t, fn, dst)); 683 else 684 u = fn.apply(t); 685 } catch (Throwable rex) { 686 ex = rex; 687 } 688 } 689 if (e == null || ex != null) 690 dst.internalComplete(u, ex); 691 } 692 } 693 private static final long serialVersionUID = 5232453952276885070L; 694 } 695 696 static final class AcceptCompletion<T> extends Completion { 697 final CompletableFuture<? extends T> src; 698 final Consumer<? super T> fn; 699 final CompletableFuture<Void> dst; 700 final Executor executor; 701 AcceptCompletion(CompletableFuture<? extends T> src, 702 Consumer<? super T> fn, 703 CompletableFuture<Void> dst, Executor executor) { 704 this.src = src; this.fn = fn; this.dst = dst; 705 this.executor = executor; 706 } 707 public final void run() { 708 final CompletableFuture<? extends T> a; 709 final Consumer<? 
super T> fn; 710 final CompletableFuture<Void> dst; 711 Object r; T t; Throwable ex; 712 if ((dst = this.dst) != null && 713 (fn = this.fn) != null && 714 (a = this.src) != null && 715 (r = a.result) != null && 716 compareAndSet(0, 1)) { 717 if (r instanceof AltResult) { 718 ex = ((AltResult)r).ex; 719 t = null; 720 } 721 else { 722 ex = null; 723 @SuppressWarnings("unchecked") T tr = (T) r; 724 t = tr; 725 } 726 Executor e = executor; 727 if (ex == null) { 728 try { 729 if (e != null) 730 e.execute(new AsyncAccept<T>(t, fn, dst)); 731 else 732 fn.accept(t); 733 } catch (Throwable rex) { 734 ex = rex; 735 } 736 } 737 if (e == null || ex != null) 738 dst.internalComplete(null, ex); 739 } 740 } 741 private static final long serialVersionUID = 5232453952276885070L; 742 } 743 744 static final class RunCompletion<T> extends Completion { 745 final CompletableFuture<? extends T> src; 746 final Runnable fn; 747 final CompletableFuture<Void> dst; 748 final Executor executor; 749 RunCompletion(CompletableFuture<? extends T> src, 750 Runnable fn, 751 CompletableFuture<Void> dst, 752 Executor executor) { 753 this.src = src; this.fn = fn; this.dst = dst; 754 this.executor = executor; 755 } 756 public final void run() { 757 final CompletableFuture<? 
extends T> a; 758 final Runnable fn; 759 final CompletableFuture<Void> dst; 760 Object r; Throwable ex; 761 if ((dst = this.dst) != null && 762 (fn = this.fn) != null && 763 (a = this.src) != null && 764 (r = a.result) != null && 765 compareAndSet(0, 1)) { 766 if (r instanceof AltResult) 767 ex = ((AltResult)r).ex; 768 else 769 ex = null; 770 Executor e = executor; 771 if (ex == null) { 772 try { 773 if (e != null) 774 e.execute(new AsyncRun(fn, dst)); 775 else 776 fn.run(); 777 } catch (Throwable rex) { 778 ex = rex; 779 } 780 } 781 if (e == null || ex != null) 782 dst.internalComplete(null, ex); 783 } 784 } 785 private static final long serialVersionUID = 5232453952276885070L; 786 } 787 788 static final class BiApplyCompletion<T,U,V> extends Completion { 789 final CompletableFuture<? extends T> src; 790 final CompletableFuture<? extends U> snd; 791 final BiFunction<? super T,? super U,? extends V> fn; 792 final CompletableFuture<V> dst; 793 final Executor executor; 794 BiApplyCompletion(CompletableFuture<? extends T> src, 795 CompletableFuture<? extends U> snd, 796 BiFunction<? super T,? super U,? extends V> fn, 797 CompletableFuture<V> dst, Executor executor) { 798 this.src = src; this.snd = snd; 799 this.fn = fn; this.dst = dst; 800 this.executor = executor; 801 } 802 public final void run() { 803 final CompletableFuture<? extends T> a; 804 final CompletableFuture<? extends U> b; 805 final BiFunction<? super T,? super U,? 
extends V> fn; 806 final CompletableFuture<V> dst; 807 Object r, s; T t; U u; Throwable ex; 808 if ((dst = this.dst) != null && 809 (fn = this.fn) != null && 810 (a = this.src) != null && 811 (r = a.result) != null && 812 (b = this.snd) != null && 813 (s = b.result) != null && 814 compareAndSet(0, 1)) { 815 if (r instanceof AltResult) { 816 ex = ((AltResult)r).ex; 817 t = null; 818 } 819 else { 820 ex = null; 821 @SuppressWarnings("unchecked") T tr = (T) r; 822 t = tr; 823 } 824 if (ex != null) 825 u = null; 826 else if (s instanceof AltResult) { 827 ex = ((AltResult)s).ex; 828 u = null; 829 } 830 else { 831 @SuppressWarnings("unchecked") U us = (U) s; 832 u = us; 833 } 834 Executor e = executor; 835 V v = null; 836 if (ex == null) { 837 try { 838 if (e != null) 839 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst)); 840 else 841 v = fn.apply(t, u); 842 } catch (Throwable rex) { 843 ex = rex; 844 } 845 } 846 if (e == null || ex != null) 847 dst.internalComplete(v, ex); 848 } 849 } 850 private static final long serialVersionUID = 5232453952276885070L; 851 } 852 853 static final class BiAcceptCompletion<T,U> extends Completion { 854 final CompletableFuture<? extends T> src; 855 final CompletableFuture<? extends U> snd; 856 final BiConsumer<? super T,? super U> fn; 857 final CompletableFuture<Void> dst; 858 final Executor executor; 859 BiAcceptCompletion(CompletableFuture<? extends T> src, 860 CompletableFuture<? extends U> snd, 861 BiConsumer<? super T,? super U> fn, 862 CompletableFuture<Void> dst, Executor executor) { 863 this.src = src; this.snd = snd; 864 this.fn = fn; this.dst = dst; 865 this.executor = executor; 866 } 867 public final void run() { 868 final CompletableFuture<? extends T> a; 869 final CompletableFuture<? extends U> b; 870 final BiConsumer<? super T,? 
super U> fn; 871 final CompletableFuture<Void> dst; 872 Object r, s; T t; U u; Throwable ex; 873 if ((dst = this.dst) != null && 874 (fn = this.fn) != null && 875 (a = this.src) != null && 876 (r = a.result) != null && 877 (b = this.snd) != null && 878 (s = b.result) != null && 879 compareAndSet(0, 1)) { 880 if (r instanceof AltResult) { 881 ex = ((AltResult)r).ex; 882 t = null; 883 } 884 else { 885 ex = null; 886 @SuppressWarnings("unchecked") T tr = (T) r; 887 t = tr; 888 } 889 if (ex != null) 890 u = null; 891 else if (s instanceof AltResult) { 892 ex = ((AltResult)s).ex; 893 u = null; 894 } 895 else { 896 @SuppressWarnings("unchecked") U us = (U) s; 897 u = us; 898 } 899 Executor e = executor; 900 if (ex == null) { 901 try { 902 if (e != null) 903 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst)); 904 else 905 fn.accept(t, u); 906 } catch (Throwable rex) { 907 ex = rex; 908 } 909 } 910 if (e == null || ex != null) 911 dst.internalComplete(null, ex); 912 } 913 } 914 private static final long serialVersionUID = 5232453952276885070L; 915 } 916 917 static final class BiRunCompletion<T> extends Completion { 918 final CompletableFuture<? extends T> src; 919 final CompletableFuture<?> snd; 920 final Runnable fn; 921 final CompletableFuture<Void> dst; 922 final Executor executor; 923 BiRunCompletion(CompletableFuture<? extends T> src, 924 CompletableFuture<?> snd, 925 Runnable fn, 926 CompletableFuture<Void> dst, Executor executor) { 927 this.src = src; this.snd = snd; 928 this.fn = fn; this.dst = dst; 929 this.executor = executor; 930 } 931 public final void run() { 932 final CompletableFuture<? 
extends T> a; 933 final CompletableFuture<?> b; 934 final Runnable fn; 935 final CompletableFuture<Void> dst; 936 Object r, s; Throwable ex; 937 if ((dst = this.dst) != null && 938 (fn = this.fn) != null && 939 (a = this.src) != null && 940 (r = a.result) != null && 941 (b = this.snd) != null && 942 (s = b.result) != null && 943 compareAndSet(0, 1)) { 944 if (r instanceof AltResult) 945 ex = ((AltResult)r).ex; 946 else 947 ex = null; 948 if (ex == null && (s instanceof AltResult)) 949 ex = ((AltResult)s).ex; 950 Executor e = executor; 951 if (ex == null) { 952 try { 953 if (e != null) 954 e.execute(new AsyncRun(fn, dst)); 955 else 956 fn.run(); 957 } catch (Throwable rex) { 958 ex = rex; 959 } 960 } 961 if (e == null || ex != null) 962 dst.internalComplete(null, ex); 963 } 964 } 965 private static final long serialVersionUID = 5232453952276885070L; 966 } 967 968 static final class AndCompletion extends Completion { 969 final CompletableFuture<?> src; 970 final CompletableFuture<?> snd; 971 final CompletableFuture<Void> dst; 972 AndCompletion(CompletableFuture<?> src, 973 CompletableFuture<?> snd, 974 CompletableFuture<Void> dst) { 975 this.src = src; this.snd = snd; this.dst = dst; 976 } 977 public final void run() { 978 final CompletableFuture<?> a; 979 final CompletableFuture<?> b; 980 final CompletableFuture<Void> dst; 981 Object r, s; Throwable ex; 982 if ((dst = this.dst) != null && 983 (a = this.src) != null && 984 (r = a.result) != null && 985 (b = this.snd) != null && 986 (s = b.result) != null && 987 compareAndSet(0, 1)) { 988 if (r instanceof AltResult) 989 ex = ((AltResult)r).ex; 990 else 991 ex = null; 992 if (ex == null && (s instanceof AltResult)) 993 ex = ((AltResult)s).ex; 994 dst.internalComplete(null, ex); 995 } 996 } 997 private static final long serialVersionUID = 5232453952276885070L; 998 } 999 1000 static final class OrApplyCompletion<T,U> extends Completion { 1001 final CompletableFuture<? extends T> src; 1002 final CompletableFuture<? 
extends T> snd; 1003 final Function<? super T,? extends U> fn; 1004 final CompletableFuture<U> dst; 1005 final Executor executor; 1006 OrApplyCompletion(CompletableFuture<? extends T> src, 1007 CompletableFuture<? extends T> snd, 1008 Function<? super T,? extends U> fn, 1009 CompletableFuture<U> dst, Executor executor) { 1010 this.src = src; this.snd = snd; 1011 this.fn = fn; this.dst = dst; 1012 this.executor = executor; 1013 } 1014 public final void run() { 1015 final CompletableFuture<? extends T> a; 1016 final CompletableFuture<? extends T> b; 1017 final Function<? super T,? extends U> fn; 1018 final CompletableFuture<U> dst; 1019 Object r; T t; Throwable ex; 1020 if ((dst = this.dst) != null && 1021 (fn = this.fn) != null && 1022 (((a = this.src) != null && (r = a.result) != null) || 1023 ((b = this.snd) != null && (r = b.result) != null)) && 1024 compareAndSet(0, 1)) { 1025 if (r instanceof AltResult) { 1026 ex = ((AltResult)r).ex; 1027 t = null; 1028 } 1029 else { 1030 ex = null; 1031 @SuppressWarnings("unchecked") T tr = (T) r; 1032 t = tr; 1033 } 1034 Executor e = executor; 1035 U u = null; 1036 if (ex == null) { 1037 try { 1038 if (e != null) 1039 e.execute(new AsyncApply<T,U>(t, fn, dst)); 1040 else 1041 u = fn.apply(t); 1042 } catch (Throwable rex) { 1043 ex = rex; 1044 } 1045 } 1046 if (e == null || ex != null) 1047 dst.internalComplete(u, ex); 1048 } 1049 } 1050 private static final long serialVersionUID = 5232453952276885070L; 1051 } 1052 1053 static final class OrAcceptCompletion<T> extends Completion { 1054 final CompletableFuture<? extends T> src; 1055 final CompletableFuture<? extends T> snd; 1056 final Consumer<? super T> fn; 1057 final CompletableFuture<Void> dst; 1058 final Executor executor; 1059 OrAcceptCompletion(CompletableFuture<? extends T> src, 1060 CompletableFuture<? extends T> snd, 1061 Consumer<? 
super T> fn, 1062 CompletableFuture<Void> dst, Executor executor) { 1063 this.src = src; this.snd = snd; 1064 this.fn = fn; this.dst = dst; 1065 this.executor = executor; 1066 } 1067 public final void run() { 1068 final CompletableFuture<? extends T> a; 1069 final CompletableFuture<? extends T> b; 1070 final Consumer<? super T> fn; 1071 final CompletableFuture<Void> dst; 1072 Object r; T t; Throwable ex; 1073 if ((dst = this.dst) != null && 1074 (fn = this.fn) != null && 1075 (((a = this.src) != null && (r = a.result) != null) || 1076 ((b = this.snd) != null && (r = b.result) != null)) && 1077 compareAndSet(0, 1)) { 1078 if (r instanceof AltResult) { 1079 ex = ((AltResult)r).ex; 1080 t = null; 1081 } 1082 else { 1083 ex = null; 1084 @SuppressWarnings("unchecked") T tr = (T) r; 1085 t = tr; 1086 } 1087 Executor e = executor; 1088 if (ex == null) { 1089 try { 1090 if (e != null) 1091 e.execute(new AsyncAccept<T>(t, fn, dst)); 1092 else 1093 fn.accept(t); 1094 } catch (Throwable rex) { 1095 ex = rex; 1096 } 1097 } 1098 if (e == null || ex != null) 1099 dst.internalComplete(null, ex); 1100 } 1101 } 1102 private static final long serialVersionUID = 5232453952276885070L; 1103 } 1104 1105 static final class OrRunCompletion<T> extends Completion { 1106 final CompletableFuture<? extends T> src; 1107 final CompletableFuture<?> snd; 1108 final Runnable fn; 1109 final CompletableFuture<Void> dst; 1110 final Executor executor; 1111 OrRunCompletion(CompletableFuture<? extends T> src, 1112 CompletableFuture<?> snd, 1113 Runnable fn, 1114 CompletableFuture<Void> dst, Executor executor) { 1115 this.src = src; this.snd = snd; 1116 this.fn = fn; this.dst = dst; 1117 this.executor = executor; 1118 } 1119 public final void run() { 1120 final CompletableFuture<? 
extends T> a; 1121 final CompletableFuture<?> b; 1122 final Runnable fn; 1123 final CompletableFuture<Void> dst; 1124 Object r; Throwable ex; 1125 if ((dst = this.dst) != null && 1126 (fn = this.fn) != null && 1127 (((a = this.src) != null && (r = a.result) != null) || 1128 ((b = this.snd) != null && (r = b.result) != null)) && 1129 compareAndSet(0, 1)) { 1130 if (r instanceof AltResult) 1131 ex = ((AltResult)r).ex; 1132 else 1133 ex = null; 1134 Executor e = executor; 1135 if (ex == null) { 1136 try { 1137 if (e != null) 1138 e.execute(new AsyncRun(fn, dst)); 1139 else 1140 fn.run(); 1141 } catch (Throwable rex) { 1142 ex = rex; 1143 } 1144 } 1145 if (e == null || ex != null) 1146 dst.internalComplete(null, ex); 1147 } 1148 } 1149 private static final long serialVersionUID = 5232453952276885070L; 1150 } 1151 1152 static final class OrCompletion extends Completion { 1153 final CompletableFuture<?> src; 1154 final CompletableFuture<?> snd; 1155 final CompletableFuture<Object> dst; 1156 OrCompletion(CompletableFuture<?> src, 1157 CompletableFuture<?> snd, 1158 CompletableFuture<Object> dst) { 1159 this.src = src; this.snd = snd; this.dst = dst; 1160 } 1161 public final void run() { 1162 final CompletableFuture<?> a; 1163 final CompletableFuture<?> b; 1164 final CompletableFuture<Object> dst; 1165 Object r, t; Throwable ex; 1166 if ((dst = this.dst) != null && 1167 (((a = this.src) != null && (r = a.result) != null) || 1168 ((b = this.snd) != null && (r = b.result) != null)) && 1169 compareAndSet(0, 1)) { 1170 if (r instanceof AltResult) { 1171 ex = ((AltResult)r).ex; 1172 t = null; 1173 } 1174 else { 1175 ex = null; 1176 t = r; 1177 } 1178 dst.internalComplete(t, ex); 1179 } 1180 } 1181 private static final long serialVersionUID = 5232453952276885070L; 1182 } 1183 1184 static final class ExceptionCompletion<T> extends Completion { 1185 final CompletableFuture<? extends T> src; 1186 final Function<? super Throwable, ? 
extends T> fn; 1187 final CompletableFuture<T> dst; 1188 ExceptionCompletion(CompletableFuture<? extends T> src, 1189 Function<? super Throwable, ? extends T> fn, 1190 CompletableFuture<T> dst) { 1191 this.src = src; this.fn = fn; this.dst = dst; 1192 } 1193 public final void run() { 1194 final CompletableFuture<? extends T> a; 1195 final Function<? super Throwable, ? extends T> fn; 1196 final CompletableFuture<T> dst; 1197 Object r; T t = null; Throwable ex, dx = null; 1198 if ((dst = this.dst) != null && 1199 (fn = this.fn) != null && 1200 (a = this.src) != null && 1201 (r = a.result) != null && 1202 compareAndSet(0, 1)) { 1203 if ((r instanceof AltResult) && 1204 (ex = ((AltResult)r).ex) != null) { 1205 try { 1206 t = fn.apply(ex); 1207 } catch (Throwable rex) { 1208 dx = rex; 1209 } 1210 } 1211 else { 1212 @SuppressWarnings("unchecked") T tr = (T) r; 1213 t = tr; 1214 } 1215 dst.internalComplete(t, dx); 1216 } 1217 } 1218 private static final long serialVersionUID = 5232453952276885070L; 1219 } 1220 1221 static final class ThenCopy<T> extends Completion { 1222 final CompletableFuture<?> src; 1223 final CompletableFuture<T> dst; 1224 ThenCopy(CompletableFuture<?> src, 1225 CompletableFuture<T> dst) { 1226 this.src = src; this.dst = dst; 1227 } 1228 public final void run() { 1229 final CompletableFuture<?> a; 1230 final CompletableFuture<T> dst; 1231 Object r; T t; Throwable ex; 1232 if ((dst = this.dst) != null && 1233 (a = this.src) != null && 1234 (r = a.result) != null && 1235 compareAndSet(0, 1)) { 1236 if (r instanceof AltResult) { 1237 ex = ((AltResult)r).ex; 1238 t = null; 1239 } 1240 else { 1241 ex = null; 1242 @SuppressWarnings("unchecked") T tr = (T) r; 1243 t = tr; 1244 } 1245 dst.internalComplete(t, ex); 1246 } 1247 } 1248 private static final long serialVersionUID = 5232453952276885070L; 1249 } 1250 1251 // version of ThenCopy for CompletableFuture<Void> dst 1252 static final class ThenPropagate extends Completion { 1253 final CompletableFuture<?> 
src; 1254 final CompletableFuture<Void> dst; 1255 ThenPropagate(CompletableFuture<?> src, 1256 CompletableFuture<Void> dst) { 1257 this.src = src; this.dst = dst; 1258 } 1259 public final void run() { 1260 final CompletableFuture<?> a; 1261 final CompletableFuture<Void> dst; 1262 Object r; Throwable ex; 1263 if ((dst = this.dst) != null && 1264 (a = this.src) != null && 1265 (r = a.result) != null && 1266 compareAndSet(0, 1)) { 1267 if (r instanceof AltResult) 1268 ex = ((AltResult)r).ex; 1269 else 1270 ex = null; 1271 dst.internalComplete(null, ex); 1272 } 1273 } 1274 private static final long serialVersionUID = 5232453952276885070L; 1275 } 1276 1277 static final class HandleCompletion<T,U> extends Completion { 1278 final CompletableFuture<? extends T> src; 1279 final BiFunction<? super T, Throwable, ? extends U> fn; 1280 final CompletableFuture<U> dst; 1281 HandleCompletion(CompletableFuture<? extends T> src, 1282 BiFunction<? super T, Throwable, ? extends U> fn, 1283 CompletableFuture<U> dst) { 1284 this.src = src; this.fn = fn; this.dst = dst; 1285 } 1286 public final void run() { 1287 final CompletableFuture<? extends T> a; 1288 final BiFunction<? super T, Throwable, ? extends U> fn; 1289 final CompletableFuture<U> dst; 1290 Object r; T t; Throwable ex; 1291 if ((dst = this.dst) != null && 1292 (fn = this.fn) != null && 1293 (a = this.src) != null && 1294 (r = a.result) != null && 1295 compareAndSet(0, 1)) { 1296 if (r instanceof AltResult) { 1297 ex = ((AltResult)r).ex; 1298 t = null; 1299 } 1300 else { 1301 ex = null; 1302 @SuppressWarnings("unchecked") T tr = (T) r; 1303 t = tr; 1304 } 1305 U u = null; Throwable dx = null; 1306 try { 1307 u = fn.apply(t, ex); 1308 } catch (Throwable rex) { 1309 dx = rex; 1310 } 1311 dst.internalComplete(u, dx); 1312 } 1313 } 1314 private static final long serialVersionUID = 5232453952276885070L; 1315 } 1316 1317 static final class ComposeCompletion<T,U> extends Completion { 1318 final CompletableFuture<? 
extends T> src; 1319 final Function<? super T, CompletableFuture<U>> fn; 1320 final CompletableFuture<U> dst; 1321 final Executor executor; 1322 ComposeCompletion(CompletableFuture<? extends T> src, 1323 Function<? super T, CompletableFuture<U>> fn, 1324 CompletableFuture<U> dst, Executor executor) { 1325 this.src = src; this.fn = fn; this.dst = dst; 1326 this.executor = executor; 1327 } 1328 public final void run() { 1329 final CompletableFuture<? extends T> a; 1330 final Function<? super T, CompletableFuture<U>> fn; 1331 final CompletableFuture<U> dst; 1332 Object r; T t; Throwable ex; Executor e; 1333 if ((dst = this.dst) != null && 1334 (fn = this.fn) != null && 1335 (a = this.src) != null && 1336 (r = a.result) != null && 1337 compareAndSet(0, 1)) { 1338 if (r instanceof AltResult) { 1339 ex = ((AltResult)r).ex; 1340 t = null; 1341 } 1342 else { 1343 ex = null; 1344 @SuppressWarnings("unchecked") T tr = (T) r; 1345 t = tr; 1346 } 1347 CompletableFuture<U> c = null; 1348 U u = null; 1349 boolean complete = false; 1350 if (ex == null) { 1351 if ((e = executor) != null) 1352 e.execute(new AsyncCompose<T,U>(t, fn, dst)); 1353 else { 1354 try { 1355 if ((c = fn.apply(t)) == null) 1356 ex = new NullPointerException(); 1357 } catch (Throwable rex) { 1358 ex = rex; 1359 } 1360 } 1361 } 1362 if (c != null) { 1363 ThenCopy<U> d = null; 1364 Object s; 1365 if ((s = c.result) == null) { 1366 CompletionNode p = new CompletionNode 1367 (d = new ThenCopy<U>(c, dst)); 1368 while ((s = c.result) == null) { 1369 if (UNSAFE.compareAndSwapObject 1370 (c, COMPLETIONS, p.next = c.completions, p)) 1371 break; 1372 } 1373 } 1374 if (s != null && (d == null || d.compareAndSet(0, 1))) { 1375 complete = true; 1376 if (s instanceof AltResult) { 1377 ex = ((AltResult)s).ex; // no rewrap 1378 u = null; 1379 } 1380 else { 1381 @SuppressWarnings("unchecked") U us = (U) s; 1382 u = us; 1383 } 1384 } 1385 } 1386 if (complete || ex != null) 1387 dst.internalComplete(u, ex); 1388 if (c != null) 
1389 c.helpPostComplete(); 1390 } 1391 } 1392 private static final long serialVersionUID = 5232453952276885070L; 1393 } 1394 1395 // public methods 1396 1397 /** 1398 * Creates a new incomplete CompletableFuture. 1399 */ 1400 public CompletableFuture() { 1401 } 1402 1403 /** 1404 * Returns a new CompletableFuture that is asynchronously completed 1405 * by a task running in the {@link ForkJoinPool#commonPool()} with 1406 * the value obtained by calling the given Supplier. 1407 * 1408 * @param supplier a function returning the value to be used 1409 * to complete the returned CompletableFuture 1410 * @return the new CompletableFuture 1411 */ 1412 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { 1413 if (supplier == null) throw new NullPointerException(); 1414 CompletableFuture<U> f = new CompletableFuture<U>(); 1415 ForkJoinPool.commonPool(). 1416 execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f)); 1417 return f; 1418 } 1419 1420 /** 1421 * Returns a new CompletableFuture that is asynchronously completed 1422 * by a task running in the given executor with the value obtained 1423 * by calling the given Supplier. 1424 * 1425 * @param supplier a function returning the value to be used 1426 * to complete the returned CompletableFuture 1427 * @param executor the executor to use for asynchronous execution 1428 * @return the new CompletableFuture 1429 */ 1430 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, 1431 Executor executor) { 1432 if (executor == null || supplier == null) 1433 throw new NullPointerException(); 1434 CompletableFuture<U> f = new CompletableFuture<U>(); 1435 executor.execute(new AsyncSupply<U>(supplier, f)); 1436 return f; 1437 } 1438 1439 /** 1440 * Returns a new CompletableFuture that is asynchronously completed 1441 * by a task running in the {@link ForkJoinPool#commonPool()} after 1442 * it runs the given action. 
1443 * 1444 * @param runnable the action to run before completing the 1445 * returned CompletableFuture 1446 * @return the new CompletableFuture 1447 */ 1448 public static CompletableFuture<Void> runAsync(Runnable runnable) { 1449 if (runnable == null) throw new NullPointerException(); 1450 CompletableFuture<Void> f = new CompletableFuture<Void>(); 1451 ForkJoinPool.commonPool(). 1452 execute((ForkJoinTask<?>)new AsyncRun(runnable, f)); 1453 return f; 1454 } 1455 1456 /** 1457 * Returns a new CompletableFuture that is asynchronously completed 1458 * by a task running in the given executor after it runs the given 1459 * action. 1460 * 1461 * @param runnable the action to run before completing the 1462 * returned CompletableFuture 1463 * @param executor the executor to use for asynchronous execution 1464 * @return the new CompletableFuture 1465 */ 1466 public static CompletableFuture<Void> runAsync(Runnable runnable, 1467 Executor executor) { 1468 if (executor == null || runnable == null) 1469 throw new NullPointerException(); 1470 CompletableFuture<Void> f = new CompletableFuture<Void>(); 1471 executor.execute(new AsyncRun(runnable, f)); 1472 return f; 1473 } 1474 1475 /** 1476 * Returns a new CompletableFuture that is already completed with 1477 * the given value. 1478 * 1479 * @param value the value 1480 * @return the completed CompletableFuture 1481 */ 1482 public static <U> CompletableFuture<U> completedFuture(U value) { 1483 CompletableFuture<U> f = new CompletableFuture<U>(); 1484 f.result = (value == null) ? NIL : value; 1485 return f; 1486 } 1487 1488 /** 1489 * Returns {@code true} if completed in any fashion: normally, 1490 * exceptionally, or via cancellation. 1491 * 1492 * @return {@code true} if completed 1493 */ 1494 public boolean isDone() { 1495 return result != null; 1496 } 1497 1498 /** 1499 * Waits if necessary for this future to complete, and then 1500 * returns its result. 
1501 * 1502 * @return the result value 1503 * @throws CancellationException if this future was cancelled 1504 * @throws ExecutionException if this future completed exceptionally 1505 * @throws InterruptedException if the current thread was interrupted 1506 * while waiting 1507 */ 1508 public T get() throws InterruptedException, ExecutionException { 1509 Object r; Throwable ex, cause; 1510 if ((r = result) == null && (r = waitingGet(true)) == null) 1511 throw new InterruptedException(); 1512 if (!(r instanceof AltResult)) { 1513 @SuppressWarnings("unchecked") T tr = (T) r; 1514 return tr; 1515 } 1516 if ((ex = ((AltResult)r).ex) == null) 1517 return null; 1518 if (ex instanceof CancellationException) 1519 throw (CancellationException)ex; 1520 if ((ex instanceof CompletionException) && 1521 (cause = ex.getCause()) != null) 1522 ex = cause; 1523 throw new ExecutionException(ex); 1524 } 1525 1526 /** 1527 * Waits if necessary for at most the given time for this future 1528 * to complete, and then returns its result, if available. 
1529 * 1530 * @param timeout the maximum time to wait 1531 * @param unit the time unit of the timeout argument 1532 * @return the result value 1533 * @throws CancellationException if this future was cancelled 1534 * @throws ExecutionException if this future completed exceptionally 1535 * @throws InterruptedException if the current thread was interrupted 1536 * while waiting 1537 * @throws TimeoutException if the wait timed out 1538 */ 1539 public T get(long timeout, TimeUnit unit) 1540 throws InterruptedException, ExecutionException, TimeoutException { 1541 Object r; Throwable ex, cause; 1542 long nanos = unit.toNanos(timeout); 1543 if (Thread.interrupted()) 1544 throw new InterruptedException(); 1545 if ((r = result) == null) 1546 r = timedAwaitDone(nanos); 1547 if (!(r instanceof AltResult)) { 1548 @SuppressWarnings("unchecked") T tr = (T) r; 1549 return tr; 1550 } 1551 if ((ex = ((AltResult)r).ex) == null) 1552 return null; 1553 if (ex instanceof CancellationException) 1554 throw (CancellationException)ex; 1555 if ((ex instanceof CompletionException) && 1556 (cause = ex.getCause()) != null) 1557 ex = cause; 1558 throw new ExecutionException(ex); 1559 } 1560 1561 /** 1562 * Returns the result value when complete, or throws an 1563 * (unchecked) exception if completed exceptionally. To better 1564 * conform with the use of common functional forms, if a 1565 * computation involved in the completion of this 1566 * CompletableFuture threw an exception, this method throws an 1567 * (unchecked) {@link CompletionException} with the underlying 1568 * exception as its cause. 
1569 * 1570 * @return the result value 1571 * @throws CancellationException if the computation was cancelled 1572 * @throws CompletionException if this future completed 1573 * exceptionally or a completion computation threw an exception 1574 */ 1575 public T join() { 1576 Object r; Throwable ex; 1577 if ((r = result) == null) 1578 r = waitingGet(false); 1579 if (!(r instanceof AltResult)) { 1580 @SuppressWarnings("unchecked") T tr = (T) r; 1581 return tr; 1582 } 1583 if ((ex = ((AltResult)r).ex) == null) 1584 return null; 1585 if (ex instanceof CancellationException) 1586 throw (CancellationException)ex; 1587 if (ex instanceof CompletionException) 1588 throw (CompletionException)ex; 1589 throw new CompletionException(ex); 1590 } 1591 1592 /** 1593 * Returns the result value (or throws any encountered exception) 1594 * if completed, else returns the given valueIfAbsent. 1595 * 1596 * @param valueIfAbsent the value to return if not completed 1597 * @return the result value, if completed, else the given valueIfAbsent 1598 * @throws CancellationException if the computation was cancelled 1599 * @throws CompletionException if this future completed 1600 * exceptionally or a completion computation threw an exception 1601 */ 1602 public T getNow(T valueIfAbsent) { 1603 Object r; Throwable ex; 1604 if ((r = result) == null) 1605 return valueIfAbsent; 1606 if (!(r instanceof AltResult)) { 1607 @SuppressWarnings("unchecked") T tr = (T) r; 1608 return tr; 1609 } 1610 if ((ex = ((AltResult)r).ex) == null) 1611 return null; 1612 if (ex instanceof CancellationException) 1613 throw (CancellationException)ex; 1614 if (ex instanceof CompletionException) 1615 throw (CompletionException)ex; 1616 throw new CompletionException(ex); 1617 } 1618 1619 /** 1620 * If not already completed, sets the value returned by {@link 1621 * #get()} and related methods to the given value. 
1622 * 1623 * @param value the result value 1624 * @return {@code true} if this invocation caused this CompletableFuture 1625 * to transition to a completed state, else {@code false} 1626 */ 1627 public boolean complete(T value) { 1628 boolean triggered = result == null && 1629 UNSAFE.compareAndSwapObject(this, RESULT, null, 1630 value == null ? NIL : value); 1631 postComplete(); 1632 return triggered; 1633 } 1634 1635 /** 1636 * If not already completed, causes invocations of {@link #get()} 1637 * and related methods to throw the given exception. 1638 * 1639 * @param ex the exception 1640 * @return {@code true} if this invocation caused this CompletableFuture 1641 * to transition to a completed state, else {@code false} 1642 */ 1643 public boolean completeExceptionally(Throwable ex) { 1644 if (ex == null) throw new NullPointerException(); 1645 boolean triggered = result == null && 1646 UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex)); 1647 postComplete(); 1648 return triggered; 1649 } 1650 1651 /** 1652 * Returns a new CompletableFuture that is completed 1653 * when this CompletableFuture completes, with the result of the 1654 * given function of this CompletableFuture's result. 1655 * 1656 * <p>If this CompletableFuture completes exceptionally, or the 1657 * supplied function throws an exception, then the returned 1658 * CompletableFuture completes exceptionally with a 1659 * CompletionException holding the exception as its cause. 1660 * 1661 * @param fn the function to use to compute the value of 1662 * the returned CompletableFuture 1663 * @return the new CompletableFuture 1664 */ 1665 public <U> CompletableFuture<U> thenApply(Function<? super T,? 
extends U> fn) { 1666 return doThenApply(fn, null); 1667 } 1668 1669 /** 1670 * Returns a new CompletableFuture that is asynchronously completed 1671 * when this CompletableFuture completes, with the result of the 1672 * given function of this CompletableFuture's result from a 1673 * task running in the {@link ForkJoinPool#commonPool()}. 1674 * 1675 * <p>If this CompletableFuture completes exceptionally, or the 1676 * supplied function throws an exception, then the returned 1677 * CompletableFuture completes exceptionally with a 1678 * CompletionException holding the exception as its cause. 1679 * 1680 * @param fn the function to use to compute the value of 1681 * the returned CompletableFuture 1682 * @return the new CompletableFuture 1683 */ 1684 public <U> CompletableFuture<U> thenApplyAsync 1685 (Function<? super T,? extends U> fn) { 1686 return doThenApply(fn, ForkJoinPool.commonPool()); 1687 } 1688 1689 /** 1690 * Returns a new CompletableFuture that is asynchronously completed 1691 * when this CompletableFuture completes, with the result of the 1692 * given function of this CompletableFuture's result from a 1693 * task running in the given executor. 1694 * 1695 * <p>If this CompletableFuture completes exceptionally, or the 1696 * supplied function throws an exception, then the returned 1697 * CompletableFuture completes exceptionally with a 1698 * CompletionException holding the exception as its cause. 1699 * 1700 * @param fn the function to use to compute the value of 1701 * the returned CompletableFuture 1702 * @param executor the executor to use for asynchronous execution 1703 * @return the new CompletableFuture 1704 */ 1705 public <U> CompletableFuture<U> thenApplyAsync 1706 (Function<? super T,? extends U> fn, 1707 Executor executor) { 1708 if (executor == null) throw new NullPointerException(); 1709 return doThenApply(fn, executor); 1710 } 1711 1712 private <U> CompletableFuture<U> doThenApply 1713 (Function<? super T,? 
extends U> fn, 1714 Executor e) { 1715 if (fn == null) throw new NullPointerException(); 1716 CompletableFuture<U> dst = new CompletableFuture<U>(); 1717 ApplyCompletion<T,U> d = null; 1718 Object r; 1719 if ((r = result) == null) { 1720 CompletionNode p = new CompletionNode 1721 (d = new ApplyCompletion<T,U>(this, fn, dst, e)); 1722 while ((r = result) == null) { 1723 if (UNSAFE.compareAndSwapObject 1724 (this, COMPLETIONS, p.next = completions, p)) 1725 break; 1726 } 1727 } 1728 if (r != null && (d == null || d.compareAndSet(0, 1))) { 1729 T t; Throwable ex; 1730 if (r instanceof AltResult) { 1731 ex = ((AltResult)r).ex; 1732 t = null; 1733 } 1734 else { 1735 ex = null; 1736 @SuppressWarnings("unchecked") T tr = (T) r; 1737 t = tr; 1738 } 1739 U u = null; 1740 if (ex == null) { 1741 try { 1742 if (e != null) 1743 e.execute(new AsyncApply<T,U>(t, fn, dst)); 1744 else 1745 u = fn.apply(t); 1746 } catch (Throwable rex) { 1747 ex = rex; 1748 } 1749 } 1750 if (e == null || ex != null) 1751 dst.internalComplete(u, ex); 1752 } 1753 helpPostComplete(); 1754 return dst; 1755 } 1756 1757 /** 1758 * Returns a new CompletableFuture that is completed 1759 * when this CompletableFuture completes, after performing the given 1760 * action with this CompletableFuture's result. 1761 * 1762 * <p>If this CompletableFuture completes exceptionally, or the 1763 * supplied action throws an exception, then the returned 1764 * CompletableFuture completes exceptionally with a 1765 * CompletionException holding the exception as its cause. 1766 * 1767 * @param block the action to perform before completing the 1768 * returned CompletableFuture 1769 * @return the new CompletableFuture 1770 */ 1771 public CompletableFuture<Void> thenAccept(Consumer<? 
super T> block) { 1772 return doThenAccept(block, null); 1773 } 1774 1775 /** 1776 * Returns a new CompletableFuture that is asynchronously completed 1777 * when this CompletableFuture completes, after performing the given 1778 * action with this CompletableFuture's result from a task running 1779 * in the {@link ForkJoinPool#commonPool()}. 1780 * 1781 * <p>If this CompletableFuture completes exceptionally, or the 1782 * supplied action throws an exception, then the returned 1783 * CompletableFuture completes exceptionally with a 1784 * CompletionException holding the exception as its cause. 1785 * 1786 * @param block the action to perform before completing the 1787 * returned CompletableFuture 1788 * @return the new CompletableFuture 1789 */ 1790 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block) { 1791 return doThenAccept(block, ForkJoinPool.commonPool()); 1792 } 1793 1794 /** 1795 * Returns a new CompletableFuture that is asynchronously completed 1796 * when this CompletableFuture completes, after performing the given 1797 * action with this CompletableFuture's result from a task running 1798 * in the given executor. 1799 * 1800 * <p>If this CompletableFuture completes exceptionally, or the 1801 * supplied action throws an exception, then the returned 1802 * CompletableFuture completes exceptionally with a 1803 * CompletionException holding the exception as its cause. 1804 * 1805 * @param block the action to perform before completing the 1806 * returned CompletableFuture 1807 * @param executor the executor to use for asynchronous execution 1808 * @return the new CompletableFuture 1809 */ 1810 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block, 1811 Executor executor) { 1812 if (executor == null) throw new NullPointerException(); 1813 return doThenAccept(block, executor); 1814 } 1815 1816 private CompletableFuture<Void> doThenAccept(Consumer<? 
super T> fn, 1817 Executor e) { 1818 if (fn == null) throw new NullPointerException(); 1819 CompletableFuture<Void> dst = new CompletableFuture<Void>(); 1820 AcceptCompletion<T> d = null; 1821 Object r; 1822 if ((r = result) == null) { 1823 CompletionNode p = new CompletionNode 1824 (d = new AcceptCompletion<T>(this, fn, dst, e)); 1825 while ((r = result) == null) { 1826 if (UNSAFE.compareAndSwapObject 1827 (this, COMPLETIONS, p.next = completions, p)) 1828 break; 1829 } 1830 } 1831 if (r != null && (d == null || d.compareAndSet(0, 1))) { 1832 T t; Throwable ex; 1833 if (r instanceof AltResult) { 1834 ex = ((AltResult)r).ex; 1835 t = null; 1836 } 1837 else { 1838 ex = null; 1839 @SuppressWarnings("unchecked") T tr = (T) r; 1840 t = tr; 1841 } 1842 if (ex == null) { 1843 try { 1844 if (e != null) 1845 e.execute(new AsyncAccept<T>(t, fn, dst)); 1846 else 1847 fn.accept(t); 1848 } catch (Throwable rex) { 1849 ex = rex; 1850 } 1851 } 1852 if (e == null || ex != null) 1853 dst.internalComplete(null, ex); 1854 } 1855 helpPostComplete(); 1856 return dst; 1857 } 1858 1859 /** 1860 * Returns a new CompletableFuture that is completed 1861 * when this CompletableFuture completes, after performing the given 1862 * action. 1863 * 1864 * <p>If this CompletableFuture completes exceptionally, or the 1865 * supplied action throws an exception, then the returned 1866 * CompletableFuture completes exceptionally with a 1867 * CompletionException holding the exception as its cause. 1868 * 1869 * @param action the action to perform before completing the 1870 * returned CompletableFuture 1871 * @return the new CompletableFuture 1872 */ 1873 public CompletableFuture<Void> thenRun(Runnable action) { 1874 return doThenRun(action, null); 1875 } 1876 1877 /** 1878 * Returns a new CompletableFuture that is asynchronously completed 1879 * when this CompletableFuture completes, after performing the given 1880 * action from a task running in the {@link ForkJoinPool#commonPool()}. 
1881 * 1882 * <p>If this CompletableFuture completes exceptionally, or the 1883 * supplied action throws an exception, then the returned 1884 * CompletableFuture completes exceptionally with a 1885 * CompletionException holding the exception as its cause. 1886 * 1887 * @param action the action to perform before completing the 1888 * returned CompletableFuture 1889 * @return the new CompletableFuture 1890 */ 1891 public CompletableFuture<Void> thenRunAsync(Runnable action) { 1892 return doThenRun(action, ForkJoinPool.commonPool()); 1893 } 1894 1895 /** 1896 * Returns a new CompletableFuture that is asynchronously completed 1897 * when this CompletableFuture completes, after performing the given 1898 * action from a task running in the given executor. 1899 * 1900 * <p>If this CompletableFuture completes exceptionally, or the 1901 * supplied action throws an exception, then the returned 1902 * CompletableFuture completes exceptionally with a 1903 * CompletionException holding the exception as its cause. 
1904 * 1905 * @param action the action to perform before completing the 1906 * returned CompletableFuture 1907 * @param executor the executor to use for asynchronous execution 1908 * @return the new CompletableFuture 1909 */ 1910 public CompletableFuture<Void> thenRunAsync(Runnable action, 1911 Executor executor) { 1912 if (executor == null) throw new NullPointerException(); 1913 return doThenRun(action, executor); 1914 } 1915 1916 private CompletableFuture<Void> doThenRun(Runnable action, 1917 Executor e) { 1918 if (action == null) throw new NullPointerException(); 1919 CompletableFuture<Void> dst = new CompletableFuture<Void>(); 1920 RunCompletion<T> d = null; 1921 Object r; 1922 if ((r = result) == null) { 1923 CompletionNode p = new CompletionNode 1924 (d = new RunCompletion<T>(this, action, dst, e)); 1925 while ((r = result) == null) { 1926 if (UNSAFE.compareAndSwapObject 1927 (this, COMPLETIONS, p.next = completions, p)) 1928 break; 1929 } 1930 } 1931 if (r != null && (d == null || d.compareAndSet(0, 1))) { 1932 Throwable ex; 1933 if (r instanceof AltResult) 1934 ex = ((AltResult)r).ex; 1935 else 1936 ex = null; 1937 if (ex == null) { 1938 try { 1939 if (e != null) 1940 e.execute(new AsyncRun(action, dst)); 1941 else 1942 action.run(); 1943 } catch (Throwable rex) { 1944 ex = rex; 1945 } 1946 } 1947 if (e == null || ex != null) 1948 dst.internalComplete(null, ex); 1949 } 1950 helpPostComplete(); 1951 return dst; 1952 } 1953 1954 /** 1955 * Returns a new CompletableFuture that is completed 1956 * when both this and the other given CompletableFuture complete, 1957 * with the result of the given function of the results of the two 1958 * CompletableFutures. 1959 * 1960 * <p>If this and/or the other CompletableFuture complete 1961 * exceptionally, or the supplied function throws an exception, 1962 * then the returned CompletableFuture completes exceptionally 1963 * with a CompletionException holding the exception as its cause. 
1964 * 1965 * @param other the other CompletableFuture 1966 * @param fn the function to use to compute the value of 1967 * the returned CompletableFuture 1968 * @return the new CompletableFuture 1969 */ 1970 public <U,V> CompletableFuture<V> thenCombine 1971 (CompletableFuture<? extends U> other, 1972 BiFunction<? super T,? super U,? extends V> fn) { 1973 return doThenBiApply(other, fn, null); 1974 } 1975 1976 /** 1977 * Returns a new CompletableFuture that is asynchronously completed 1978 * when both this and the other given CompletableFuture complete, 1979 * with the result of the given function of the results of the two 1980 * CompletableFutures from a task running in the 1981 * {@link ForkJoinPool#commonPool()}. 1982 * 1983 * <p>If this and/or the other CompletableFuture complete 1984 * exceptionally, or the supplied function throws an exception, 1985 * then the returned CompletableFuture completes exceptionally 1986 * with a CompletionException holding the exception as its cause. 1987 * 1988 * @param other the other CompletableFuture 1989 * @param fn the function to use to compute the value of 1990 * the returned CompletableFuture 1991 * @return the new CompletableFuture 1992 */ 1993 public <U,V> CompletableFuture<V> thenCombineAsync 1994 (CompletableFuture<? extends U> other, 1995 BiFunction<? super T,? super U,? extends V> fn) { 1996 return doThenBiApply(other, fn, ForkJoinPool.commonPool()); 1997 } 1998 1999 /** 2000 * Returns a new CompletableFuture that is asynchronously completed 2001 * when both this and the other given CompletableFuture complete, 2002 * with the result of the given function of the results of the two 2003 * CompletableFutures from a task running in the given executor. 2004 * 2005 * <p>If this and/or the other CompletableFuture complete 2006 * exceptionally, or the supplied function throws an exception, 2007 * then the returned CompletableFuture completes exceptionally 2008 * with a CompletionException holding the exception as its cause. 
*
     * @param other the other CompletableFuture
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public <U,V> CompletableFuture<V> thenCombineAsync
        (CompletableFuture<? extends U> other,
         BiFunction<? super T,? super U,? extends V> fn,
         Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doThenBiApply(other, fn, executor);
    }

    /**
     * Common implementation behind {@code thenCombine} and the
     * {@code thenCombineAsync} overloads: applies {@code fn} to the
     * results of this future and {@code other} once both are available.
     *
     * @param other the second source future; must be non-null
     * @param fn the combining function; must be non-null
     * @param e the executor the function is submitted to, or
     *        {@code null} to apply it directly (in the calling thread
     *        when both results are already available)
     * @return the new dependent CompletableFuture
     * @throws NullPointerException if {@code other} or {@code fn} is null
     */
    private <U,V> CompletableFuture<V> doThenBiApply
        (CompletableFuture<? extends U> other,
         BiFunction<? super T,? super U,? extends V> fn,
         Executor e) {
        if (other == null || fn == null) throw new NullPointerException();
        CompletableFuture<V> dst = new CompletableFuture<V>();
        BiApplyCompletion<T,U,V> d = null;
        Object r, s = null;
        if ((r = result) == null || (s = other.result) == null) {
            d = new BiApplyCompletion<T,U,V>(this, other, fn, dst, e);
            CompletionNode q = null, p = new CompletionNode(d);
            // Registration loop, repeated while either result is missing
            // (r and s are re-read from the fields only while still null):
            //  - first p is pushed onto this future's completions
            //    (skipped when r is already known);
            //  - then a second node q, wrapping the same d, is pushed onto
            //    other's completions (skipped when s is already known).
            // The loop exits once the needed node(s) are linked, or both
            // results have appeared.
            while ((r == null && (r = result) == null) ||
                   (s == null && (s = other.result) == null)) {
                if (q != null) {
                    if (s != null ||
                        UNSAFE.compareAndSwapObject
                        (other, COMPLETIONS, q.next = other.completions, q))
                        break;
                }
                else if (r != null ||
                         UNSAFE.compareAndSwapObject
                         (this, COMPLETIONS, p.next = completions, p)) {
                    if (s != null)
                        break;
                    q = new CompletionNode(d);
                }
            }
        }
        // Act here only if both results are known and either nothing was
        // registered (d == null) or this thread wins the 0 -> 1 CAS on d,
        // which presumably claims the completion so it runs only once.
        if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
            T t; U u; Throwable ex;
            // An AltResult carries an exception (possibly null) in its ex
            // field; any other object is taken to be the plain value.
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            // This future's exception takes precedence over other's.
            if (ex != null)
                u = null;
            else if (s instanceof AltResult) {
                ex = ((AltResult)s).ex;
                u = null;
            }
            else {
                @SuppressWarnings("unchecked") U us = (U) s;
                u = us;
            }
            V v = null;
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
                    else
                        v = fn.apply(t, u);
                } catch (Throwable rex) {
                    // Covers both a throwing fn and a failing execute().
                    ex = rex;
                }
            }
            // A synchronous run, or any failure, completes dst right here;
            // otherwise the submitted AsyncBiApply (which was handed dst)
            // is presumably responsible for completing it.
            if (e == null || ex != null)
                dst.internalComplete(v, ex);
        }
        helpPostComplete();
        other.helpPostComplete();
        return dst;
    }

    /**
     * Returns a new CompletableFuture that is completed
     * when both this and the other given CompletableFuture complete,
     * after performing the given action with the results of the two
     * CompletableFutures.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, or the supplied action throws an exception,
     * then the returned CompletableFuture completes exceptionally
     * with a CompletionException holding the exception as its cause.
     *
     * @param other the other CompletableFuture
     * @param block the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<Void> thenAcceptBoth
        (CompletableFuture<? extends U> other,
         BiConsumer<? super T, ? super U> block) {
        return doThenBiAccept(other, block, null);
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * when both this and the other given CompletableFuture complete,
     * after performing the given action with the results of the two
     * CompletableFutures from a task running in the {@link
     * ForkJoinPool#commonPool()}.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, or the supplied action throws an exception,
     * then the returned CompletableFuture completes exceptionally
     * with a CompletionException holding the exception as its cause.
*
     * @param other the other CompletableFuture
     * @param block the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<Void> thenAcceptBothAsync
        (CompletableFuture<? extends U> other,
         BiConsumer<? super T, ? super U> block) {
        return doThenBiAccept(other, block, ForkJoinPool.commonPool());
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * when both this and the other given CompletableFuture complete,
     * after performing the given action with the results of the two
     * CompletableFutures from a task running in the given executor.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, or the supplied action throws an exception,
     * then the returned CompletableFuture completes exceptionally
     * with a CompletionException holding the exception as its cause.
     *
     * @param other the other CompletableFuture
     * @param block the action to perform before completing the
     * returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<Void> thenAcceptBothAsync
        (CompletableFuture<? extends U> other,
         BiConsumer<? super T, ? super U> block,
         Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doThenBiAccept(other, block, executor);
    }

    /**
     * Common implementation behind the {@code thenAcceptBothAsync}
     * overloads: passes the results of this future and {@code other}
     * to {@code fn} once both are available.
     *
     * @param other the second source future; must be non-null
     * @param fn the consumer of both results; must be non-null
     * @param e the executor the action is submitted to, or
     *        {@code null} to invoke it directly (in the calling thread
     *        when both results are already available)
     * @return the new dependent CompletableFuture
     * @throws NullPointerException if {@code other} or {@code fn} is null
     */
    private <U> CompletableFuture<Void> doThenBiAccept
        (CompletableFuture<? extends U> other,
         BiConsumer<? super T,? super U> fn,
         Executor e) {
        if (other == null || fn == null) throw new NullPointerException();
        CompletableFuture<Void> dst = new CompletableFuture<Void>();
        BiAcceptCompletion<T,U> d = null;
        Object r, s = null;
        if ((r = result) == null || (s = other.result) == null) {
            d = new BiAcceptCompletion<T,U>(this, other, fn, dst, e);
            CompletionNode q = null, p = new CompletionNode(d);
            // Registration loop, repeated while either result is missing:
            // p is pushed onto this future's completions (unless r is
            // known), then q, wrapping the same d, onto other's (unless s
            // is known). Exits once the needed node(s) are linked, or both
            // results have appeared.
            while ((r == null && (r = result) == null) ||
                   (s == null && (s = other.result) == null)) {
                if (q != null) {
                    if (s != null ||
                        UNSAFE.compareAndSwapObject
                        (other, COMPLETIONS, q.next = other.completions, q))
                        break;
                }
                else if (r != null ||
                         UNSAFE.compareAndSwapObject
                         (this, COMPLETIONS, p.next = completions, p)) {
                    if (s != null)
                        break;
                    q = new CompletionNode(d);
                }
            }
        }
        // Act here only if both results are known and either nothing was
        // registered or this thread wins the 0 -> 1 CAS on d (presumably
        // claiming the completion so it runs only once).
        if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
            T t; U u; Throwable ex;
            // An AltResult carries an exception (possibly null) in ex.
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            // This future's exception takes precedence over other's.
            if (ex != null)
                u = null;
            else if (s instanceof AltResult) {
                ex = ((AltResult)s).ex;
                u = null;
            }
            else {
                @SuppressWarnings("unchecked") U us = (U) s;
                u = us;
            }
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
                    else
                        fn.accept(t, u);
                } catch (Throwable rex) {
                    // Covers both a throwing fn and a failing execute().
                    ex = rex;
                }
            }
            // A synchronous run, or any failure, completes dst right here;
            // otherwise the submitted AsyncBiAccept (which was handed dst)
            // is presumably responsible for completing it.
            if (e == null || ex != null)
                dst.internalComplete(null, ex);
        }
        helpPostComplete();
        other.helpPostComplete();
        return dst;
    }

    /**
     * Returns a new CompletableFuture that is completed
     * when both this and the other given CompletableFuture complete,
     * after performing the given action.
2233 * 2234 * <p>If this and/or the other CompletableFuture complete 2235 * exceptionally, or the supplied action throws an exception, 2236 * then the returned CompletableFuture completes exceptionally 2237 * with a CompletionException holding the exception as its cause. 2238 * 2239 * @param other the other CompletableFuture 2240 * @param action the action to perform before completing the 2241 * returned CompletableFuture 2242 * @return the new CompletableFuture 2243 */ 2244 public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other, 2245 Runnable action) { 2246 return doThenBiRun(other, action, null); 2247 } 2248 2249 /** 2250 * Returns a new CompletableFuture that is asynchronously completed 2251 * when both this and the other given CompletableFuture complete, 2252 * after performing the given action from a task running in the 2253 * {@link ForkJoinPool#commonPool()}. 2254 * 2255 * <p>If this and/or the other CompletableFuture complete 2256 * exceptionally, or the supplied action throws an exception, 2257 * then the returned CompletableFuture completes exceptionally 2258 * with a CompletionException holding the exception as its cause. 2259 * 2260 * @param other the other CompletableFuture 2261 * @param action the action to perform before completing the 2262 * returned CompletableFuture 2263 * @return the new CompletableFuture 2264 */ 2265 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other, 2266 Runnable action) { 2267 return doThenBiRun(other, action, ForkJoinPool.commonPool()); 2268 } 2269 2270 /** 2271 * Returns a new CompletableFuture that is asynchronously completed 2272 * when both this and the other given CompletableFuture complete, 2273 * after performing the given action from a task running in the 2274 * given executor. 
*
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, or the supplied action throws an exception,
     * then the returned CompletableFuture completes exceptionally
     * with a CompletionException holding the exception as its cause.
     *
     * @param other the other CompletableFuture
     * @param action the action to perform before completing the
     * returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
                                                     Runnable action,
                                                     Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doThenBiRun(other, action, executor);
    }

    /**
     * Common implementation behind the {@code runAfterBothAsync}
     * overloads: runs {@code action} once both this future and
     * {@code other} have results. The result values themselves are
     * not passed to the action.
     *
     * @param other the second source future; must be non-null
     * @param action the action to run; must be non-null
     * @param e the executor the action is submitted to, or
     *        {@code null} to run it directly (in the calling thread
     *        when both results are already available)
     * @return the new dependent CompletableFuture
     * @throws NullPointerException if {@code other} or {@code action}
     *         is null
     */
    private CompletableFuture<Void> doThenBiRun(CompletableFuture<?> other,
                                                Runnable action,
                                                Executor e) {
        if (other == null || action == null) throw new NullPointerException();
        CompletableFuture<Void> dst = new CompletableFuture<Void>();
        BiRunCompletion<T> d = null;
        Object r, s = null;
        if ((r = result) == null || (s = other.result) == null) {
            d = new BiRunCompletion<T>(this, other, action, dst, e);
            CompletionNode q = null, p = new CompletionNode(d);
            // Registration loop, repeated while either result is missing:
            // p is pushed onto this future's completions (unless r is
            // known), then q, wrapping the same d, onto other's (unless s
            // is known). Exits once the needed node(s) are linked, or both
            // results have appeared.
            while ((r == null && (r = result) == null) ||
                   (s == null && (s = other.result) == null)) {
                if (q != null) {
                    if (s != null ||
                        UNSAFE.compareAndSwapObject
                        (other, COMPLETIONS, q.next = other.completions, q))
                        break;
                }
                else if (r != null ||
                         UNSAFE.compareAndSwapObject
                         (this, COMPLETIONS, p.next = completions, p)) {
                    if (s != null)
                        break;
                    q = new CompletionNode(d);
                }
            }
        }
        // Act here only if both results are known and either nothing was
        // registered or this thread wins the 0 -> 1 CAS on d (presumably
        // claiming the completion so it runs only once).
        if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
            Throwable ex;
            // Only the exceptions matter here; this future's exception
            // takes precedence over other's.
            if (r instanceof AltResult)
                ex = ((AltResult)r).ex;
            else
                ex = null;
            if (ex == null && (s instanceof AltResult))
                ex = ((AltResult)s).ex;
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncRun(action, dst));
                    else
                        action.run();
                } catch (Throwable rex) {
                    // Covers both a throwing action and a failing execute().
                    ex = rex;
                }
            }
            // A synchronous run, or any failure, completes dst right here;
            // otherwise the submitted AsyncRun (which was handed dst) is
            // presumably responsible for completing it.
            if (e == null || ex != null)
                dst.internalComplete(null, ex);
        }
        helpPostComplete();
        other.helpPostComplete();
        return dst;
    }

    /**
     * Returns a new CompletableFuture that is completed
     * when either this or the other given CompletableFuture completes,
     * with the result of the given function of either this or the other
     * CompletableFuture's result.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, then the returned CompletableFuture may also do so,
     * with a CompletionException holding one of these exceptions as its
     * cause.  No guarantees are made about which result or exception is
     * used in the returned CompletableFuture.  If the supplied function
     * throws an exception, then the returned CompletableFuture completes
     * exceptionally with a CompletionException holding the exception as
     * its cause.
     *
     * @param other the other CompletableFuture
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<U> applyToEither
        (CompletableFuture<? extends T> other,
         Function<? super T, U> fn) {
        return doOrApply(other, fn, null);
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * when either this or the other given CompletableFuture completes,
     * with the result of the given function of either this or the other
     * CompletableFuture's result from a task running in the
     * {@link ForkJoinPool#commonPool()}.
2379 * 2380 * <p>If this and/or the other CompletableFuture complete 2381 * exceptionally, then the returned CompletableFuture may also do so, 2382 * with a CompletionException holding one of these exceptions as its 2383 * cause. No guarantees are made about which result or exception is 2384 * used in the returned CompletableFuture. If the supplied function 2385 * throws an exception, then the returned CompletableFuture completes 2386 * exceptionally with a CompletionException holding the exception as 2387 * its cause. 2388 * 2389 * @param other the other CompletableFuture 2390 * @param fn the function to use to compute the value of 2391 * the returned CompletableFuture 2392 * @return the new CompletableFuture 2393 */ 2394 public <U> CompletableFuture<U> applyToEitherAsync 2395 (CompletableFuture<? extends T> other, 2396 Function<? super T, U> fn) { 2397 return doOrApply(other, fn, ForkJoinPool.commonPool()); 2398 } 2399 2400 /** 2401 * Returns a new CompletableFuture that is asynchronously completed 2402 * when either this or the other given CompletableFuture completes, 2403 * with the result of the given function of either this or the other 2404 * CompletableFuture's result from a task running in the 2405 * given executor. 2406 * 2407 * <p>If this and/or the other CompletableFuture complete 2408 * exceptionally, then the returned CompletableFuture may also do so, 2409 * with a CompletionException holding one of these exceptions as its 2410 * cause. No guarantees are made about which result or exception is 2411 * used in the returned CompletableFuture. If the supplied function 2412 * throws an exception, then the returned CompletableFuture completes 2413 * exceptionally with a CompletionException holding the exception as 2414 * its cause. 
*
     * @param other the other CompletableFuture
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<U> applyToEitherAsync
        (CompletableFuture<? extends T> other,
         Function<? super T, U> fn,
         Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doOrApply(other, fn, executor);
    }

    /**
     * Common implementation behind {@code applyToEither} and the
     * {@code applyToEitherAsync} overloads: applies {@code fn} to the
     * first result observed in either this future or {@code other}
     * (this future is checked first).
     *
     * @param other the alternative source future; must be non-null
     * @param fn the function to apply; must be non-null
     * @param e the executor the function is submitted to, or
     *        {@code null} to apply it directly (in the calling thread
     *        when a result is already available)
     * @return the new dependent CompletableFuture
     * @throws NullPointerException if {@code other} or {@code fn} is null
     */
    private <U> CompletableFuture<U> doOrApply
        (CompletableFuture<? extends T> other,
         Function<? super T, U> fn,
         Executor e) {
        if (other == null || fn == null) throw new NullPointerException();
        CompletableFuture<U> dst = new CompletableFuture<U>();
        OrApplyCompletion<T,U> d = null;
        Object r;
        if ((r = result) == null && (r = other.result) == null) {
            d = new OrApplyCompletion<T,U>(this, other, fn, dst, e);
            CompletionNode q = null, p = new CompletionNode(d);
            // Neither future has a result yet: push p onto this future's
            // completions, then q (wrapping the same d) onto other's.
            // Both results are re-read on every pass, so the loop also
            // ends as soon as either one appears.
            while ((r = result) == null && (r = other.result) == null) {
                if (q != null) {
                    if (UNSAFE.compareAndSwapObject
                        (other, COMPLETIONS, q.next = other.completions, q))
                        break;
                }
                else if (UNSAFE.compareAndSwapObject
                         (this, COMPLETIONS, p.next = completions, p))
                    q = new CompletionNode(d);
            }
        }
        // Act here only if a result is known and either nothing was
        // registered or this thread wins the 0 -> 1 CAS on d (presumably
        // claiming the completion so it runs only once).
        if (r != null && (d == null || d.compareAndSet(0, 1))) {
            T t; Throwable ex;
            // An AltResult carries an exception (possibly null) in ex.
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            U u = null;
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncApply<T,U>(t, fn, dst));
                    else
                        u = fn.apply(t);
                } catch (Throwable rex) {
                    // Covers both a throwing fn and a failing execute().
                    ex = rex;
                }
            }
            // A synchronous run, or any failure, completes dst right here;
            // otherwise the submitted AsyncApply (which was handed dst) is
            // presumably responsible for completing it.
            if (e == null || ex != null)
                dst.internalComplete(u, ex);
        }
        helpPostComplete();
        other.helpPostComplete();
        return dst;
    }

    /**
     * Returns a new CompletableFuture that is completed
     * when either this or the other given CompletableFuture completes,
     * after performing the given action with the result of either this
     * or the other CompletableFuture's result.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, then the returned CompletableFuture may also do so,
     * with a CompletionException holding one of these exceptions as its
     * cause.  No guarantees are made about which result or exception is
     * used in the returned CompletableFuture.  If the supplied action
     * throws an exception, then the returned CompletableFuture completes
     * exceptionally with a CompletionException holding the exception as
     * its cause.
     *
     * @param other the other CompletableFuture
     * @param block the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> acceptEither
        (CompletableFuture<? extends T> other,
         Consumer<? super T> block) {
        return doOrAccept(other, block, null);
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * when either this or the other given CompletableFuture completes,
     * after performing the given action with the result of either this
     * or the other CompletableFuture's result from a task running in
     * the {@link ForkJoinPool#commonPool()}.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, then the returned CompletableFuture may also do so,
     * with a CompletionException holding one of these exceptions as its
     * cause.  No guarantees are made about which result or exception is
     * used in the returned CompletableFuture.
If the supplied action
     * throws an exception, then the returned CompletableFuture completes
     * exceptionally with a CompletionException holding the exception as
     * its cause.
     *
     * @param other the other CompletableFuture
     * @param block the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> acceptEitherAsync
        (CompletableFuture<? extends T> other,
         Consumer<? super T> block) {
        return doOrAccept(other, block, ForkJoinPool.commonPool());
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * when either this or the other given CompletableFuture completes,
     * after performing the given action with the result of either this
     * or the other CompletableFuture's result from a task running in
     * the given executor.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, then the returned CompletableFuture may also do so,
     * with a CompletionException holding one of these exceptions as its
     * cause.  No guarantees are made about which result or exception is
     * used in the returned CompletableFuture.  If the supplied action
     * throws an exception, then the returned CompletableFuture completes
     * exceptionally with a CompletionException holding the exception as
     * its cause.
     *
     * @param other the other CompletableFuture
     * @param block the action to perform before completing the
     * returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> acceptEitherAsync
        (CompletableFuture<? extends T> other,
         Consumer<? super T> block,
         Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doOrAccept(other, block, executor);
    }

    /**
     * Common implementation behind the {@code acceptEitherAsync}
     * overloads: passes to {@code fn} the first result observed in
     * either this future or {@code other} (this future is checked
     * first).
     *
     * @param other the alternative source future; must be non-null
     * @param fn the consumer of the result; must be non-null
     * @param e the executor the action is submitted to, or
     *        {@code null} to invoke it directly (in the calling thread
     *        when a result is already available)
     * @return the new dependent CompletableFuture
     * @throws NullPointerException if {@code other} or {@code fn} is null
     */
    private CompletableFuture<Void> doOrAccept
        (CompletableFuture<? extends T> other,
         Consumer<? super T> fn,
         Executor e) {
        if (other == null || fn == null) throw new NullPointerException();
        CompletableFuture<Void> dst = new CompletableFuture<Void>();
        OrAcceptCompletion<T> d = null;
        Object r;
        if ((r = result) == null && (r = other.result) == null) {
            d = new OrAcceptCompletion<T>(this, other, fn, dst, e);
            CompletionNode q = null, p = new CompletionNode(d);
            // Neither future has a result yet: push p onto this future's
            // completions, then q (wrapping the same d) onto other's.
            // Both results are re-read on every pass, so the loop also
            // ends as soon as either one appears.
            while ((r = result) == null && (r = other.result) == null) {
                if (q != null) {
                    if (UNSAFE.compareAndSwapObject
                        (other, COMPLETIONS, q.next = other.completions, q))
                        break;
                }
                else if (UNSAFE.compareAndSwapObject
                         (this, COMPLETIONS, p.next = completions, p))
                    q = new CompletionNode(d);
            }
        }
        // Act here only if a result is known and either nothing was
        // registered or this thread wins the 0 -> 1 CAS on d (presumably
        // claiming the completion so it runs only once).
        if (r != null && (d == null || d.compareAndSet(0, 1))) {
            T t; Throwable ex;
            // An AltResult carries an exception (possibly null) in ex.
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncAccept<T>(t, fn, dst));
                    else
                        fn.accept(t);
                } catch (Throwable rex) {
                    // Covers both a throwing fn and a failing execute().
                    ex = rex;
                }
            }
            // A synchronous run, or any failure, completes dst right here;
            // otherwise the submitted AsyncAccept (which was handed dst)
            // is presumably responsible for completing it.
            if (e == null || ex != null)
                dst.internalComplete(null, ex);
        }
        helpPostComplete();
        other.helpPostComplete();
        return dst;
    }

    /**
     * Returns a new CompletableFuture that is completed
     * when either this or the other given CompletableFuture completes,
     * after performing the given action.
2620 * 2621 * <p>If this and/or the other CompletableFuture complete 2622 * exceptionally, then the returned CompletableFuture may also do so, 2623 * with a CompletionException holding one of these exceptions as its 2624 * cause. No guarantees are made about which result or exception is 2625 * used in the returned CompletableFuture. If the supplied action 2626 * throws an exception, then the returned CompletableFuture completes 2627 * exceptionally with a CompletionException holding the exception as 2628 * its cause. 2629 * 2630 * @param other the other CompletableFuture 2631 * @param action the action to perform before completing the 2632 * returned CompletableFuture 2633 * @return the new CompletableFuture 2634 */ 2635 public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other, 2636 Runnable action) { 2637 return doOrRun(other, action, null); 2638 } 2639 2640 /** 2641 * Returns a new CompletableFuture that is asynchronously completed 2642 * when either this or the other given CompletableFuture completes, 2643 * after performing the given action from a task running in the 2644 * {@link ForkJoinPool#commonPool()}. 2645 * 2646 * <p>If this and/or the other CompletableFuture complete 2647 * exceptionally, then the returned CompletableFuture may also do so, 2648 * with a CompletionException holding one of these exceptions as its 2649 * cause. No guarantees are made about which result or exception is 2650 * used in the returned CompletableFuture. If the supplied action 2651 * throws an exception, then the returned CompletableFuture completes 2652 * exceptionally with a CompletionException holding the exception as 2653 * its cause. 
2654 * 2655 * @param other the other CompletableFuture 2656 * @param action the action to perform before completing the 2657 * returned CompletableFuture 2658 * @return the new CompletableFuture 2659 */ 2660 public CompletableFuture<Void> runAfterEitherAsync 2661 (CompletableFuture<?> other, 2662 Runnable action) { 2663 return doOrRun(other, action, ForkJoinPool.commonPool()); 2664 } 2665 2666 /** 2667 * Returns a new CompletableFuture that is asynchronously completed 2668 * when either this or the other given CompletableFuture completes, 2669 * after performing the given action from a task running in the 2670 * given executor. 2671 * 2672 * <p>If this and/or the other CompletableFuture complete 2673 * exceptionally, then the returned CompletableFuture may also do so, 2674 * with a CompletionException holding one of these exceptions as its 2675 * cause. No guarantees are made about which result or exception is 2676 * used in the returned CompletableFuture. If the supplied action 2677 * throws an exception, then the returned CompletableFuture completes 2678 * exceptionally with a CompletionException holding the exception as 2679 * its cause. 
*
     * @param other the other CompletableFuture
     * @param action the action to perform before completing the
     * returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> runAfterEitherAsync
        (CompletableFuture<?> other,
         Runnable action,
         Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doOrRun(other, action, executor);
    }

    /**
     * Common implementation behind the {@code runAfterEitherAsync}
     * overloads: runs {@code action} once a result is observed in
     * either this future or {@code other} (this future is checked
     * first). The result value itself is not passed to the action.
     *
     * @param other the alternative source future; must be non-null
     * @param action the action to run; must be non-null
     * @param e the executor the action is submitted to, or
     *        {@code null} to run it directly (in the calling thread
     *        when a result is already available)
     * @return the new dependent CompletableFuture
     * @throws NullPointerException if {@code other} or {@code action}
     *         is null
     */
    private CompletableFuture<Void> doOrRun(CompletableFuture<?> other,
                                            Runnable action,
                                            Executor e) {
        if (other == null || action == null) throw new NullPointerException();
        CompletableFuture<Void> dst = new CompletableFuture<Void>();
        OrRunCompletion<T> d = null;
        Object r;
        if ((r = result) == null && (r = other.result) == null) {
            d = new OrRunCompletion<T>(this, other, action, dst, e);
            CompletionNode q = null, p = new CompletionNode(d);
            // Neither future has a result yet: push p onto this future's
            // completions, then q (wrapping the same d) onto other's.
            // Both results are re-read on every pass, so the loop also
            // ends as soon as either one appears.
            while ((r = result) == null && (r = other.result) == null) {
                if (q != null) {
                    if (UNSAFE.compareAndSwapObject
                        (other, COMPLETIONS, q.next = other.completions, q))
                        break;
                }
                else if (UNSAFE.compareAndSwapObject
                         (this, COMPLETIONS, p.next = completions, p))
                    q = new CompletionNode(d);
            }
        }
        // Act here only if a result is known and either nothing was
        // registered or this thread wins the 0 -> 1 CAS on d (presumably
        // claiming the completion so it runs only once).
        if (r != null && (d == null || d.compareAndSet(0, 1))) {
            Throwable ex;
            // An AltResult carries an exception (possibly null) in ex.
            if (r instanceof AltResult)
                ex = ((AltResult)r).ex;
            else
                ex = null;
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncRun(action, dst));
                    else
                        action.run();
                } catch (Throwable rex) {
                    // Covers both a throwing action and a failing execute().
                    ex = rex;
                }
            }
            // A synchronous run, or any failure, completes dst right here;
            // otherwise the submitted AsyncRun (which was handed dst) is
            // presumably responsible for completing it.
            if (e == null || ex != null)
                dst.internalComplete(null, ex);
        }
        helpPostComplete();
        other.helpPostComplete();
        return dst;
    }

    /**
     * Returns a CompletableFuture that upon completion, has the same
     * value as produced by the given
function of the result of this 2743 * CompletableFuture. 2744 * 2745 * <p>If this CompletableFuture completes exceptionally, then the 2746 * returned CompletableFuture also does so, with a 2747 * CompletionException holding this exception as its cause. 2748 * Similarly, if the computed CompletableFuture completes 2749 * exceptionally, then so does the returned CompletableFuture. 2750 * 2751 * @param fn the function returning a new CompletableFuture 2752 * @return the CompletableFuture 2753 */ 2754 public <U> CompletableFuture<U> thenCompose 2755 (Function<? super T, CompletableFuture<U>> fn) { 2756 return doCompose(fn, null); 2757 } 2758 2759 /** 2760 * Returns a CompletableFuture that upon completion, has the same 2761 * value as that produced asynchronously using the {@link 2762 * ForkJoinPool#commonPool()} by the given function of the result 2763 * of this CompletableFuture. 2764 * 2765 * <p>If this CompletableFuture completes exceptionally, then the 2766 * returned CompletableFuture also does so, with a 2767 * CompletionException holding this exception as its cause. 2768 * Similarly, if the computed CompletableFuture completes 2769 * exceptionally, then so does the returned CompletableFuture. 2770 * 2771 * @param fn the function returning a new CompletableFuture 2772 * @return the CompletableFuture 2773 */ 2774 public <U> CompletableFuture<U> thenComposeAsync 2775 (Function<? super T, CompletableFuture<U>> fn) { 2776 return doCompose(fn, ForkJoinPool.commonPool()); 2777 } 2778 2779 /** 2780 * Returns a CompletableFuture that upon completion, has the same 2781 * value as that produced asynchronously using the given executor 2782 * by the given function of this CompletableFuture. 2783 * 2784 * <p>If this CompletableFuture completes exceptionally, then the 2785 * returned CompletableFuture also does so, with a 2786 * CompletionException holding this exception as its cause. 
2787 * Similarly, if the computed CompletableFuture completes 2788 * exceptionally, then so does the returned CompletableFuture. 2789 * 2790 * @param fn the function returning a new CompletableFuture 2791 * @param executor the executor to use for asynchronous execution 2792 * @return the CompletableFuture 2793 */ 2794 public <U> CompletableFuture<U> thenComposeAsync 2795 (Function<? super T, CompletableFuture<U>> fn, 2796 Executor executor) { 2797 if (executor == null) throw new NullPointerException(); 2798 return doCompose(fn, executor); 2799 } 2800 2801 private <U> CompletableFuture<U> doCompose 2802 (Function<? super T, CompletableFuture<U>> fn, 2803 Executor e) { 2804 if (fn == null) throw new NullPointerException(); 2805 CompletableFuture<U> dst = null; 2806 ComposeCompletion<T,U> d = null; 2807 Object r; 2808 if ((r = result) == null) { 2809 dst = new CompletableFuture<U>(); 2810 CompletionNode p = new CompletionNode 2811 (d = new ComposeCompletion<T,U>(this, fn, dst, e)); 2812 while ((r = result) == null) { 2813 if (UNSAFE.compareAndSwapObject 2814 (this, COMPLETIONS, p.next = completions, p)) 2815 break; 2816 } 2817 } 2818 if (r != null && (d == null || d.compareAndSet(0, 1))) { 2819 T t; Throwable ex; 2820 if (r instanceof AltResult) { 2821 ex = ((AltResult)r).ex; 2822 t = null; 2823 } 2824 else { 2825 ex = null; 2826 @SuppressWarnings("unchecked") T tr = (T) r; 2827 t = tr; 2828 } 2829 if (ex == null) { 2830 if (e != null) { 2831 if (dst == null) 2832 dst = new CompletableFuture<U>(); 2833 e.execute(new AsyncCompose<T,U>(t, fn, dst)); 2834 } 2835 else { 2836 try { 2837 if ((dst = fn.apply(t)) == null) 2838 ex = new NullPointerException(); 2839 } catch (Throwable rex) { 2840 ex = rex; 2841 } 2842 if (dst == null) 2843 dst = new CompletableFuture<U>(); 2844 } 2845 } 2846 if (e == null && ex != null) 2847 dst.internalComplete(null, ex); 2848 } 2849 helpPostComplete(); 2850 dst.helpPostComplete(); 2851 return dst; 2852 } 2853 2854 /** 2855 * Returns a new 
CompletableFuture that is completed when this
     * CompletableFuture completes, with the result of the given
     * function of the exception triggering this CompletableFuture's
     * completion when it completes exceptionally; otherwise, if this
     * CompletableFuture completes normally, then the returned
     * CompletableFuture also completes normally with the same value.
     *
     * @param fn the function to use to compute the value of the
     * returned CompletableFuture if this CompletableFuture completed
     * exceptionally
     * @return the new CompletableFuture
     * @throws NullPointerException if {@code fn} is {@code null}
     */
    public CompletableFuture<T> exceptionally
        (Function<Throwable, ? extends T> fn) {
        if (fn == null) throw new NullPointerException();
        CompletableFuture<T> dst = new CompletableFuture<T>();
        ExceptionCompletion<T> d = null;
        Object r;
        // Register a completion only when no result is visible yet.
        if ((r = result) == null) {
            CompletionNode p =
                new CompletionNode(d = new ExceptionCompletion<T>(this, fn, dst));
            // Retry the push until it succeeds or a result appears.
            while ((r = result) == null) {
                if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
                                                p.next = completions, p))
                    break;
            }
        }
        // Run here if a result is visible and either nothing was
        // registered or this call wins the 0 -> 1 CAS on d.
        if (r != null && (d == null || d.compareAndSet(0, 1))) {
            T t = null; Throwable ex, dx = null;
            if (r instanceof AltResult) {
                // fn is applied only when the AltResult carries an
                // exception.  With a null ex (presumably the NIL marker
                // for a null value) dst gets a null value and no
                // exception.
                if ((ex = ((AltResult)r).ex) != null) {
                    try {
                        t = fn.apply(ex);
                    } catch (Throwable rex) {
                        // A failure of fn becomes the outcome of dst.
                        dx = rex;
                    }
                }
            }
            else {
                // Normal completion: pass the value through unchanged.
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            dst.internalComplete(t, dx);
        }
        helpPostComplete();
        return dst;
    }

    /**
     * Returns a new CompletableFuture that is completed when this
     * CompletableFuture completes, with the result of the given
     * function of the result and exception of this CompletableFuture's
     * completion.  The given function is invoked with the result (or
     * {@code null} if none) and the exception (or {@code null} if none)
     * of this CompletableFuture when complete.
2910 * 2911 * @param fn the function to use to compute the value of the 2912 * returned CompletableFuture 2913 * @return the new CompletableFuture 2914 */ 2915 public <U> CompletableFuture<U> handle 2916 (BiFunction<? super T, Throwable, ? extends U> fn) { 2917 if (fn == null) throw new NullPointerException(); 2918 CompletableFuture<U> dst = new CompletableFuture<U>(); 2919 HandleCompletion<T,U> d = null; 2920 Object r; 2921 if ((r = result) == null) { 2922 CompletionNode p = 2923 new CompletionNode(d = new HandleCompletion<T,U>(this, fn, dst)); 2924 while ((r = result) == null) { 2925 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, 2926 p.next = completions, p)) 2927 break; 2928 } 2929 } 2930 if (r != null && (d == null || d.compareAndSet(0, 1))) { 2931 T t; Throwable ex; 2932 if (r instanceof AltResult) { 2933 ex = ((AltResult)r).ex; 2934 t = null; 2935 } 2936 else { 2937 ex = null; 2938 @SuppressWarnings("unchecked") T tr = (T) r; 2939 t = tr; 2940 } 2941 U u; Throwable dx; 2942 try { 2943 u = fn.apply(t, ex); 2944 dx = null; 2945 } catch (Throwable rex) { 2946 dx = rex; 2947 u = null; 2948 } 2949 dst.internalComplete(u, dx); 2950 } 2951 helpPostComplete(); 2952 return dst; 2953 } 2954 2955 2956 /* ------------- Arbitrary-arity constructions -------------- */ 2957 2958 /* 2959 * The basic plan of attack is to recursively form binary 2960 * completion trees of elements. This can be overkill for small 2961 * sets, but scales nicely. The And/All vs Or/Any forms use the 2962 * same idea, but details differ. 2963 */ 2964 2965 /** 2966 * Returns a new CompletableFuture that is completed when all of 2967 * the given CompletableFutures complete. If any of the given 2968 * CompletableFutures complete exceptionally, then the returned 2969 * CompletableFuture also does so, with a CompletionException 2970 * holding this exception as its cause. 
Otherwise, the results,
     * if any, of the given CompletableFutures are not reflected in
     * the returned CompletableFuture, but may be obtained by
     * inspecting them individually. If no CompletableFutures are
     * provided, returns a CompletableFuture completed with the value
     * {@code null}.
     *
     * <p>Among the applications of this method is to await completion
     * of a set of independent CompletableFutures before continuing a
     * program, as in: {@code CompletableFuture.allOf(c1, c2,
     * c3).join();}.
     *
     * @param cfs the CompletableFutures
     * @return a new CompletableFuture that is completed when all of the
     * given CompletableFutures complete
     * @throws NullPointerException if the array or any of its elements are
     * {@code null}
     */
    public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
        int len = cfs.length; // Directly handle empty and singleton cases
        if (len > 1)
            return allTree(cfs, 0, len - 1);
        else {
            CompletableFuture<Void> dst = new CompletableFuture<Void>();
            CompletableFuture<?> f;
            if (len == 0)
                // No inputs: store NIL directly; dst is still local to
                // this call.
                dst.result = NIL;
            else if ((f = cfs[0]) == null)
                throw new NullPointerException();
            else {
                ThenPropagate d = null;
                CompletionNode p = null;
                Object r;
                // Each pass that still sees no result takes one more
                // step: create d, then create p, then try to push p.
                // Nothing is allocated if f already has a result on the
                // first read.
                while ((r = f.result) == null) {
                    if (d == null)
                        d = new ThenPropagate(f, dst);
                    else if (p == null)
                        p = new CompletionNode(d);
                    else if (UNSAFE.compareAndSwapObject
                             (f, COMPLETIONS, p.next = f.completions, p))
                        break;
                }
                // Complete dst here if a result is visible and either
                // no completion was created or this call wins the CAS
                // on d.  Only f's exception, if any, is carried over;
                // the value is always null.
                if (r != null && (d == null || d.compareAndSet(0, 1)))
                    dst.internalComplete(null, (r instanceof AltResult) ?
                                         ((AltResult)r).ex : null);
                f.helpPostComplete();
            }
            return dst;
        }
    }

    /**
     * Recursively constructs an And'ed tree of CompletableFutures.
     * Called only when array known to have at least two elements.
*/
    private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
                                                   int lo, int hi) {
        CompletableFuture<?> fst, snd;
        int mid = (lo + hi) >>> 1;
        // Left half is [lo, mid], right half is [mid+1, hi].  A half
        // holding a single element uses that element directly;
        // otherwise it is reduced recursively.  A null array element
        // is rejected.
        if ((fst = (lo == mid   ? cfs[lo] : allTree(cfs, lo,    mid))) == null ||
            (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi)))  == null)
            throw new NullPointerException();
        CompletableFuture<Void> dst = new CompletableFuture<Void>();
        AndCompletion d = null;
        CompletionNode p = null, q = null;
        Object r = null, s = null;
        // While at least one side has no result, advance one step per
        // pass: create d, create p, push p onto fst (then create q),
        // push q onto snd and stop.  Because of short-circuiting, s is
        // not read while fst has no result.
        while ((r = fst.result) == null || (s = snd.result) == null) {
            if (d == null)
                d = new AndCompletion(fst, snd, dst);
            else if (p == null)
                p = new CompletionNode(d);
            else if (q == null) {
                if (UNSAFE.compareAndSwapObject
                    (fst, COMPLETIONS, p.next = fst.completions, p))
                    q = new CompletionNode(d);
            }
            else if (UNSAFE.compareAndSwapObject
                     (snd, COMPLETIONS, q.next = snd.completions, q))
                break;
        }
        // Re-read whichever result is still null in the locals (the
        // loop may have exited via break), then complete dst here if
        // both are present and either no completion was created or
        // this call wins the CAS on d.
        if ((r != null || (r = fst.result) != null) &&
            (s != null || (s = snd.result) != null) &&
            (d == null || d.compareAndSet(0, 1))) {
            Throwable ex;
            if (r instanceof AltResult)
                ex = ((AltResult)r).ex;
            else
                ex = null;
            // The first side's exception takes precedence; the second
            // side's is used only if the first has none.
            if (ex == null && (s instanceof AltResult))
                ex = ((AltResult)s).ex;
            dst.internalComplete(null, ex);
        }
        fst.helpPostComplete();
        snd.helpPostComplete();
        return dst;
    }

    /**
     * Returns a new CompletableFuture that is completed when any of
     * the given CompletableFutures complete, with the same result.
     * Otherwise, if it completed exceptionally, the returned
     * CompletableFuture also does so, with a CompletionException
     * holding this exception as its cause.  If no CompletableFutures
     * are provided, returns an incomplete CompletableFuture.
*
     * @param cfs the CompletableFutures
     * @return a new CompletableFuture that is completed with the
     * result or exception of any of the given CompletableFutures when
     * one completes
     * @throws NullPointerException if the array or any of its elements are
     * {@code null}
     */
    public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
        int len = cfs.length; // Same idea as allOf
        if (len > 1)
            return anyTree(cfs, 0, len - 1);
        else {
            CompletableFuture<Object> dst = new CompletableFuture<Object>();
            CompletableFuture<?> f;
            // With no inputs dst is returned without ever being
            // completed here.
            if (len == 0)
                ; // skip
            else if ((f = cfs[0]) == null)
                throw new NullPointerException();
            else {
                ThenCopy<Object> d = null;
                CompletionNode p = null;
                Object r;
                // One step per pass while f has no result: create d,
                // then create p, then try to push p onto f.
                while ((r = f.result) == null) {
                    if (d == null)
                        d = new ThenCopy<Object>(f, dst);
                    else if (p == null)
                        p = new CompletionNode(d);
                    else if (UNSAFE.compareAndSwapObject
                             (f, COMPLETIONS, p.next = f.completions, p))
                        break;
                }
                // Copy f's outcome into dst here if it is visible and
                // either no completion was created or this call wins
                // the CAS on d.
                if (r != null && (d == null || d.compareAndSet(0, 1))) {
                    Throwable ex; Object t;
                    if (r instanceof AltResult) {
                        ex = ((AltResult)r).ex;
                        t = null;
                    }
                    else {
                        ex = null;
                        t = r;
                    }
                    dst.internalComplete(t, ex);
                }
                f.helpPostComplete();
            }
            return dst;
        }
    }

    /**
     * Recursively constructs an Or'ed tree of CompletableFutures.
     */
    private static CompletableFuture<Object> anyTree(CompletableFuture<?>[] cfs,
                                                     int lo, int hi) {
        CompletableFuture<?> fst, snd;
        int mid = (lo + hi) >>> 1;
        // Left half is [lo, mid], right half is [mid+1, hi]; a
        // single-element half uses that element directly.  A null
        // array element is rejected.
        if ((fst = (lo == mid   ? cfs[lo] : anyTree(cfs, lo,    mid))) == null ||
            (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi)))  == null)
            throw new NullPointerException();
        CompletableFuture<Object> dst = new CompletableFuture<Object>();
        OrCompletion d = null;
        CompletionNode p = null, q = null;
        Object r;
        // While neither side has a result, advance one step per pass:
        // create d, create p, push p onto fst (then create q), push q
        // onto snd and stop.
        while ((r = fst.result) == null && (r = snd.result) == null) {
            if (d == null)
                d = new OrCompletion(fst, snd, dst);
            else if (p == null)
                p = new CompletionNode(d);
            else if (q == null) {
                if (UNSAFE.compareAndSwapObject
                    (fst, COMPLETIONS, p.next = fst.completions, p))
                    q = new CompletionNode(d);
            }
            else if (UNSAFE.compareAndSwapObject
                     (snd, COMPLETIONS, q.next = snd.completions, q))
                break;
        }
        // After a break r is null, so both results are re-read.  dst is
        // completed here if either is present and either no completion
        // was created or this call wins the CAS on d.
        if ((r != null || (r = fst.result) != null ||
             (r = snd.result) != null) &&
            (d == null || d.compareAndSet(0, 1))) {
            Throwable ex; Object t;
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                t = r;
            }
            dst.internalComplete(t, ex);
        }
        fst.helpPostComplete();
        snd.helpPostComplete();
        return dst;
    }

    /* ------------- Control and status methods -------------- */

    /**
     * If not already completed, completes this CompletableFuture with
     * a {@link CancellationException}. Dependent CompletableFutures
     * that have not already completed will also complete
     * exceptionally, with a {@link CompletionException} caused by
     * this {@code CancellationException}.
     *
     * @param mayInterruptIfRunning this value has no effect in this
     * implementation because interrupts are not used to control
     * processing.
3183 * 3184 * @return {@code true} if this task is now cancelled 3185 */ 3186 public boolean cancel(boolean mayInterruptIfRunning) { 3187 boolean cancelled = (result == null) && 3188 UNSAFE.compareAndSwapObject 3189 (this, RESULT, null, new AltResult(new CancellationException())); 3190 postComplete(); 3191 return cancelled || isCancelled(); 3192 } 3193 3194 /** 3195 * Returns {@code true} if this CompletableFuture was cancelled 3196 * before it completed normally. 3197 * 3198 * @return {@code true} if this CompletableFuture was cancelled 3199 * before it completed normally 3200 */ 3201 public boolean isCancelled() { 3202 Object r; 3203 return ((r = result) instanceof AltResult) && 3204 (((AltResult)r).ex instanceof CancellationException); 3205 } 3206 3207 /** 3208 * Forcibly sets or resets the value subsequently returned by 3209 * method {@link #get()} and related methods, whether or not 3210 * already completed. This method is designed for use only in 3211 * error recovery actions, and even in such situations may result 3212 * in ongoing dependent completions using established versus 3213 * overwritten outcomes. 3214 * 3215 * @param value the completion value 3216 */ 3217 public void obtrudeValue(T value) { 3218 result = (value == null) ? NIL : value; 3219 postComplete(); 3220 } 3221 3222 /** 3223 * Forcibly causes subsequent invocations of method {@link #get()} 3224 * and related methods to throw the given exception, whether or 3225 * not already completed. This method is designed for use only in 3226 * recovery actions, and even in such situations may result in 3227 * ongoing dependent completions using established versus 3228 * overwritten outcomes. 
3229 * 3230 * @param ex the exception 3231 */ 3232 public void obtrudeException(Throwable ex) { 3233 if (ex == null) throw new NullPointerException(); 3234 result = new AltResult(ex); 3235 postComplete(); 3236 } 3237 3238 /** 3239 * Returns the estimated number of CompletableFutures whose 3240 * completions are awaiting completion of this CompletableFuture. 3241 * This method is designed for use in monitoring system state, not 3242 * for synchronization control. 3243 * 3244 * @return the number of dependent CompletableFutures 3245 */ 3246 public int getNumberOfDependents() { 3247 int count = 0; 3248 for (CompletionNode p = completions; p != null; p = p.next) 3249 ++count; 3250 return count; 3251 } 3252 3253 /** 3254 * Returns a string identifying this CompletableFuture, as well as 3255 * its completion state. The state, in brackets, contains the 3256 * String {@code "Completed Normally"} or the String {@code 3257 * "Completed Exceptionally"}, or the String {@code "Not 3258 * completed"} followed by the number of CompletableFutures 3259 * dependent upon its completion, if any. 3260 * 3261 * @return a string identifying this CompletableFuture, as well as its state 3262 */ 3263 public String toString() { 3264 Object r = result; 3265 int count; 3266 return super.toString() + 3267 ((r == null) ? 3268 (((count = getNumberOfDependents()) == 0) ? 3269 "[Not completed]" : 3270 "[Not completed, " + count + " dependents]") : 3271 (((r instanceof AltResult) && ((AltResult)r).ex != null) ? 
3272 "[Completed exceptionally]" : 3273 "[Completed normally]")); 3274 } 3275 3276 // Unsafe mechanics 3277 private static final sun.misc.Unsafe UNSAFE; 3278 private static final long RESULT; 3279 private static final long WAITERS; 3280 private static final long COMPLETIONS; 3281 static { 3282 try { 3283 UNSAFE = sun.misc.Unsafe.getUnsafe(); 3284 Class<?> k = CompletableFuture.class; 3285 RESULT = UNSAFE.objectFieldOffset 3286 (k.getDeclaredField("result")); 3287 WAITERS = UNSAFE.objectFieldOffset 3288 (k.getDeclaredField("waiters")); 3289 COMPLETIONS = UNSAFE.objectFieldOffset 3290 (k.getDeclaredField("completions")); 3291 } catch (Exception e) { 3292 throw new Error(e); 3293 } 3294 } 3295 }