< prev index next >

src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java

Print this page

        

@@ -28,10 +28,11 @@
 import java.io.IOException;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.LinkedList;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;

@@ -120,11 +121,11 @@
     HttpResponse.BodyProcessor<T> responseProcessor;
     final HttpRequest.BodyProcessor requestProcessor;
     volatile int responseCode;
     volatile Response response;
     volatile CompletableFuture<Response> responseCF;
-    final AbstractPushPublisher<ByteBuffer> publisher;
+    final AbstractPushPublisher<List<ByteBuffer>> publisher;
     final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>();
 
     /** True if END_STREAM has been seen in a frame received on this stream. */
     private volatile boolean remotelyClosed;
     private volatile boolean closed;

@@ -198,16 +199,20 @@
         // RFC 7540 6.1:
         // The entire DATA frame payload is included in flow control,
         // including the Pad Length and Padding fields if present
         int len = df.payloadLength();
         ByteBufferReference[] buffers = df.getData();
+        LinkedList<ByteBuffer> l = new LinkedList<>();
         for (ByteBufferReference b : buffers) {
             ByteBuffer buf = b.get();
             if (buf.hasRemaining()) {
-                publisher.acceptData(Optional.of(buf));
+                l.add(buf);
             }
         }
+        if (!l.isEmpty()) {
+            publisher.acceptData(Optional.of(l));
+        }
         connection.windowUpdater.update(len);
         if (df.getFlag(DataFrame.END_STREAM)) {
             setEndStreamReceived();
             publisher.acceptData(Optional.empty());
             return false;
< prev index next >