< prev index next >
test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java
Print this page
*** 1,7 ****
/*
! * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
--- 1,7 ----
/*
! * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*** 34,103 ****
import javax.net.ssl.*;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
-
- import jdk.incubator.http.internal.common.ByteBufferReference;
- import jdk.incubator.http.internal.frame.FramesDecoder;
-
- import jdk.incubator.http.internal.common.BufferHandler;
import jdk.incubator.http.internal.common.HttpHeadersImpl;
- import jdk.incubator.http.internal.common.Queue;
import jdk.incubator.http.internal.frame.*;
import jdk.incubator.http.internal.hpack.Decoder;
import jdk.incubator.http.internal.hpack.DecodingCallback;
import jdk.incubator.http.internal.hpack.Encoder;
import static jdk.incubator.http.internal.frame.SettingsFrame.HEADER_TABLE_SIZE;
/**
* Represents one HTTP2 connection, either plaintext upgraded from HTTP/1.1
* or HTTPS opened using "h2" ALPN.
*/
public class Http2TestServerConnection {
final Http2TestServer server;
@SuppressWarnings({"rawtypes","unchecked"})
final Map<Integer, Queue> streams; // input q per stream
final HashSet<Integer> pushStreams;
final Queue<Http2Frame> outputQ;
volatile int nextstream;
final Socket socket;
final InputStream is;
final OutputStream os;
volatile Encoder hpackOut;
volatile Decoder hpackIn;
volatile SettingsFrame clientSettings;
final SettingsFrame serverSettings;
final ExecutorService exec;
final boolean secure;
volatile boolean stopping;
volatile int nextPushStreamId = 2;
final static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
final static byte[] EMPTY_BARRAY = new byte[0];
final static byte[] clientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes();
! Http2TestServerConnection(Http2TestServer server, Socket socket) throws IOException {
if (socket instanceof SSLSocket) {
handshake(server.serverName(), (SSLSocket)socket);
}
System.err.println("TestServer: New connection from " + socket);
this.server = server;
this.streams = Collections.synchronizedMap(new HashMap<>());
! this.outputQ = new Queue<>();
this.socket = socket;
this.serverSettings = SettingsFrame.getDefaultSettings();
this.exec = server.exec;
this.secure = server.secure;
this.pushStreams = new HashSet<>();
is = new BufferedInputStream(socket.getInputStream());
os = new BufferedOutputStream(socket.getOutputStream());
}
private static boolean compareIPAddrs(InetAddress addr1, String host) {
try {
InetAddress addr2 = InetAddress.getByName(host);
return addr1.equals(addr2);
} catch (IOException e) {
--- 34,202 ----
import javax.net.ssl.*;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.*;
+ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
import jdk.incubator.http.internal.common.HttpHeadersImpl;
import jdk.incubator.http.internal.frame.*;
import jdk.incubator.http.internal.hpack.Decoder;
import jdk.incubator.http.internal.hpack.DecodingCallback;
import jdk.incubator.http.internal.hpack.Encoder;
+ import sun.net.www.http.ChunkedInputStream;
+ import sun.net.www.http.HttpClient;
import static jdk.incubator.http.internal.frame.SettingsFrame.HEADER_TABLE_SIZE;
/**
* Represents one HTTP2 connection, either plaintext upgraded from HTTP/1.1
* or HTTPS opened using "h2" ALPN.
*/
public class Http2TestServerConnection {
final Http2TestServer server;
@SuppressWarnings({"rawtypes","unchecked"})
final Map<Integer, Queue> streams; // input q per stream
+ final Map<Integer, BodyOutputStream> outStreams; // output q per stream
final HashSet<Integer> pushStreams;
final Queue<Http2Frame> outputQ;
volatile int nextstream;
final Socket socket;
+ final Http2TestExchangeSupplier exchangeSupplier;
final InputStream is;
final OutputStream os;
volatile Encoder hpackOut;
volatile Decoder hpackIn;
volatile SettingsFrame clientSettings;
final SettingsFrame serverSettings;
final ExecutorService exec;
final boolean secure;
volatile boolean stopping;
volatile int nextPushStreamId = 2;
+ ConcurrentLinkedQueue<PingRequest> pings = new ConcurrentLinkedQueue<>();
final static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
final static byte[] EMPTY_BARRAY = new byte[0];
+ final Random random;
final static byte[] clientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes();
! static class Sentinel extends Http2Frame {
! Sentinel() { super(-1,-1);}
! }
!
! class PingRequest {
! final byte[] pingData;
! final long pingStamp;
! final CompletableFuture<Long> response;
!
! PingRequest() {
! pingData = new byte[8];
! random.nextBytes(pingData);
! pingStamp = System.currentTimeMillis();
! response = new CompletableFuture<>();
! }
!
! PingFrame frame() {
! return new PingFrame(0, pingData);
! }
!
! CompletableFuture<Long> response() {
! return response;
! }
!
! void success() {
! response.complete(System.currentTimeMillis() - pingStamp);
! }
!
! void fail(Throwable t) {
! response.completeExceptionally(t);
! }
! }
!
! static Sentinel sentinel;
!
! Http2TestServerConnection(Http2TestServer server,
! Socket socket,
! Http2TestExchangeSupplier exchangeSupplier)
! throws IOException
! {
if (socket instanceof SSLSocket) {
handshake(server.serverName(), (SSLSocket)socket);
}
System.err.println("TestServer: New connection from " + socket);
this.server = server;
+ this.exchangeSupplier = exchangeSupplier;
this.streams = Collections.synchronizedMap(new HashMap<>());
! this.outStreams = Collections.synchronizedMap(new HashMap<>());
! this.outputQ = new Queue<>(sentinel);
! this.random = new Random();
this.socket = socket;
+ this.socket.setTcpNoDelay(true);
this.serverSettings = SettingsFrame.getDefaultSettings();
this.exec = server.exec;
this.secure = server.secure;
this.pushStreams = new HashSet<>();
is = new BufferedInputStream(socket.getInputStream());
os = new BufferedOutputStream(socket.getOutputStream());
}
+ /**
+ * Sends a PING frame on this connection, and completes the returned
+ * CF when the PING ack is received. The CF is given
+ * an integer, whose value is the number of milliseconds
+ * between PING and ACK.
+ */
+ CompletableFuture<Long> sendPing() {
+ PingRequest ping = null;
+ try {
+ ping = new PingRequest();
+ pings.add(ping);
+ outputQ.put(ping.frame());
+ } catch (Throwable t) {
+ ping.fail(t);
+ }
+ return ping.response();
+ }
+
+ /**
+ * Returns the first PingRequest from Queue
+ */
+ private PingRequest getNextRequest() {
+ return pings.poll();
+ }
+
+ /**
+ * Handles incoming Ping, which could be an ack
+ * or a client originated Ping
+ */
+ void handlePing(PingFrame ping) throws IOException {
+ if (ping.streamid() != 0) {
+ System.err.println("Invalid ping received");
+ close();
+ return;
+ }
+ if (ping.getFlag(PingFrame.ACK)) {
+ // did we send a Ping?
+ PingRequest request = getNextRequest();
+ if (request == null) {
+ System.err.println("Invalid ping ACK received");
+ close();
+ return;
+ } else if (!Arrays.equals(request.pingData, ping.getData())) {
+ request.fail(new RuntimeException("Wrong ping data in ACK"));
+ } else {
+ request.success();
+ }
+ } else {
+ // client originated PING. Just send it back with ACK set
+ ping.setFlag(PingFrame.ACK);
+ outputQ.put(ping);
+ }
+ }
+
private static boolean compareIPAddrs(InetAddress addr1, String host) {
try {
InetAddress addr2 = InetAddress.getByName(host);
return addr1.equals(addr2);
} catch (IOException e) {
*** 184,195 ****
// simulate header of Settings Frame
ByteBuffer bb0 = ByteBuffer.wrap(
new byte[] {0, 0, (byte)payload.length, 4, 0, 0, 0, 0, 0});
List<Http2Frame> frames = new ArrayList<>();
FramesDecoder reader = new FramesDecoder(frames::add);
! reader.decode(ByteBufferReference.of(bb0));
! reader.decode(ByteBufferReference.of(bb1));
if (frames.size()!=1)
throw new IOException("Expected 1 frame got "+frames.size()) ;
Http2Frame frame = frames.get(0);
if (!(frame instanceof SettingsFrame))
throw new IOException("Expected SettingsFrame");
--- 283,294 ----
// simulate header of Settings Frame
ByteBuffer bb0 = ByteBuffer.wrap(
new byte[] {0, 0, (byte)payload.length, 4, 0, 0, 0, 0, 0});
List<Http2Frame> frames = new ArrayList<>();
FramesDecoder reader = new FramesDecoder(frames::add);
! reader.decode(bb0);
! reader.decode(bb1);
if (frames.size()!=1)
throw new IOException("Expected 1 frame got "+frames.size()) ;
Http2Frame frame = frames.get(0);
if (!(frame instanceof SettingsFrame))
throw new IOException("Expected SettingsFrame");
*** 224,269 ****
exec.submit(this::readLoop);
exec.submit(this::writeLoop);
}
- static class BufferPool implements BufferHandler {
-
- public void setMinBufferSize(int size) {
- }
-
- @Override
- public ByteBuffer getBuffer() {
- int size = 32 * 1024;
- return ByteBuffer.allocate(size);
- }
-
- @Override
- public void returnBuffer(ByteBuffer buffer) {
- }
- }
-
private void writeFrame(Http2Frame frame) throws IOException {
! ByteBufferReference[] refs = new FramesEncoder().encodeFrame(frame);
//System.err.println("TestServer: Writing frame " + frame.toString());
int c = 0;
! for (ByteBufferReference ref : refs) {
! ByteBuffer buf = ref.get();
byte[] ba = buf.array();
int start = buf.arrayOffset() + buf.position();
c += buf.remaining();
os.write(ba, start, buf.remaining());
}
os.flush();
//System.err.printf("TestServer: wrote %d bytes\n", c);
}
- void handleStreamReset(ResetFrame resetFrame) throws IOException {
- // TODO: cleanup
- throw new IOException("Stream reset");
- }
-
private void handleCommonFrame(Http2Frame f) throws IOException {
if (f instanceof SettingsFrame) {
SettingsFrame sf = (SettingsFrame) f;
if (sf.getFlag(SettingsFrame.ACK)) // ignore
{
--- 323,360 ----
exec.submit(this::readLoop);
exec.submit(this::writeLoop);
}
private void writeFrame(Http2Frame frame) throws IOException {
! List<ByteBuffer> bufs = new FramesEncoder().encodeFrame(frame);
//System.err.println("TestServer: Writing frame " + frame.toString());
int c = 0;
! for (ByteBuffer buf : bufs) {
byte[] ba = buf.array();
int start = buf.arrayOffset() + buf.position();
c += buf.remaining();
os.write(ba, start, buf.remaining());
+
+ // System.out.println("writing byte at a time");
+ // while (buf.hasRemaining()) {
+ // byte b = buf.get();
+ // os.write(b);
+ // os.flush();
+ // try {
+ // Thread.sleep(1);
+ // } catch(InterruptedException e) {
+ // UncheckedIOException uie = new UncheckedIOException(new IOException(""));
+ // uie.addSuppressed(e);
+ // throw uie;
+ // }
+ // }
}
os.flush();
//System.err.printf("TestServer: wrote %d bytes\n", c);
}
private void handleCommonFrame(Http2Frame f) throws IOException {
if (f instanceof SettingsFrame) {
SettingsFrame sf = (SettingsFrame) f;
if (sf.getFlag(SettingsFrame.ACK)) // ignore
{
*** 274,286 ****
SettingsFrame frame = new SettingsFrame();
frame.setFlag(SettingsFrame.ACK);
frame.streamid(0);
outputQ.put(frame);
return;
! }
! //System.err.println("TestServer: Received ---> " + f.toString());
! throw new UnsupportedOperationException("Not supported yet.");
}
void sendWindowUpdates(int len, int streamid) throws IOException {
if (len == 0)
return;
--- 365,381 ----
SettingsFrame frame = new SettingsFrame();
frame.setFlag(SettingsFrame.ACK);
frame.streamid(0);
outputQ.put(frame);
return;
! } else if (f instanceof GoAwayFrame) {
! System.err.println("Closing: "+ f.toString());
! close();
! } else if (f instanceof PingFrame) {
! handlePing((PingFrame)f);
! } else
! throw new UnsupportedOperationException("Not supported yet: " + f.toString());
}
void sendWindowUpdates(int len, int streamid) throws IOException {
if (len == 0)
return;
*** 288,308 ****
outputQ.put(wup);
wup = new WindowUpdateFrame(0 , len);
outputQ.put(wup);
}
! HttpHeadersImpl decodeHeaders(List<HeaderFrame> frames) {
HttpHeadersImpl headers = new HttpHeadersImpl();
DecodingCallback cb = (name, value) -> {
headers.addHeader(name.toString(), value.toString());
};
for (HeaderFrame frame : frames) {
! ByteBufferReference[] buffers = frame.getHeaderBlock();
! for (ByteBufferReference buffer : buffers) {
! hpackIn.decode(buffer.get(), false, cb);
}
}
hpackIn.decode(EMPTY_BUFFER, true, cb);
return headers;
}
--- 383,403 ----
outputQ.put(wup);
wup = new WindowUpdateFrame(0 , len);
outputQ.put(wup);
}
! HttpHeadersImpl decodeHeaders(List<HeaderFrame> frames) throws IOException {
HttpHeadersImpl headers = new HttpHeadersImpl();
DecodingCallback cb = (name, value) -> {
headers.addHeader(name.toString(), value.toString());
};
for (HeaderFrame frame : frames) {
! List<ByteBuffer> buffers = frame.getHeaderBlock();
! for (ByteBuffer buffer : buffers) {
! hpackIn.decode(buffer, false, cb);
}
}
hpackIn.decode(EMPTY_BUFFER, true, cb);
return headers;
}
*** 357,367 ****
headers.setHeader(":method", tokens[0]);
headers.setHeader(":scheme", "http"); // always in this case
headers.setHeader(":authority", host);
headers.setHeader(":path", uri.getPath());
! Queue q = new Queue();
String body = getRequestBody(request);
addHeaders(getHeaders(request), headers);
headers.setHeader("Content-length", Integer.toString(body.length()));
addRequestBodyToQueue(body, q);
--- 452,462 ----
headers.setHeader(":method", tokens[0]);
headers.setHeader(":scheme", "http"); // always in this case
headers.setHeader(":authority", host);
headers.setHeader(":path", uri.getPath());
! Queue q = new Queue(sentinel);
String body = getRequestBody(request);
addHeaders(getHeaders(request), headers);
headers.setHeader("Content-length", Integer.toString(body.length()));
addRequestBodyToQueue(body, q);
*** 399,409 ****
frames.add(frame);
}
}
boolean endStreamReceived = endStream;
HttpHeadersImpl headers = decodeHeaders(frames);
! Queue q = new Queue();
streams.put(streamid, q);
exec.submit(() -> {
handleRequest(headers, q, streamid, endStreamReceived);
});
}
--- 494,504 ----
frames.add(frame);
}
}
boolean endStreamReceived = endStream;
HttpHeadersImpl headers = decodeHeaders(frames);
! Queue q = new Queue(sentinel);
streams.put(streamid, q);
exec.submit(() -> {
handleRequest(headers, q, streamid, endStreamReceived);
});
}
*** 440,453 ****
bis = new BodyInputStream(queue, streamid, this);
}
try (bis;
BodyOutputStream bos = new BodyOutputStream(streamid, winsize, this))
{
String us = scheme + "://" + authority + path;
URI uri = new URI(us);
boolean pushAllowed = clientSettings.getParameter(SettingsFrame.ENABLE_PUSH) == 1;
! Http2TestExchange exchange = new Http2TestExchange(streamid, method,
headers, rspheaders, uri, bis, getSSLSession(),
bos, this, pushAllowed);
// give to user
Http2Handler handler = server.getHandlerFor(uri.getPath());
--- 535,549 ----
bis = new BodyInputStream(queue, streamid, this);
}
try (bis;
BodyOutputStream bos = new BodyOutputStream(streamid, winsize, this))
{
+ outStreams.put(streamid, bos);
String us = scheme + "://" + authority + path;
URI uri = new URI(us);
boolean pushAllowed = clientSettings.getParameter(SettingsFrame.ENABLE_PUSH) == 1;
! Http2TestExchange exchange = exchangeSupplier.get(streamid, method,
headers, rspheaders, uri, bis, getSSLSession(),
bos, this, pushAllowed);
// give to user
Http2Handler handler = server.getHandlerFor(uri.getPath());
*** 505,514 ****
--- 601,621 ----
WindowUpdateFrame wup = (WindowUpdateFrame) frame;
synchronized (updaters) {
Consumer<Integer> r = updaters.get(stream);
r.accept(wup.getUpdate());
}
+ } else if (frame.type() == ResetFrame.TYPE) {
+ // do orderly close on input q
+ // and close the output q immediately
+ // This should mean depending on what the
+ // handler is doing: either an EOF on read
+ // or an IOException if writing the response.
+ q.orderlyClose();
+ BodyOutputStream oq = outStreams.get(stream);
+ if (oq != null)
+ oq.closeInternal();
+
} else {
q.put(frame);
}
}
}
*** 520,530 ****
}
close();
}
}
! ByteBufferReference[] encodeHeaders(HttpHeadersImpl headers) {
List<ByteBuffer> buffers = new LinkedList<>();
ByteBuffer buf = getBuffer();
boolean encoded;
for (Map.Entry<String, List<String>> entry : headers.map().entrySet()) {
--- 627,637 ----
}
close();
}
}
! List<ByteBuffer> encodeHeaders(HttpHeadersImpl headers) {
List<ByteBuffer> buffers = new LinkedList<>();
ByteBuffer buf = getBuffer();
boolean encoded;
for (Map.Entry<String, List<String>> entry : headers.map().entrySet()) {
*** 542,552 ****
} while (!encoded);
}
}
buf.flip();
buffers.add(buf);
! return ByteBufferReference.toReferences(buffers.toArray(bbarray));
}
static void closeIgnore(Closeable c) {
try {
c.close();
--- 649,659 ----
} while (!encoded);
}
}
buf.flip();
buffers.add(buf);
! return buffers;
}
static void closeIgnore(Closeable c) {
try {
c.close();
*** 585,604 ****
}
}
private void handlePush(OutgoingPushPromise op) throws IOException {
int promisedStreamid = nextPushStreamId;
! PushPromiseFrame pp = new PushPromiseFrame(op.parentStream, HeaderFrame.END_HEADERS, promisedStreamid, encodeHeaders(op.headers), 0);
pushStreams.add(promisedStreamid);
nextPushStreamId += 2;
pp.streamid(op.parentStream);
writeFrame(pp);
final InputStream ii = op.is;
final BodyOutputStream oo = new BodyOutputStream(
promisedStreamid,
clientSettings.getParameter(
SettingsFrame.INITIAL_WINDOW_SIZE), this);
oo.goodToGo();
exec.submit(() -> {
try {
ResponseHeaders oh = getPushResponse(promisedStreamid);
outputQ.put(oh);
--- 692,716 ----
}
}
private void handlePush(OutgoingPushPromise op) throws IOException {
int promisedStreamid = nextPushStreamId;
! PushPromiseFrame pp = new PushPromiseFrame(op.parentStream,
! HeaderFrame.END_HEADERS,
! promisedStreamid,
! encodeHeaders(op.headers),
! 0);
pushStreams.add(promisedStreamid);
nextPushStreamId += 2;
pp.streamid(op.parentStream);
writeFrame(pp);
final InputStream ii = op.is;
final BodyOutputStream oo = new BodyOutputStream(
promisedStreamid,
clientSettings.getParameter(
SettingsFrame.INITIAL_WINDOW_SIZE), this);
+ outStreams.put(promisedStreamid, oo);
oo.goodToGo();
exec.submit(() -> {
try {
ResponseHeaders oh = getPushResponse(promisedStreamid);
outputQ.put(oh);
*** 643,654 ****
int n = is.readNBytes(rest, 0, len);
if (n != len)
throw new IOException("Error reading frame");
List<Http2Frame> frames = new ArrayList<>();
FramesDecoder reader = new FramesDecoder(frames::add);
! reader.decode(ByteBufferReference.of(ByteBuffer.wrap(buf)));
! reader.decode(ByteBufferReference.of(ByteBuffer.wrap(rest)));
if (frames.size()!=1)
throw new IOException("Expected 1 frame got "+frames.size()) ;
return frames.get(0);
}
--- 755,766 ----
int n = is.readNBytes(rest, 0, len);
if (n != len)
throw new IOException("Error reading frame");
List<Http2Frame> frames = new ArrayList<>();
FramesDecoder reader = new FramesDecoder(frames::add);
! reader.decode(ByteBuffer.wrap(buf));
! reader.decode(ByteBuffer.wrap(rest));
if (frames.size()!=1)
throw new IOException("Expected 1 frame got "+frames.size()) ;
return frames.get(0);
}
*** 719,735 ****
final static String CRLFCRLF = "\r\n\r\n";
String readHttp1Request() throws IOException {
String headers = readUntil(CRLF + CRLF);
int clen = getContentLength(headers);
! // read the content.
! byte[] buf = new byte[clen];
is.readNBytes(buf, 0, clen);
String body = new String(buf, StandardCharsets.US_ASCII);
return headers + body;
}
void sendHttp1Response(int code, String msg, String... headers) throws IOException {
StringBuilder sb = new StringBuilder();
sb.append("HTTP/1.1 ")
.append(code)
.append(' ')
--- 831,859 ----
final static String CRLFCRLF = "\r\n\r\n";
String readHttp1Request() throws IOException {
String headers = readUntil(CRLF + CRLF);
int clen = getContentLength(headers);
! byte[] buf;
! if (clen >= 0) {
! // HTTP/1.1 fixed length content ( may be 0 ), read it
! buf = new byte[clen];
is.readNBytes(buf, 0, clen);
+ } else {
+ // HTTP/1.1 chunked data, read it
+ buf = readChunkedInputStream(is);
+ }
String body = new String(buf, StandardCharsets.US_ASCII);
return headers + body;
}
+ // This is a quick hack to get a chunked input stream reader.
+ private static byte[] readChunkedInputStream(InputStream is) throws IOException {
+ ChunkedInputStream cis = new ChunkedInputStream(is, new HttpClient() {}, null);
+ return cis.readAllBytes();
+ }
+
void sendHttp1Response(int code, String msg, String... headers) throws IOException {
StringBuilder sb = new StringBuilder();
sb.append("HTTP/1.1 ")
.append(code)
.append(' ')
*** 769,779 ****
}
@SuppressWarnings({"rawtypes","unchecked"})
void addRequestBodyToQueue(String body, Queue q) throws IOException {
ByteBuffer buf = ByteBuffer.wrap(body.getBytes(StandardCharsets.US_ASCII));
! DataFrame df = new DataFrame(1, DataFrame.END_STREAM, ByteBufferReference.of(buf));
// only used for primordial stream
q.put(df);
}
// window updates done in main reader thread because they may
--- 893,903 ----
}
@SuppressWarnings({"rawtypes","unchecked"})
void addRequestBodyToQueue(String body, Queue q) throws IOException {
ByteBuffer buf = ByteBuffer.wrap(body.getBytes(StandardCharsets.US_ASCII));
! DataFrame df = new DataFrame(1, DataFrame.END_STREAM, buf);
// only used for primordial stream
q.put(df);
}
// window updates done in main reader thread because they may
< prev index next >