< prev index next >

src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java

Print this page

        

@@ -24,24 +24,21 @@
  */
 
 package jdk.incubator.http;
 
 import java.io.IOException;
-import java.io.UncheckedIOException;
 import java.net.URI;
-import jdk.incubator.http.ResponseProcessors.MultiFile;
 import jdk.incubator.http.ResponseProcessors.MultiProcessorImpl;
 import static jdk.incubator.http.internal.common.Utils.unchecked;
 import static jdk.incubator.http.internal.common.Utils.charsetFrom;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
 import java.nio.file.OpenOption;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.StandardOpenOption;
-import java.util.Map;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.Flow;
 import java.util.function.Consumer;

@@ -395,20 +392,44 @@
          * @return a response body handler
          */
         public static BodyHandler<String> asString() {
             return (status, headers) -> BodyProcessor.asString(charsetFrom(headers));
         }
+
+        /**
+         * Returns a {@code BodyHandler} which, when invoked, returns a {@linkplain
+         * BodyProcessor#buffering(BodyProcessor,int) buffering BodyProcessor}
+         * that buffers data before delivering it to the downstream processor.
+         * These {@code BodyProcessor} instances are created by calling
+         * {@linkplain BodyProcessor#buffering(BodyProcessor,int)
+         * BodyProcessor.buffering} with a processor obtained from the given
+         * downstream handler and the {@code bufferSize} parameter.
+         *
+         * @param downstreamHandler the downstream handler
+         * @param bufferSize the buffer size parameter passed to {@linkplain
+         *        BodyProcessor#buffering(BodyProcessor,int) BodyProcessor.buffering}
+         * @return a body handler
+         * @throws IllegalArgumentException if {@code bufferSize <= 0}
+         */
+         public static <T> BodyHandler<T> buffering(BodyHandler<T> downstreamHandler,
+                                                    int bufferSize) {
+             if (bufferSize <= 0)
+                 throw new IllegalArgumentException("must be greater than 0");
+             return (status, headers) -> BodyProcessor
+                     .buffering(downstreamHandler.apply(status, headers),
+                                bufferSize);
+         }
     }
 
     /**
      * A processor for response bodies.
      * {@Incubating}
      * <p>
-     * The object acts as a {@link Flow.Subscriber}&lt;{@link ByteBuffer}&gt; to
-     * the HTTP client implementation which publishes ByteBuffers containing the
-     * response body. The processor converts the incoming buffers of data to
-     * some user-defined object type {@code T}.
+     * The object acts as a {@link Flow.Subscriber}&lt;{@link List}&lt;{@link
+     * ByteBuffer}&gt;&gt; to the HTTP client implementation which publishes
+     * ByteBuffers containing the response body. The processor converts the
+     * incoming buffers of data to some user-defined object type {@code T}.
      * <p>
      * The {@link #getBody()} method returns a {@link CompletionStage}{@code <T>}
      * that provides the response body object. The {@code CompletionStage} must
      * be obtainable at any time. When it completes depends on the nature
      * of type {@code T}. In many cases, when {@code T} represents the entire body after being

@@ -417,11 +438,11 @@
      * body has been read, because the calling code uses it to consume the data.
      *
      * @param <T> the response body type
      */
     public interface BodyProcessor<T>
-            extends Flow.Subscriber<ByteBuffer> {
+            extends Flow.Subscriber<List<ByteBuffer>> {
 
         /**
          * Returns a {@code CompletionStage} which when completed will return the
          * response body object.
          *

@@ -528,10 +549,34 @@
          * @return a {@code BodyProcessor}
          */
         public static <U> BodyProcessor<U> discard(U value) {
             return new ResponseProcessors.NullProcessor<>(Optional.ofNullable(value));
         }
+
+        /**
+         * Returns a {@code BodyProcessor} which buffers data before delivering
+         * it to the given downstream processor. The processor guarantees to
+         * deliver {@code buffersize} bytes of data to each invocation of the
+         * downstream's {@linkplain #onNext(Object) onNext} method, except for
+         * the final invocation, just before {@linkplain #onComplete() onComplete}
+         * is invoked. The final invocation of {@code onNext} may contain fewer
+         * than {@code buffersize} bytes.
+         * <p>
+         * The returned processor delegates its {@link #getBody()} method to the
+         * downstream processor.
+         *
+         * @param downstream the downstream processor
+         * @param bufferSize the buffer size
+         * @return a buffering body processor
+         * @throws IllegalArgumentException if {@code bufferSize <= 0}
+         */
+         public static <T> BodyProcessor<T> buffering(BodyProcessor<T> downstream,
+                                                      int bufferSize) {
+             if (bufferSize <= 0)
+                 throw new IllegalArgumentException("must be greater than 0");
+             return new BufferingProcessor<T>(downstream, bufferSize);
+         }
     }
 
     /**
      * A response processor for a HTTP/2 multi response.
      * {@Incubating}
< prev index next >