--- old/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java 2017-12-15 15:27:28.541027225 +0000 +++ new/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java 2017-12-15 15:27:28.305032340 +0000 @@ -47,6 +47,8 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Flow; +import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.Flow.Subscription; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Function; @@ -558,4 +560,51 @@ return cf; } } + + /** An adapter between {@code BodySubscriber} and {@code Flow.Subscriber}. */ + static final class SubscriberAdapter>,R> + implements HttpResponse.BodySubscriber + { + private final CompletableFuture cf = new MinimalFuture<>(); + private final S subscriber; + private final Function finisher; + + SubscriberAdapter(S subscriber, Function finisher) { + this.subscriber = Objects.requireNonNull(subscriber); + this.finisher = Objects.requireNonNull(finisher); + } + + @Override + public void onSubscribe(Subscription subscription) { + subscriber.onSubscribe(subscription); + } + + @Override + public void onNext(List item) { + subscriber.onNext(item); + } + + @Override + public void onError(Throwable throwable) { + try { + subscriber.onError(throwable); + } finally { + cf.completeExceptionally(throwable); + } + } + + @Override + public void onComplete() { + try { + subscriber.onComplete(); + } finally { + cf.complete(finisher.apply(subscriber)); + } + } + + @Override + public CompletionStage getBody() { + return cf; + } + } }