< prev index next >

src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java

Print this page

        

*** 45,54 **** --- 45,56 ---- import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Flow; + import java.util.concurrent.Flow.Subscriber; + import java.util.concurrent.Flow.Subscription; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Function; import jdk.incubator.http.internal.common.MinimalFuture; import jdk.incubator.http.internal.common.Utils;
*** 556,561 **** --- 558,610 ---- @Override public CompletionStage<T> getBody() { return cf; } } + + /** An adapter between {@code BodySubscriber} and {@code Flow.Subscriber}. */ + static final class SubscriberAdapter<S extends Subscriber<? super List<ByteBuffer>>,R> + implements HttpResponse.BodySubscriber<R> + { + private final CompletableFuture<R> cf = new MinimalFuture<>(); + private final S subscriber; + private final Function<S,R> finisher; + + SubscriberAdapter(S subscriber, Function<S,R> finisher) { + this.subscriber = Objects.requireNonNull(subscriber); + this.finisher = Objects.requireNonNull(finisher); + } + + @Override + public void onSubscribe(Subscription subscription) { + subscriber.onSubscribe(subscription); + } + + @Override + public void onNext(List<ByteBuffer> item) { + subscriber.onNext(item); + } + + @Override + public void onError(Throwable throwable) { + try { + subscriber.onError(throwable); + } finally { + cf.completeExceptionally(throwable); + } + } + + @Override + public void onComplete() { + try { + subscriber.onComplete(); + } finally { + cf.complete(finisher.apply(subscriber)); + } + } + + @Override + public CompletionStage<R> getBody() { + return cf; + } + } }
< prev index next >