< prev index next >

src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java

Print this page

        

@@ -45,10 +45,12 @@
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Flow;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import jdk.incubator.http.internal.common.MinimalFuture;
 import jdk.incubator.http.internal.common.Utils;

@@ -556,6 +558,53 @@
         /** Returns {@code cf}, the completion stage exposed as the body. */
         @Override
         public CompletionStage<T> getBody() {
             return cf;
         }
     }
+
+    /**
+     * An adapter between {@code BodySubscriber} and {@code Flow.Subscriber}.
+     *
+     * <p> Every signal received by this adapter is forwarded to the wrapped
+     * subscriber. The body, available from {@link #getBody()}, is completed
+     * when the flow terminates: with the value computed by the finisher
+     * function after {@code onComplete}, or exceptionally with the reported
+     * throwable after {@code onError}.
+     *
+     * @param <S> the type of the wrapped subscriber
+     * @param <R> the type of the body produced by the finisher
+     */
+    static final class SubscriberAdapter<S extends Subscriber<? super List<ByteBuffer>>,R>
+        implements HttpResponse.BodySubscriber<R>
+    {
+        private final CompletableFuture<R> cf = new MinimalFuture<>();
+        private final S subscriber;
+        private final Function<S,R> finisher;
+
+        /**
+         * Creates an adapter around the given subscriber.
+         *
+         * @param subscriber the subscriber that every signal is forwarded to
+         * @param finisher   the function applied to {@code subscriber} on
+         *                   completion to produce the body value
+         * @throws NullPointerException if either argument is {@code null}
+         */
+        SubscriberAdapter(S subscriber, Function<S,R> finisher) {
+            this.subscriber = Objects.requireNonNull(subscriber);
+            this.finisher = Objects.requireNonNull(finisher);
+        }
+
+        /** Forwards the subscription to the wrapped subscriber. */
+        @Override
+        public void onSubscribe(Subscription subscription) {
+            subscriber.onSubscribe(subscription);
+        }
+
+        /** Forwards the received buffers to the wrapped subscriber. */
+        @Override
+        public void onNext(List<ByteBuffer> item) {
+            subscriber.onNext(item);
+        }
+
+        /**
+         * Forwards the error to the wrapped subscriber, then completes the
+         * body exceptionally with the same throwable. The body is completed
+         * even if the wrapped subscriber's {@code onError} throws.
+         */
+        @Override
+        public void onError(Throwable throwable) {
+            try {
+                subscriber.onError(throwable);
+            } finally {
+                cf.completeExceptionally(throwable);
+            }
+        }
+
+        /**
+         * Forwards completion to the wrapped subscriber, then completes the
+         * body with the result of applying the finisher to that subscriber.
+         * The body is completed even if the wrapped subscriber's
+         * {@code onComplete} throws; if the finisher itself throws, the body
+         * is completed exceptionally with that failure.
+         */
+        @Override
+        public void onComplete() {
+            try {
+                subscriber.onComplete();
+            } finally {
+                try {
+                    cf.complete(finisher.apply(subscriber));
+                } catch (Throwable t) {
+                    // A failing finisher must not leave the body forever
+                    // incomplete: report the failure through the future so
+                    // that anything waiting on getBody() is released.
+                    cf.completeExceptionally(t);
+                }
+            }
+        }
+
+        /** Returns the stage that is completed when the flow terminates. */
+        @Override
+        public CompletionStage<R> getBody() {
+            return cf;
+        }
+    }
 }
< prev index next >