< prev index next >

src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java

Print this page

        

@@ -27,10 +27,11 @@
 
 import java.io.IOException;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;

@@ -42,10 +43,11 @@
 import java.util.function.Consumer;
 
 import jdk.incubator.http.internal.common.*;
 import jdk.incubator.http.internal.frame.*;
 import jdk.incubator.http.internal.hpack.DecodingCallback;
+import static java.util.stream.Collectors.toList;
 
 /**
  * Http/2 Stream handling.
  *
  * REQUESTS

@@ -120,11 +122,11 @@
     HttpResponse.BodyProcessor<T> responseProcessor;
     final HttpRequest.BodyProcessor requestProcessor;
     volatile int responseCode;
     volatile Response response;
     volatile CompletableFuture<Response> responseCF;
-    final AbstractPushPublisher<ByteBuffer> publisher;
+    final AbstractPushPublisher<List<ByteBuffer>> publisher;
     final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>();
 
     /** True if END_STREAM has been seen in a frame received on this stream. */
     private volatile boolean remotelyClosed;
     private volatile boolean closed;

@@ -198,15 +200,16 @@
         // RFC 7540 6.1:
         // The entire DATA frame payload is included in flow control,
         // including the Pad Length and Padding fields if present
         int len = df.payloadLength();
         ByteBufferReference[] buffers = df.getData();
-        for (ByteBufferReference b : buffers) {
-            ByteBuffer buf = b.get();
-            if (buf.hasRemaining()) {
-                publisher.acceptData(Optional.of(buf));
-            }
+        List<ByteBuffer> dsts = Arrays.stream(buffers)
+                .map(ByteBufferReference::get)
+                .filter(ByteBuffer::hasRemaining)
+                .collect(toList());
+        if (!dsts.isEmpty()) {
+            publisher.acceptData(Optional.of(dsts));
         }
         connection.windowUpdater.update(len);
         if (df.getFlag(DataFrame.END_STREAM)) {
             setEndStreamReceived();
             publisher.acceptData(Optional.empty());
< prev index next >