< prev index next >

src/java.base/share/classes/java/util/concurrent/CompletableFuture.java

Print this page
8210971: Add exception handling methods to CompletionStage and CompletableFuture
Reviewed-by: martin, chegar

*** 955,975 **** } @SuppressWarnings("serial") static final class UniExceptionally<T> extends UniCompletion<T,T> { Function<? super Throwable, ? extends T> fn; ! UniExceptionally(CompletableFuture<T> dep, CompletableFuture<T> src, Function<? super Throwable, ? extends T> fn) { ! super(null, dep, src); this.fn = fn; } ! final CompletableFuture<T> tryFire(int mode) { // never ASYNC ! // assert mode != ASYNC; CompletableFuture<T> d; CompletableFuture<T> a; Object r; Function<? super Throwable, ? extends T> f; if ((d = dep) == null || (f = fn) == null || (a = src) == null || (r = a.result) == null ! || !d.uniExceptionally(r, f, this)) return null; dep = null; src = null; fn = null; return d.postFire(a, mode); } } --- 955,975 ---- } @SuppressWarnings("serial") static final class UniExceptionally<T> extends UniCompletion<T,T> { Function<? super Throwable, ? extends T> fn; ! UniExceptionally(Executor executor, ! CompletableFuture<T> dep, CompletableFuture<T> src, Function<? super Throwable, ? extends T> fn) { ! super(executor, dep, src); this.fn = fn; } ! final CompletableFuture<T> tryFire(int mode) { CompletableFuture<T> d; CompletableFuture<T> a; Object r; Function<? super Throwable, ? extends T> f; if ((d = dep) == null || (f = fn) == null || (a = src) == null || (r = a.result) == null ! || !d.uniExceptionally(r, f, mode > 0 ? null : this)) return null; dep = null; src = null; fn = null; return d.postFire(a, mode); } }
*** 978,1009 **** Function<? super Throwable, ? extends T> f, UniExceptionally<T> c) { Throwable x; if (result == null) { try { - if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) { if (c != null && !c.claim()) return false; completeValue(f.apply(x)); ! } else internalComplete(r); } catch (Throwable ex) { completeThrowable(ex); } } return true; } private CompletableFuture<T> uniExceptionallyStage( ! Function<Throwable, ? extends T> f) { if (f == null) throw new NullPointerException(); CompletableFuture<T> d = newIncompleteFuture(); Object r; if ((r = result) == null) ! unipush(new UniExceptionally<T>(d, this, f)); ! else d.uniExceptionally(r, f, null); return d; } @SuppressWarnings("serial") static final class UniRelay<U, T extends U> extends UniCompletion<T,U> { --- 978,1083 ---- Function<? super Throwable, ? extends T> f, UniExceptionally<T> c) { Throwable x; if (result == null) { try { if (c != null && !c.claim()) return false; + if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) completeValue(f.apply(x)); ! else internalComplete(r); } catch (Throwable ex) { completeThrowable(ex); } } return true; } private CompletableFuture<T> uniExceptionallyStage( ! Executor e, Function<Throwable, ? extends T> f) { if (f == null) throw new NullPointerException(); CompletableFuture<T> d = newIncompleteFuture(); Object r; if ((r = result) == null) ! unipush(new UniExceptionally<T>(e, d, this, f)); ! else if (e == null) d.uniExceptionally(r, f, null); + else { + try { + e.execute(new UniExceptionally<T>(null, d, this, f)); + } catch (Throwable ex) { + d.result = encodeThrowable(ex); + } + } + return d; + } + + @SuppressWarnings("serial") + static final class UniComposeExceptionally<T> extends UniCompletion<T,T> { + Function<Throwable, ? extends CompletionStage<T>> fn; + UniComposeExceptionally(Executor executor, CompletableFuture<T> dep, + CompletableFuture<T> src, + Function<Throwable, ? extends CompletionStage<T>> fn) { + super(executor, dep, src); this.fn = fn; + } + final CompletableFuture<T> tryFire(int mode) { + CompletableFuture<T> d; CompletableFuture<T> a; + Function<Throwable, ? extends CompletionStage<T>> f; + Object r; Throwable x; + if ((d = dep) == null || (f = fn) == null + || (a = src) == null || (r = a.result) == null) + return null; + if (d.result == null) { + if ((r instanceof AltResult) && + (x = ((AltResult)r).ex) != null) { + try { + if (mode <= 0 && !claim()) + return null; + CompletableFuture<T> g = f.apply(x).toCompletableFuture(); + if ((r = g.result) != null) + d.completeRelay(r); + else { + g.unipush(new UniRelay<T,T>(d, g)); + if (d.result == null) + return null; + } + } catch (Throwable ex) { + d.completeThrowable(ex); + } + } + else + d.internalComplete(r); + } + dep = null; src = null; fn = null; + return d.postFire(a, mode); + } + } + + private CompletableFuture<T> uniComposeExceptionallyStage( + Executor e, Function<Throwable, ? extends CompletionStage<T>> f) { + if (f == null) throw new NullPointerException(); + CompletableFuture<T> d = newIncompleteFuture(); + Object r, s; Throwable x; + if ((r = result) == null) + unipush(new UniComposeExceptionally<T>(e, d, this, f)); + else if (!(r instanceof AltResult) || (x = ((AltResult)r).ex) == null) + d.internalComplete(r); + else + try { + if (e != null) + e.execute(new UniComposeExceptionally<T>(null, d, this, f)); + else { + CompletableFuture<T> g = f.apply(x).toCompletableFuture(); + if ((s = g.result) != null) + d.result = encodeRelay(s); + else + g.unipush(new UniRelay<T,T>(d, g)); + } + } catch (Throwable ex) { + d.result = encodeThrowable(ex); + } return d; } @SuppressWarnings("serial") static final class UniRelay<U, T extends U> extends UniCompletion<T,U> {
*** 1091,1126 **** if (f == null) throw new NullPointerException(); CompletableFuture<V> d = newIncompleteFuture(); Object r, s; Throwable x; if ((r = result) == null) unipush(new UniCompose<T,V>(e, d, this, f)); ! else if (e == null) { if (r instanceof AltResult) { if ((x = ((AltResult)r).ex) != null) { d.result = encodeThrowable(x, r); return d; } r = null; } try { @SuppressWarnings("unchecked") T t = (T) r; CompletableFuture<V> g = f.apply(t).toCompletableFuture(); if ((s = g.result) != null) d.result = encodeRelay(s); ! else { g.unipush(new UniRelay<V,V>(d, g)); } } catch (Throwable ex) { d.result = encodeThrowable(ex); } } - else - try { - e.execute(new UniCompose<T,V>(null, d, this, f)); - } catch (Throwable ex) { - d.result = encodeThrowable(ex); - } return d; } /* ------------- Two-input Completions -------------- */ --- 1165,1197 ---- if (f == null) throw new NullPointerException(); CompletableFuture<V> d = newIncompleteFuture(); Object r, s; Throwable x; if ((r = result) == null) unipush(new UniCompose<T,V>(e, d, this, f)); ! else { if (r instanceof AltResult) { if ((x = ((AltResult)r).ex) != null) { d.result = encodeThrowable(x, r); return d; } r = null; } try { + if (e != null) + e.execute(new UniCompose<T,V>(null, d, this, f)); + else { @SuppressWarnings("unchecked") T t = (T) r; CompletableFuture<V> g = f.apply(t).toCompletableFuture(); if ((s = g.result) != null) d.result = encodeRelay(s); ! else g.unipush(new UniRelay<V,V>(d, g)); } } catch (Throwable ex) { d.result = encodeThrowable(ex); } } return d; } /* ------------- Two-input Completions -------------- */
*** 1896,1906 **** /** * Creates a new complete CompletableFuture with given encoded result. */ CompletableFuture(Object r) { ! this.result = r; } /** * Returns a new CompletableFuture that is asynchronously completed * by a task running in the {@link ForkJoinPool#commonPool()} with --- 1967,1977 ---- /** * Creates a new complete CompletableFuture with given encoded result. */ CompletableFuture(Object r) { ! RESULT.setRelease(this, r); } /** * Returns a new CompletableFuture that is asynchronously completed * by a task running in the {@link ForkJoinPool#commonPool()} with
*** 2283,2314 **** */ public CompletableFuture<T> toCompletableFuture() { return this; } - // not in interface CompletionStage - - /** - * Returns a new CompletableFuture that is completed when this - * CompletableFuture completes, with the result of the given - * function of the exception triggering this CompletableFuture's - * completion when it completes exceptionally; otherwise, if this - * CompletableFuture completes normally, then the returned - * CompletableFuture also completes normally with the same value. - * Note: More flexible versions of this functionality are - * available using methods {@code whenComplete} and {@code handle}. - * - * @param fn the function to use to compute the value of the - * returned CompletableFuture if this CompletableFuture completed - * exceptionally - * @return the new CompletableFuture - */ public CompletableFuture<T> exceptionally( Function<Throwable, ? extends T> fn) { ! return uniExceptionallyStage(fn); } /* ------------- Arbitrary-arity constructions -------------- */ /** * Returns a new CompletableFuture that is completed when all of --- 2354,2393 ---- */ public CompletableFuture<T> toCompletableFuture() { return this; } public CompletableFuture<T> exceptionally( Function<Throwable, ? extends T> fn) { ! return uniExceptionallyStage(null, fn); } + public CompletableFuture<T> exceptionallyAsync( + Function<Throwable, ? extends T> fn) { + return uniExceptionallyStage(defaultExecutor(), fn); + } + + public CompletableFuture<T> exceptionallyAsync( + Function<Throwable, ? extends T> fn, Executor executor) { + return uniExceptionallyStage(screenExecutor(executor), fn); + } + + public CompletableFuture<T> exceptionallyCompose( + Function<Throwable, ? extends CompletionStage<T>> fn) { + return uniComposeExceptionallyStage(null, fn); + } + + public CompletableFuture<T> exceptionallyComposeAsync( + Function<Throwable, ? extends CompletionStage<T>> fn) { + return uniComposeExceptionallyStage(defaultExecutor(), fn); + } + + public CompletableFuture<T> exceptionallyComposeAsync( + Function<Throwable, ? extends CompletionStage<T>> fn, + Executor executor) { + return uniComposeExceptionallyStage(screenExecutor(executor), fn); + } /* ------------- Arbitrary-arity constructions -------------- */ /** * Returns a new CompletableFuture that is completed when all of
< prev index next >