< prev index next >
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java
Print this page
@@ -28,10 +28,11 @@
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.LinkedList;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
@@ -120,11 +121,11 @@
HttpResponse.BodyProcessor<T> responseProcessor;
final HttpRequest.BodyProcessor requestProcessor;
volatile int responseCode;
volatile Response response;
volatile CompletableFuture<Response> responseCF;
- final AbstractPushPublisher<ByteBuffer> publisher;
+ final AbstractPushPublisher<List<ByteBuffer>> publisher;
final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>();
/** True if END_STREAM has been seen in a frame received on this stream. */
private volatile boolean remotelyClosed;
private volatile boolean closed;
@@ -198,16 +199,20 @@
// RFC 7540 6.1:
// The entire DATA frame payload is included in flow control,
// including the Pad Length and Padding fields if present
int len = df.payloadLength();
ByteBufferReference[] buffers = df.getData();
+ LinkedList<ByteBuffer> l = new LinkedList<>();
for (ByteBufferReference b : buffers) {
ByteBuffer buf = b.get();
if (buf.hasRemaining()) {
- publisher.acceptData(Optional.of(buf));
+ l.add(buf);
}
}
+ if (!l.isEmpty()) {
+ publisher.acceptData(Optional.of(l));
+ }
connection.windowUpdater.update(len);
if (df.getFlag(DataFrame.END_STREAM)) {
setEndStreamReceived();
publisher.acceptData(Optional.empty());
return false;
< prev index next >