/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

/*
 * @test
 * @bug 8005696
 * @summary Basic tests for CompletableFuture
 * @author Chris Hegarty
 */

import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.Phaser;
import static java.util.concurrent.TimeUnit.*;
import java.util.concurrent.CompletableFuture;
import static java.util.concurrent.CompletableFuture.*;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static java.util.concurrent.ForkJoinPool.*;
import java.util.concurrent.atomic.AtomicInteger;


/**
 * Basic functional tests for {@link CompletableFuture}.
 *
 * <p>Each helper below exercises one family of methods (plain, {@code Async},
 * and {@code Async} with an explicit executor) for both normal and exceptional
 * completion. Results are tallied in {@link #passed} / {@link #failed};
 * {@link #main} throws {@link AssertionError} if any check failed.
 */
public class Basic {

    /**
     * Checks that {@code cf} completed normally with {@code value}.
     * A {@code null} value is forwarded as a {@code null} candidate array.
     */
    static void checkCompletedNormally(CompletableFuture<?> cf, Object value) {
        Object[] expected = (value == null) ? null : new Object[] { value };
        checkCompletedNormally(cf, expected);
    }

    /**
     * Checks that {@code cf} completed normally and that every way of reading
     * its result yields one of {@code values}. Also checks that the future
     * reports done / not cancelled and rejects further completion attempts.
     *
     * @param cf     the future under test
     * @param values acceptable results; {@code null} means the result must be {@code null}
     */
    static void checkCompletedNormally(CompletableFuture<?> cf, Object[] values) {
        try { equalAnyOf(cf.join(), values); } catch (Throwable x) { unexpected(x); }
        try { equalAnyOf(cf.getNow(null), values); } catch (Throwable x) { unexpected(x); }
        try { equalAnyOf(cf.get(), values); } catch (Throwable x) { unexpected(x); }
        try { equalAnyOf(cf.get(0L, SECONDS), values); } catch (Throwable x) { unexpected(x); }
        check(cf.isDone(), "Expected isDone to be true, got:" + cf);
        check(!cf.isCancelled(), "Expected isCancelled to be false");
        check(!cf.cancel(true), "Expected cancel to return false");
        check(cf.toString().contains("[Completed normally]"));
        check(cf.complete(null) == false, "Expected complete() to fail");
        check(cf.completeExceptionally(new Throwable()) == false,
              "Expected completeExceptionally() to fail");
    }

    /**
     * Checks that {@code cf} completed exceptionally (but was not cancelled).
     */
    static <T> void checkCompletedExceptionally(CompletableFuture<T> cf)
        throws Exception
    {
        checkCompletedExceptionally(cf, false);
    }

    /**
     * Checks that {@code cf} completed exceptionally.
     *
     * @param cf        the future under test
     * @param cancelled whether the exceptional completion is expected to be a cancellation
     * @throws Exception if one of the timed/untimed {@code get} calls throws an
     *         exception type that is not handled here
     */
    @SuppressWarnings("unchecked")
    static <T> void checkCompletedExceptionally(CompletableFuture<T> cf, boolean cancelled)
        throws Exception
    {
        try { cf.join(); fail("Expected exception to be thrown"); }
        catch (CompletionException x) { if (cancelled) fail(); else pass(); }
        catch (CancellationException x) { if (cancelled) pass(); else fail(); }

        try { cf.getNow(null); fail("Expected exception to be thrown"); }
        catch (CompletionException x) { if (cancelled) fail(); else pass(); }
        catch (CancellationException x) { if (cancelled) pass(); else fail(); }

        try { cf.get(); fail("Expected exception to be thrown"); }
        catch (CancellationException x) { if (cancelled) pass(); else fail(); }
        catch (ExecutionException x) {
            if (cancelled) check(x.getCause() instanceof CancellationException);
            else pass();
        }

        try { cf.get(0L, SECONDS); fail("Expected exception to be thrown"); }
        catch (CancellationException x) { if (cancelled) pass(); else fail(); }
        catch (ExecutionException x) {
            if (cancelled) check(x.getCause() instanceof CancellationException);
            else pass();
        }

        check(cf.isDone(), "Expected isDone to be true, got:" + cf);
        check(cf.isCancelled() == cancelled,
              "Expected isCancelled: " + cancelled + ", got:" + cf.isCancelled());
        check(cf.cancel(true) == cancelled,
              "Expected cancel: " + cancelled + ", got:" + cf.cancel(true));
        // Match only the prefix: some JDK releases append the cause before the
        // closing bracket ("[Completed exceptionally: <exception>]").
        check(cf.toString().contains("[Completed exceptionally"));
        check(cf.complete((T)new Object()) == false, "Expected complete() to fail");
        check(cf.completeExceptionally(new Throwable()) == false,
              "Expected completeExceptionally() to fail, already completed");
    }

