< prev index next >
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java
Print this page
@@ -45,10 +45,12 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Flow;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import jdk.incubator.http.internal.common.MinimalFuture;
import jdk.incubator.http.internal.common.Utils;
@@ -556,6 +558,53 @@
@Override
public CompletionStage<T> getBody() {
return cf;
}
}
+
+ /** An adapter between {@code BodySubscriber} and {@code Flow.Subscriber}. */
+ static final class SubscriberAdapter<S extends Subscriber<? super List<ByteBuffer>>,R>
+ implements HttpResponse.BodySubscriber<R>
+ {
+ private final CompletableFuture<R> cf = new MinimalFuture<>();
+ private final S subscriber;
+ private final Function<S,R> finisher;
+
+ SubscriberAdapter(S subscriber, Function<S,R> finisher) {
+ this.subscriber = Objects.requireNonNull(subscriber);
+ this.finisher = Objects.requireNonNull(finisher);
+ }
+
+ @Override
+ public void onSubscribe(Subscription subscription) {
+ subscriber.onSubscribe(subscription);
+ }
+
+ @Override
+ public void onNext(List<ByteBuffer> item) {
+ subscriber.onNext(item);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ try {
+ subscriber.onError(throwable);
+ } finally {
+ cf.completeExceptionally(throwable);
+ }
+ }
+
+ @Override
+ public void onComplete() {
+ try {
+ subscriber.onComplete();
+ } finally {
+ cf.complete(finisher.apply(subscriber));
+ }
+ }
+
+ @Override
+ public CompletionStage<R> getBody() {
+ return cf;
+ }
+ }
}
< prev index next >