    /**
     * Runs all tests against a two-thread executor, which is shut down
     * afterwards even if a test throws.
     */
    private static void realMain(String[] args) throws Throwable {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            test(executor);
        } finally {
            executor.shutdown();
            check(executor.awaitTermination(30, SECONDS),
                  "Expected executor to terminate within 30 seconds");
        }
    }

    /** Shared side-effect counter used to observe whether an action ran. */
    static AtomicInteger atomicInt = new AtomicInteger(0);

    /**
     * Test entry point: runs each method-family test in turn. A failure in one
     * family is recorded and does not prevent the others from running.
     *
     * @param executor explicit executor passed to the {@code xxxAsync(..., executor)} variants
     */
    private static void test(ExecutorService executor) throws Throwable {

        Thread.currentThread().setName("mainThread");

        testSupplyAsync(executor);
        testRunAsync(executor);
        testExplicitCompletion();
        testThenApply(executor);
        testThenAccept(executor);
        testThenRun(executor);
        testThenCombine(executor);
        testThenAcceptBoth(executor);
        testRunAfterBoth(executor);
        testApplyToEither(executor);
        testAcceptEither(executor);
        testRunAfterEither(executor);
        testThenCompose(executor);
    }

    //----------------------------------------------------------------
    // supplyAsync tests
    //----------------------------------------------------------------
    private static void testSupplyAsync(ExecutorService executor) {
        try {
            CompletableFuture<String> cf = supplyAsync(() -> "a test string");
            checkCompletedNormally(cf, cf.join());
            cf = supplyAsync(() -> "a test string", commonPool());
            checkCompletedNormally(cf, cf.join());
            cf = supplyAsync(() -> "a test string", executor);
            checkCompletedNormally(cf, cf.join());
            cf = supplyAsync(() -> { throw new RuntimeException(); });
            checkCompletedExceptionally(cf);
            cf = supplyAsync(() -> { throw new RuntimeException(); }, commonPool());
            checkCompletedExceptionally(cf);
            cf = supplyAsync(() -> { throw new RuntimeException(); }, executor);
            checkCompletedExceptionally(cf);
        } catch (Throwable t) { unexpected(t); }
    }

    //----------------------------------------------------------------
    // runAsync tests
    //----------------------------------------------------------------
    private static void testRunAsync(ExecutorService executor) {
        try {
            CompletableFuture<Void> cf = runAsync(() -> { });
            checkCompletedNormally(cf, cf.join());
            cf = runAsync(() -> { }, commonPool());
            checkCompletedNormally(cf, cf.join());
            cf = runAsync(() -> { }, executor);
            checkCompletedNormally(cf, cf.join());
            cf = runAsync(() -> { throw new RuntimeException(); });
            checkCompletedExceptionally(cf);
            cf = runAsync(() -> { throw new RuntimeException(); }, commonPool());
            checkCompletedExceptionally(cf);
            cf = runAsync(() -> { throw new RuntimeException(); }, executor);
            checkCompletedExceptionally(cf);
        } catch (Throwable t) { unexpected(t); }
    }

    //----------------------------------------------------------------
    // explicit completion
    //----------------------------------------------------------------
    private static void testExplicitCompletion() {
        try {
            // The phaser holds each supplier back until the main thread has
            // had the chance to complete/cancel the future explicitly.
            final Phaser phaser = new Phaser(1);
            final int phase = phaser.getPhase();
            CompletableFuture<Integer> cf;
            cf = supplyAsync(() -> { phaser.awaitAdvance(phase); return 1; });
            cf.complete(2);
            phaser.arrive();
            checkCompletedNormally(cf, 2);

            cf = supplyAsync(() -> { phaser.awaitAdvance(phase+1); return 1; });
            cf.completeExceptionally(new Throwable());
            phaser.arrive();
            checkCompletedExceptionally(cf);

            cf = supplyAsync(() -> { phaser.awaitAdvance(phase+2); return 1; });
            cf.cancel(true);
            phaser.arrive();
            checkCompletedExceptionally(cf, true);

            cf = supplyAsync(() -> { phaser.awaitAdvance(phase+3); return 1; });
            check(cf.getNow(2) == 2);
            phaser.arrive();
            checkCompletedNormally(cf, 1);
            check(cf.getNow(2) == 1);
        } catch (Throwable t) { unexpected(t); }
    }

    //----------------------------------------------------------------
    // thenApplyXXX tests
    //----------------------------------------------------------------
    private static void testThenApply(ExecutorService executor) {
        try {
            CompletableFuture<Integer> cf2;
            CompletableFuture<String> cf1 = supplyAsync(() -> "a test string");
            cf2 = cf1.thenApply((x) -> { if (x.equals("a test string")) return 1; else return 0; });
            checkCompletedNormally(cf1, "a test string");
            checkCompletedNormally(cf2, 1);

            cf1 = supplyAsync(() -> "a test string");
            cf2 = cf1.thenApplyAsync((x) -> { if (x.equals("a test string")) return 1; else return 0; });
            checkCompletedNormally(cf1, "a test string");
            checkCompletedNormally(cf2, 1);

            cf1 = supplyAsync(() -> "a test string");
            cf2 = cf1.thenApplyAsync((x) -> { if (x.equals("a test string")) return 1; else return 0; }, executor);
            checkCompletedNormally(cf1, "a test string");
            checkCompletedNormally(cf2, 1);

            cf1 = supplyAsync(() -> { throw new RuntimeException(); });
            cf2 = cf1.thenApply((x) -> { return 0; } );
            checkCompletedExceptionally(cf1);
            checkCompletedExceptionally(cf2);

            cf1 = supplyAsync(() -> { throw new RuntimeException(); });
            cf2 = cf1.thenApplyAsync((x) -> { return 0; } );
            checkCompletedExceptionally(cf1);
            checkCompletedExceptionally(cf2);

            cf1 = supplyAsync(() -> { throw new RuntimeException(); });
            cf2 = cf1.thenApplyAsync((x) -> { return 0; }, executor);
            checkCompletedExceptionally(cf1);
            checkCompletedExceptionally(cf2);
        } catch (Throwable t) { unexpected(t); }
    }

    //----------------------------------------------------------------
    // thenAcceptXXX tests
    //----------------------------------------------------------------
    private static void testThenAccept(ExecutorService executor) {
        try {
            CompletableFuture<Void> cf2;
            int before = atomicInt.get();
            CompletableFuture<String> cf1 = supplyAsync(() -> "a test string");
            cf2 = cf1.thenAccept((x) -> { if (x.equals("a test string")) { atomicInt.incrementAndGet(); return; } throw new RuntimeException(); });
            checkCompletedNormally(cf1, "a test string");
            checkCompletedNormally(cf2, null);
            check(atomicInt.get() == (before + 1));

            before = atomicInt.get();
            cf1 = supplyAsync(() -> "a test string");
            cf2 = cf1.thenAcceptAsync((x) -> { if (x.equals("a test string")) { atomicInt.incrementAndGet(); return; } throw new RuntimeException(); });
            checkCompletedNormally(cf1, "a test string");
            checkCompletedNormally(cf2, null);
            check(atomicInt.get() == (before + 1));

            before = atomicInt.get();
            cf1 = supplyAsync(() -> "a test string");
            cf2 = cf1.thenAcceptAsync((x) -> { if (x.equals("a test string")) { atomicInt.incrementAndGet(); return; } throw new RuntimeException(); }, executor);
            checkCompletedNormally(cf1, "a test string");
            checkCompletedNormally(cf2, null);
            check(atomicInt.get() == (before + 1));

            // When the source fails the action must never run, so the counter
            // stays at 'before' for all three remaining cases.
            before = atomicInt.get();
            cf1 = supplyAsync(() -> { throw new RuntimeException(); });
            cf2 = cf1.thenAccept((x) -> { atomicInt.incrementAndGet(); } );
            checkCompletedExceptionally(cf1);
            checkCompletedExceptionally(cf2);
            check(atomicInt.get() == before);

            cf1 = supplyAsync(() -> { throw new RuntimeException(); });
            cf2 = cf1.thenAcceptAsync((x) -> { atomicInt.incrementAndGet(); } );
            checkCompletedExceptionally(cf1);
            checkCompletedExceptionally(cf2);
            check(atomicInt.get() == before);

            cf1 = supplyAsync(() -> { throw new RuntimeException(); });
            cf2 = cf1.thenAcceptAsync((x) -> { atomicInt.incrementAndGet(); }, executor );
            checkCompletedExceptionally(cf1);
            checkCompletedExceptionally(cf2);
            check(atomicInt.get() == before);
        } catch (Throwable t) { unexpected(t); }
    }

    //----------------------------------------------------------------
    // thenRunXXX tests
    //----------------------------------------------------------------
    private static void testThenRun(ExecutorService executor) {
        try {
            CompletableFuture<Void> cf2;
            int before = atomicInt.get();
            CompletableFuture<String> cf1 = supplyAsync(() -> "a test string");
            cf2 = cf1.thenRun(() -> { atomicInt.incrementAndGet(); });
            checkCompletedNormally(cf1, "a test string");
            checkCompletedNormally(cf2, null);
            check(atomicInt.get() == (before + 1));

            before = atomicInt.get();
            cf1 = supplyAsync(() -> "a test string");
            cf2 = cf1.thenRunAsync(() -> { atomicInt.incrementAndGet(); });
            checkCompletedNormally(cf1, "a test string");
            checkCompletedNormally(cf2, null);
            check(atomicInt.get() == (before + 1));

            before = atomicInt.get();
            cf1 = supplyAsync(() -> "a test string");
            cf2 = cf1.thenRunAsync(() -> { atomicInt.incrementAndGet(); }, executor);
            checkCompletedNormally(cf1, "a test string");
            checkCompletedNormally(cf2, null);
            check(atomicInt.get() == (before + 1));

            before = atomicInt.get();
            cf1 = supplyAsync(() -> { throw new RuntimeException(); });
            cf2 = cf1.thenRun(() -> { atomicInt.incrementAndGet(); });
            checkCompletedExceptionally(cf1);
            checkCompletedExceptionally(cf2);
            check(atomicInt.get() == before);

            cf1 = supplyAsync(() -> { throw new RuntimeException(); });
            cf2 = cf1.thenRunAsync(() -> { atomicInt.incrementAndGet(); });
            checkCompletedExceptionally(cf1);
            checkCompletedExceptionally(cf2);
            check(atomicInt.get() == before);

            cf1 = supplyAsync(() -> { throw new RuntimeException(); });
            cf2 = cf1.thenRunAsync(() -> { atomicInt.incrementAndGet(); }, executor);
            checkCompletedExceptionally(cf1);
            checkCompletedExceptionally(cf2);
            check(atomicInt.get() == before);
        } catch (Throwable t) { unexpected(t); }
    }

    //----------------------------------------------------------------
    // thenCombineXXX tests
    //----------------------------------------------------------------
    private static void testThenCombine(ExecutorService executor) {
        try {
            CompletableFuture<Integer> cf3;
            CompletableFuture<Integer> cf1 = supplyAsync(() -> 1);
            CompletableFuture<Integer> cf2 = supplyAsync(() -> 1);
            cf3 = cf1.thenCombine(cf2, (x, y) -> { return x + y; });
            checkCompletedNormally(cf1, 1);
            checkCompletedNormally(cf2, 1);
            checkCompletedNormally(cf3, 2);

            cf1 = supplyAsync(() -> 1);
            cf2 = supplyAsync(() -> 1);
            cf3 = cf1.thenCombineAsync(cf2, (x, y) -> { return x + y; });
            checkCompletedNormally(cf1, 1);
            checkCompletedNormally(cf2, 1);
            checkCompletedNormally(cf3, 2);

            cf1 = supplyAsync(() -> 1);
            cf2 = supplyAsync(() -> 1);
            cf3 = cf1.thenCombineAsync(cf2, (x, y) -> { return x + y; }, executor);
            checkCompletedNormally(cf1, 1);
            checkCompletedNormally(cf2, 1);
            checkCompletedNormally(cf3, 2);

            cf1 = supplyAsync(() -> { throw new RuntimeException(); });
            cf2 = supplyAsync(() -> 1);
            cf3 = cf1.thenCombine(cf2, (x, y) -> { return 0; });
            checkCompletedExceptionally(cf1);
            checkCompletedNormally(cf2, 1);
            checkCompletedExceptionally(cf3);

            cf1 = supplyAsync(() -> 1);
            cf2 = supplyAsync(() -> { throw new RuntimeException(); });
            cf3 = cf1.thenCombineAsync(cf2, (x, y) -> { return 0; });
            checkCompletedNormally(cf1, 1);
            checkCompletedExceptionally(cf2);
            checkCompletedExceptionally(cf3);

            cf1 = supplyAsync(() -> { throw new RuntimeException(); });
            cf2 = supplyAsync(() -> { throw new RuntimeException(); });
            cf3 = cf1.thenCombineAsync(cf2, (x, y) -> { return 0; }, executor);
            checkCompletedExceptionally(cf1);
            checkCompletedExceptionally(cf2);
            checkCompletedExceptionally(cf3);
        } catch (Throwable t) { unexpected(t); }
    }

    //----------------------------------------------------------------
    // thenAcceptBothXXX tests
    //----------------------------------------------------------------
    private static void testThenAcceptBoth(ExecutorService executor) {
        try {
            CompletableFuture<Void> cf3;
            int before = atomicInt.get();
            CompletableFuture<Integer> cf1 = supplyAsync(() -> 1);
            CompletableFuture<Integer> cf2 = supplyAsync(() -> 1);
            cf3 = cf1.thenAcceptBoth(cf2, (x, y) -> { check(x + y == 2); atomicInt.incrementAndGet(); });
            checkCompletedNormally(cf1, 1);
            checkCompletedNormally(cf2, 1);
            checkCompletedNormally(cf3, null);
            check(atomicInt.get() == (before + 1));

            before = atomicInt.get();
            cf1 = supplyAsync(() -> 1);
            cf2 = supplyAsync(() -> 1);
            cf3 = cf1.thenAcceptBothAsync(cf2, (x, y) -> { check(x + y == 2); atomicInt.incrementAndGet(); });
            checkCompletedNormally(cf1, 1);
            checkCompletedNormally(cf2, 1);
            checkCompletedNormally(cf3, null);
            check(atomicInt.get() == (before + 1));

            before = atomicInt.get();
            cf1 = supplyAsync(() -> 1);
            cf2 = supplyAsync(() -> 1);
            cf3 = cf1.thenAcceptBothAsync(cf2, (x, y) -> { check(x + y == 2); atomicInt.incrementAndGet(); }, executor);
            checkCompletedNormally(cf1, 1);
            checkCompletedNormally(cf2, 1);
            checkCompletedNormally(cf3, null);
            check(atomicInt.get() == (before + 1));

            before = atomicInt.get();
            cf1 = supplyAsync(() -> { throw new RuntimeException(); });
            cf2 = supplyAsync(() -> 1);
            cf3 = cf1.thenAcceptBoth(cf2, (x, y) -> { atomicInt.incrementAndGet(); });
            checkCompletedExceptionally(cf1);
            checkCompletedNormally(cf2, 1);
            checkCompletedExceptionally(cf3);
            check(atomicInt.get() == before);

            cf1 = supplyAsync(() -> 1);
            cf2 = supplyAsync(() -> { throw new RuntimeException(); });
            cf3 = cf1.thenAcceptBothAsync(cf2, (x, y) -> { atomicInt.incrementAndGet(); });
            checkCompletedNormally(cf1, 1);
            checkCompletedExceptionally(cf2);
            checkCompletedExceptionally(cf3);
            check(atomicInt.get() == before);

            cf1 = supplyAsync(() -> { throw new RuntimeException(); });
            cf2 = supplyAsync(() -> { throw new RuntimeException(); });
            cf3 = cf1.thenAcceptBothAsync(cf2, (x, y) -> { atomicInt.incrementAndGet(); }, executor);
            checkCompletedExceptionally(cf1);
            checkCompletedExceptionally(cf2);
            checkCompletedExceptionally(cf3);
            check(atomicInt.get() == before);
        } catch (Throwable t) { unexpected(t); }
    }

    //----------------------------------------------------------------
    // runAfterBothXXX tests
    //----------------------------------------------------------------
    private static void testRunAfterBoth(ExecutorService executor) {
        try {
            // The source futures are captured by the lambdas, so each case
            // uses its own (effectively final) pair of variables.
            CompletableFuture<Void> cf3;
            int before = atomicInt.get();
            CompletableFuture<Integer> cf1 = supplyAsync(() -> 1);
            CompletableFuture<Integer> cf2 = supplyAsync(() -> 1);
            cf3 = cf1.runAfterBoth(cf2, () -> { check(cf1.isDone()); check(cf2.isDone()); atomicInt.incrementAndGet(); });
            checkCompletedNormally(cf1, 1);
            checkCompletedNormally(cf2, 1);
            checkCompletedNormally(cf3, null);
            check(atomicInt.get() == (before + 1));

            before = atomicInt.get();
            CompletableFuture<Integer> cfa = supplyAsync(() -> 1);
            CompletableFuture<Integer> cfb = supplyAsync(() -> 1);
            cf3 = cfa.runAfterBothAsync(cfb, () -> { check(cfa.isDone()); check(cfb.isDone()); atomicInt.incrementAndGet(); });
            checkCompletedNormally(cfa, 1);
            checkCompletedNormally(cfb, 1);
            checkCompletedNormally(cf3, null);
            check(atomicInt.get() == (before + 1));

            before = atomicInt.get();
            CompletableFuture<Integer> cfx = supplyAsync(() -> 1);
            CompletableFuture<Integer> cfy = supplyAsync(() -> 1);
            cf3 = cfy.runAfterBothAsync(cfx, () -> { check(cfx.isDone()); check(cfy.isDone()); atomicInt.incrementAndGet(); }, executor);
            checkCompletedNormally(cfx, 1);
            checkCompletedNormally(cfy, 1);
            checkCompletedNormally(cf3, null);
            check(atomicInt.get() == (before + 1));

            before = atomicInt.get();
            CompletableFuture<Integer> cf4 = supplyAsync(() -> { throw new RuntimeException(); });
            CompletableFuture<Integer> cf5 = supplyAsync(() -> 1);
            cf3 = cf5.runAfterBothAsync(cf4, () -> { atomicInt.incrementAndGet(); }, executor);
            checkCompletedExceptionally(cf4);
            checkCompletedNormally(cf5, 1);
            checkCompletedExceptionally(cf3);
            check(atomicInt.get() == before);

            before = atomicInt.get();
            cf4 = supplyAsync(() -> 1);
            cf5 = supplyAsync(() -> { throw new RuntimeException(); });
            cf3 = cf5.runAfterBothAsync(cf4, () -> { atomicInt.incrementAndGet(); });
            checkCompletedNormally(cf4, 1);
            checkCompletedExceptionally(cf5);
            checkCompletedExceptionally(cf3);
            check(atomicInt.get() == before);

            before = atomicInt.get();
            cf4 = supplyAsync(() -> { throw new RuntimeException(); });
            cf5 = supplyAsync(() -> { throw new RuntimeException(); });
            cf3 = cf5.runAfterBoth(cf4, () -> { atomicInt.incrementAndGet(); });
            checkCompletedExceptionally(cf4);
            checkCompletedExceptionally(cf5);
            checkCompletedExceptionally(cf3);
            check(atomicInt.get() == before);
        } catch (Throwable t) { unexpected(t); }
    }

    //----------------------------------------------------------------
    // applyToEitherXXX tests
    //
    // In every "either" test the check that one of the sources is done is
    // made only after the dependent future has completed; immediately after
    // creating the dependent neither source is guaranteed to have finished.
    //----------------------------------------------------------------
    private static void testApplyToEither(ExecutorService executor) {
        try {
            CompletableFuture<Integer> cf3;
            CompletableFuture<Integer> cf1 = supplyAsync(() -> 1);
            CompletableFuture<Integer> cf2 = supplyAsync(() -> 2);
            cf3 = cf1.applyToEither(cf2, (x) -> { check(x == 1 || x == 2); return x; });
            checkCompletedNormally(cf3, new Object[] {1, 2});
            check(cf1.isDone() || cf2.isDone());

            cf1 = supplyAsync(() -> 1);
            cf2 = supplyAsync(() -> 2);
            cf3 = cf1.applyToEitherAsync(cf2, (x) -> { check(x == 1 || x == 2); return x; });
            checkCompletedNormally(cf3, new Object[] {1, 2});
            check(cf1.isDone() || cf2.isDone());

            cf1 = supplyAsync(() -> 1);
            cf2 = supplyAsync(() -> 2);
            cf3 = cf1.applyToEitherAsync(cf2, (x) -> { check(x == 1 || x == 2); return x; }, executor);
            checkCompletedNormally(cf3, new Object[] {1, 2});
            check(cf1.isDone() || cf2.isDone());

            // One source fails: cf3 either fails too, or carries the value of
            // the source that succeeded (2 here, since cf1 is the failing one).
            cf1 = supplyAsync(() -> { throw new RuntimeException(); });
            cf2 = supplyAsync(() -> 2);
            cf3 = cf1.applyToEither(cf2, (x) -> { check(x == 2); return x; });
            try { check(cf3.join() == 2); } catch (CompletionException x) { pass(); }
            check(cf3.isDone());
            check(cf1.isDone() || cf2.isDone());

            cf1 = supplyAsync(() -> 1);
            cf2 = supplyAsync(() -> { throw new RuntimeException(); });
            cf3 = cf1.applyToEitherAsync(cf2, (x) -> { check(x == 1); return x; });
            try { check(cf3.join() == 1); } catch (CompletionException x) { pass(); }
            check(cf3.isDone());
            check(cf1.isDone() || cf2.isDone());

            cf1 = supplyAsync(() -> { throw new RuntimeException(); });
            cf2 = supplyAsync(() -> { throw new RuntimeException(); });
            cf3 = cf1.applyToEitherAsync(cf2, (x) -> { fail(); return x; });
            checkCompletedExceptionally(cf3);
            check(cf1.isDone() || cf2.isDone());
        } catch (Throwable t) { unexpected(t); }
    }

    //----------------------------------------------------------------
    // acceptEitherXXX tests
    //----------------------------------------------------------------
    private static void testAcceptEither(ExecutorService executor) {
        try {
            CompletableFuture<Void> cf3;
            int before = atomicInt.get();
            CompletableFuture<Integer> cf1 = supplyAsync(() -> 1);
            CompletableFuture<Integer> cf2 = supplyAsync(() -> 2);
            cf3 = cf1.acceptEither(cf2, (x) -> { check(x == 1 || x == 2); atomicInt.incrementAndGet(); });
            checkCompletedNormally(cf3, null);
            check(cf1.isDone() || cf2.isDone());
            check(atomicInt.get() == (before + 1));

            before = atomicInt.get();
            cf1 = supplyAsync(() -> 1);
            cf2 = supplyAsync(() -> 2);
            cf3 = cf1.acceptEitherAsync(cf2, (x) -> { check(x == 1 || x == 2); atomicInt.incrementAndGet(); });
            checkCompletedNormally(cf3, null);
            check(cf1.isDone() || cf2.isDone());
            check(atomicInt.get() == (before + 1));

            before = atomicInt.get();
            cf1 = supplyAsync(() -> 1);
            cf2 = supplyAsync(() -> 2);
            cf3 = cf2.acceptEitherAsync(cf1, (x) -> { check(x == 1 || x == 2); atomicInt.incrementAndGet(); }, executor);
            checkCompletedNormally(cf3, null);
            check(cf1.isDone() || cf2.isDone());
            check(atomicInt.get() == (before + 1));

            cf1 = supplyAsync(() -> { throw new RuntimeException(); });
            cf2 = supplyAsync(() -> 2);
            cf3 = cf2.acceptEitherAsync(cf1, (x) -> { check(x == 2); }, executor);
            try { check(cf3.join() == null); } catch (CompletionException x) { pass(); }
            check(cf3.isDone());
            check(cf1.isDone() || cf2.isDone());

            cf1 = supplyAsync(() -> 1);
            cf2 = supplyAsync(() -> { throw new RuntimeException(); });
            cf3 = cf2.acceptEitherAsync(cf1, (x) -> { check(x == 1); });
            try { check(cf3.join() == null); } catch (CompletionException x) { pass(); }
            check(cf3.isDone());
            check(cf1.isDone() || cf2.isDone());

            cf1 = supplyAsync(() -> { throw new RuntimeException(); });
            cf2 = supplyAsync(() -> { throw new RuntimeException(); });
            cf3 = cf2.acceptEitherAsync(cf1, (x) -> { fail(); });
            checkCompletedExceptionally(cf3);
            check(cf1.isDone() || cf2.isDone());
        } catch (Throwable t) { unexpected(t); }
    }

    //----------------------------------------------------------------
    // runAfterEitherXXX tests
    //----------------------------------------------------------------
    private static void testRunAfterEither(ExecutorService executor) {
        try {
            CompletableFuture<Void> cf3;
            int before = atomicInt.get();
            CompletableFuture<Void> cf1 = runAsync(() -> { });
            CompletableFuture<Void> cf2 = runAsync(() -> { });
            cf3 = cf1.runAfterEither(cf2, () -> { atomicInt.incrementAndGet(); });
            checkCompletedNormally(cf3, null);
            check(cf1.isDone() || cf2.isDone());
            check(atomicInt.get() == (before + 1));

            before = atomicInt.get();
            cf1 = runAsync(() -> { });
            cf2 = runAsync(() -> { });
            cf3 = cf1.runAfterEitherAsync(cf2, () -> { atomicInt.incrementAndGet(); });
            checkCompletedNormally(cf3, null);
            check(cf1.isDone() || cf2.isDone());
            check(atomicInt.get() == (before + 1));

            before = atomicInt.get();
            cf1 = runAsync(() -> { });
            cf2 = runAsync(() -> { });
            cf3 = cf2.runAfterEitherAsync(cf1, () -> { atomicInt.incrementAndGet(); }, executor);
            checkCompletedNormally(cf3, null);
            check(cf1.isDone() || cf2.isDone());
            check(atomicInt.get() == (before + 1));

            // One source fails: the action (and hence the increment) is only
            // expected when cf3 itself completes normally.
            before = atomicInt.get();
            cf1 = runAsync(() -> { throw new RuntimeException(); });
            cf2 = runAsync(() -> { });
            cf3 = cf2.runAfterEither(cf1, () -> { atomicInt.incrementAndGet(); });
            try {
                check(cf3.join() == null);
                check(atomicInt.get() == (before + 1));
            } catch (CompletionException x) { pass(); }
            check(cf3.isDone());
            check(cf1.isDone() || cf2.isDone());

            before = atomicInt.get();
            cf1 = runAsync(() -> { });
            cf2 = runAsync(() -> { throw new RuntimeException(); });
            cf3 = cf1.runAfterEitherAsync(cf2, () -> { atomicInt.incrementAndGet(); });
            try {
                check(cf3.join() == null);
                check(atomicInt.get() == (before + 1));
            } catch (CompletionException x) { pass(); }
            check(cf3.isDone());
            check(cf1.isDone() || cf2.isDone());

            before = atomicInt.get();
            cf1 = runAsync(() -> { throw new RuntimeException(); });
            cf2 = runAsync(() -> { throw new RuntimeException(); });
            cf3 = cf2.runAfterEitherAsync(cf1, () -> { atomicInt.incrementAndGet(); }, executor);
            checkCompletedExceptionally(cf3);
            check(cf1.isDone() || cf2.isDone());
            check(atomicInt.get() == before);
        } catch (Throwable t) { unexpected(t); }
    }

    //----------------------------------------------------------------
    // thenComposeXXX tests
    //----------------------------------------------------------------
    private static void testThenCompose(ExecutorService executor) {
        try {
            CompletableFuture<Integer> cf2;
            CompletableFuture<Integer> cf1 = supplyAsync(() -> 1);
            cf2 = cf1.thenCompose((x) -> { check(x == 1); return CompletableFuture.completedFuture(2); });
            checkCompletedNormally(cf1, 1);
            checkCompletedNormally(cf2, 2);

            cf1 = supplyAsync(() -> 1);
            cf2 = cf1.thenComposeAsync((x) -> { check(x == 1); return CompletableFuture.completedFuture(2); });
            checkCompletedNormally(cf1, 1);
            checkCompletedNormally(cf2, 2);

            cf1 = supplyAsync(() -> 1);
            cf2 = cf1.thenComposeAsync((x) -> { check(x == 1); return CompletableFuture.completedFuture(2); }, executor);
            checkCompletedNormally(cf1, 1);
            checkCompletedNormally(cf2, 2);

            // A failed source must propagate without invoking the function.
            int before = atomicInt.get();
            cf1 = supplyAsync(() -> { throw new RuntimeException(); });
            cf2 = cf1.thenCompose((x) -> { atomicInt.incrementAndGet(); return CompletableFuture.completedFuture(2); });
            checkCompletedExceptionally(cf1);
            checkCompletedExceptionally(cf2);
            check(atomicInt.get() == before);

            cf1 = supplyAsync(() -> { throw new RuntimeException(); });
            cf2 = cf1.thenComposeAsync((x) -> { atomicInt.incrementAndGet(); return CompletableFuture.completedFuture(2); });
            checkCompletedExceptionally(cf1);
            checkCompletedExceptionally(cf2);
            check(atomicInt.get() == before);

            // A function that throws must fail the composed future only.
            cf1 = supplyAsync(() -> 1);
            cf2 = cf1.thenComposeAsync((x) -> { throw new RuntimeException(); }, executor);
            checkCompletedNormally(cf1, 1);
            checkCompletedExceptionally(cf2);
        } catch (Throwable t) { unexpected(t); }
    }

    //--------------------- Infrastructure ---------------------------
    static volatile int passed = 0, failed = 0;

    // pass/fail/unexpected are invoked from pool threads as well as from the
    // main thread; '++' on a volatile is not atomic, so the updates are
    // serialized on the class monitor.

    /** Records one passed check. */
    static synchronized void pass() { passed++; }

    /** Records one failed check and dumps the current stack. */
    static synchronized void fail() { failed++; Thread.dumpStack(); }

    /** Prints {@code msg} and records one failed check. */
    static void fail(String msg) { System.out.println(msg); fail(); }

    /** Records an unexpected throwable as one failed check. */
    static synchronized void unexpected(Throwable t) { failed++; t.printStackTrace(); }

    /** Records a pass if {@code cond} holds, otherwise a failure. */
    static void check(boolean cond) { if (cond) pass(); else fail(); }

    /** Records a pass if {@code cond} holds, otherwise a failure with {@code msg}. */
    static void check(boolean cond, String msg) { if (cond) pass(); else fail(msg); }

    /** Records a pass if {@code x} and {@code y} are both null or equal. */
    static void equal(Object x, Object y) {
        if (x == null ? y == null : x.equals(y)) pass();
        else fail(x + " not equal to " + y);
    }

    /**
     * Records a pass if {@code x} equals any element of {@code y}
     * (null-safe), or if both {@code x} and {@code y} are {@code null};
     * otherwise records a failure listing the candidates.
     */
    static void equalAnyOf(Object x, Object[] y) {
        if (x == null && y == null) { pass(); return; }
        if (y != null) {
            for (Object z : y) {
                if (Objects.equals(x, z)) { pass(); return; }
            }
        }
        fail(x + " not equal to one of " + Arrays.toString(y));
    }

    /**
     * Runs the tests, prints the pass/fail tally, and throws
     * {@link AssertionError} if any check failed.
     */
    public static void main(String[] args) throws Throwable {
        try { realMain(args); } catch (Throwable t) { unexpected(t); }
        System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed);
        if (failed > 0) throw new AssertionError("Some tests failed");
    }
}