< prev index next >
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java
Print this page
*** 23,53 ****
* questions.
*/
package jdk.incubator.http;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
- import jdk.incubator.http.HttpConnection.Mode;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
- import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.ArrayList;
! import java.util.Collections;
! import java.util.Formatter;
import java.util.concurrent.ConcurrentHashMap;
! import java.util.concurrent.CountDownLatch;
! import java.util.stream.Collectors;
import javax.net.ssl.SSLEngine;
! import jdk.incubator.http.internal.common.*;
! import jdk.incubator.http.internal.frame.*;
import jdk.incubator.http.internal.hpack.Encoder;
import jdk.incubator.http.internal.hpack.Decoder;
import jdk.incubator.http.internal.hpack.DecodingCallback;
import static jdk.incubator.http.internal.frame.SettingsFrame.*;
--- 23,76 ----
* questions.
*/
package jdk.incubator.http;
+ import java.io.EOFException;
import java.io.IOException;
+ import java.lang.System.Logger.Level;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.ArrayList;
! import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
! import java.util.concurrent.ConcurrentLinkedQueue;
! import java.util.concurrent.Flow;
! import java.util.function.Function;
! import java.util.function.Supplier;
import javax.net.ssl.SSLEngine;
! import jdk.incubator.http.HttpConnection.HttpPublisher;
! import jdk.incubator.http.internal.common.FlowTube;
! import jdk.incubator.http.internal.common.FlowTube.TubeSubscriber;
! import jdk.incubator.http.internal.common.HttpHeadersImpl;
! import jdk.incubator.http.internal.common.Log;
! import jdk.incubator.http.internal.common.MinimalFuture;
! import jdk.incubator.http.internal.common.SequentialScheduler;
! import jdk.incubator.http.internal.common.Utils;
! import jdk.incubator.http.internal.frame.ContinuationFrame;
! import jdk.incubator.http.internal.frame.DataFrame;
! import jdk.incubator.http.internal.frame.ErrorFrame;
! import jdk.incubator.http.internal.frame.FramesDecoder;
! import jdk.incubator.http.internal.frame.FramesEncoder;
! import jdk.incubator.http.internal.frame.GoAwayFrame;
! import jdk.incubator.http.internal.frame.HeaderFrame;
! import jdk.incubator.http.internal.frame.HeadersFrame;
! import jdk.incubator.http.internal.frame.Http2Frame;
! import jdk.incubator.http.internal.frame.MalformedFrame;
! import jdk.incubator.http.internal.frame.OutgoingHeaders;
! import jdk.incubator.http.internal.frame.PingFrame;
! import jdk.incubator.http.internal.frame.PushPromiseFrame;
! import jdk.incubator.http.internal.frame.ResetFrame;
! import jdk.incubator.http.internal.frame.SettingsFrame;
! import jdk.incubator.http.internal.frame.WindowUpdateFrame;
import jdk.incubator.http.internal.hpack.Encoder;
import jdk.incubator.http.internal.hpack.Decoder;
import jdk.incubator.http.internal.hpack.DecodingCallback;
import static jdk.incubator.http.internal.frame.SettingsFrame.*;
*** 81,95 ****
* or handles them directly itself. This thread performs hpack decompression
* and incoming stream creation (Server push). Incoming frames destined for a
* stream are provided by calling Stream.incoming().
*/
class Http2Connection {
/*
* ByteBuffer pooling strategy for HTTP/2 protocol:
*
* In general there are 4 points where ByteBuffers are used:
! * - incoming/outgoing frames from/to ByteBufers plus incoming/outgoing encrypted data
* in case of SSL connection.
*
* 1. Outgoing frames encoded to ByteBuffers.
* Outgoing ByteBuffers are created with requited size and frequently small (except DataFrames, etc)
* At this place no pools at all. All outgoing buffers should be collected by GC.
--- 104,128 ----
* or handles them directly itself. This thread performs hpack decompression
* and incoming stream creation (Server push). Incoming frames destined for a
* stream are provided by calling Stream.incoming().
*/
class Http2Connection {
+
+ static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
+ static final boolean DEBUG_HPACK = Utils.DEBUG_HPACK; // Revisit: temporary dev flag.
+ final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
+ final static System.Logger DEBUG_LOGGER =
+ Utils.getDebugLogger("Http2Connection"::toString, DEBUG);
+ private final System.Logger debugHpack =
+ Utils.getHpackLogger(this::dbgString, DEBUG_HPACK);
+ static final ByteBuffer EMPTY_TRIGGER = ByteBuffer.allocate(0);
+
/*
* ByteBuffer pooling strategy for HTTP/2 protocol:
*
* In general there are 4 points where ByteBuffers are used:
! * - incoming/outgoing frames from/to ByteBuffers plus incoming/outgoing encrypted data
* in case of SSL connection.
*
* 1. Outgoing frames encoded to ByteBuffers.
* Outgoing ByteBuffers are created with requited size and frequently small (except DataFrames, etc)
* At this place no pools at all. All outgoing buffers should be collected by GC.
*** 114,157 ****
// A small class that allows to control frames with respect to the state of
// the connection preface. Any data received before the connection
// preface is sent will be buffered.
private final class FramesController {
volatile boolean prefaceSent;
! volatile List<ByteBufferReference> pending;
! boolean processReceivedData(FramesDecoder decoder, ByteBufferReference buf)
throws IOException
{
// if preface is not sent, buffers data in the pending list
if (!prefaceSent) {
synchronized (this) {
if (!prefaceSent) {
if (pending == null) pending = new ArrayList<>();
pending.add(buf);
return false;
}
}
}
// Preface is sent. Checks for pending data and flush it.
! // We rely on this method being called from within the readlock,
! // so we know that no other thread could execute this method
// concurrently while we're here.
// This ensures that later incoming buffers will not
// be processed before we have flushed the pending queue.
// No additional synchronization is therefore necessary here.
! List<ByteBufferReference> pending = this.pending;
this.pending = null;
if (pending != null) {
// flush pending data
! for (ByteBufferReference b : pending) {
decoder.decode(b);
}
}
-
// push the received buffer to the frames decoder.
decoder.decode(buf);
return true;
}
// Mark that the connection preface is sent
void markPrefaceSent() {
--- 147,199 ----
// A small class that allows to control frames with respect to the state of
// the connection preface. Any data received before the connection
// preface is sent will be buffered.
private final class FramesController {
volatile boolean prefaceSent;
! volatile List<ByteBuffer> pending;
! boolean processReceivedData(FramesDecoder decoder, ByteBuffer buf)
throws IOException
{
// if preface is not sent, buffers data in the pending list
if (!prefaceSent) {
+ debug.log(Level.DEBUG, "Preface is not sent: buffering %d",
+ buf.remaining());
synchronized (this) {
if (!prefaceSent) {
if (pending == null) pending = new ArrayList<>();
pending.add(buf);
+ debug.log(Level.DEBUG, () -> "there are now "
+ + Utils.remaining(pending)
+ + " bytes buffered waiting for preface to be sent");
return false;
}
}
}
// Preface is sent. Checks for pending data and flush it.
! // We rely on this method being called from within the Http2TubeSubscriber
! // scheduler, so we know that no other thread could execute this method
// concurrently while we're here.
// This ensures that later incoming buffers will not
// be processed before we have flushed the pending queue.
// No additional synchronization is therefore necessary here.
! List<ByteBuffer> pending = this.pending;
this.pending = null;
if (pending != null) {
// flush pending data
! debug.log(Level.DEBUG, () -> "Processing buffered data: "
! + Utils.remaining(pending));
! for (ByteBuffer b : pending) {
decoder.decode(b);
}
}
// push the received buffer to the frames decoder.
+ if (buf != EMPTY_TRIGGER) {
+ debug.log(Level.DEBUG, "Processing %d", buf.remaining());
decoder.decode(buf);
+ }
return true;
}
// Mark that the connection preface is sent
void markPrefaceSent() {
*** 165,175 ****
volatile boolean closed;
//-------------------------------------
final HttpConnection connection;
- private final HttpClientImpl client;
private final Http2ClientImpl client2;
private final Map<Integer,Stream<?>> streams = new ConcurrentHashMap<>();
private int nextstreamid;
private int nextPushStream = 2;
private final Encoder hpackOut;
--- 207,216 ----
*** 184,194 ****
--- 225,238 ----
* Send Window controller for both connection and stream windows.
* Each of this connection's Streams MUST use this controller.
*/
private final WindowController windowController = new WindowController();
private final FramesController framesController = new FramesController();
+ private final Http2TubeSubscriber subscriber = new Http2TubeSubscriber();
final WindowUpdateSender windowUpdater;
+ private volatile Throwable cause;
+ private volatile Supplier<ByteBuffer> initial;
static final int DEFAULT_FRAME_SIZE = 16 * 1024;
// TODO: need list of control frames from other threads
*** 197,312 ****
private Http2Connection(HttpConnection connection,
Http2ClientImpl client2,
int nextstreamid,
String key) {
this.connection = connection;
- this.client = client2.client();
this.client2 = client2;
this.nextstreamid = nextstreamid;
this.key = key;
this.clientSettings = this.client2.getClientSettings();
this.framesDecoder = new FramesDecoder(this::processFrame, clientSettings.getParameter(SettingsFrame.MAX_FRAME_SIZE));
// serverSettings will be updated by server
this.serverSettings = SettingsFrame.getDefaultSettings();
this.hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE));
this.hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE));
! this.windowUpdater = new ConnectionWindowUpdateSender(this, client.getReceiveBufferSize());
}
/**
* Case 1) Create from upgraded HTTP/1.1 connection.
! * Is ready to use. Will not be SSL. exchange is the Exchange
* that initiated the connection, whose response will be delivered
* on a Stream.
*/
! Http2Connection(HttpConnection connection,
Http2ClientImpl client2,
Exchange<?> exchange,
! ByteBuffer initial)
throws IOException, InterruptedException
{
this(connection,
client2,
3, // stream 1 is registered during the upgrade
keyFor(connection));
- assert !(connection instanceof SSLConnection);
Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());
Stream<?> initialStream = createStream(exchange);
initialStream.registerStream(1);
windowController.registerStream(1, getInitialSendWindowSize());
initialStream.requestSent();
sendConnectionPreface();
- // start reading and writing
- // start reading
- AsyncConnection asyncConn = (AsyncConnection)connection;
- asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown, this::getReadBuffer);
- connection.configureMode(Mode.ASYNC); // set mode only AFTER setAsyncCallbacks to provide visibility.
- asyncReceive(ByteBufferReference.of(initial));
- asyncConn.startReading();
}
! // async style but completes immediately
static CompletableFuture<Http2Connection> createAsync(HttpConnection connection,
Http2ClientImpl client2,
Exchange<?> exchange,
! ByteBuffer initial) {
return MinimalFuture.supply(() -> new Http2Connection(connection, client2, exchange, initial));
}
/**
* Cases 2) 3)
*
* request is request to be sent.
*/
! Http2Connection(HttpRequestImpl request, Http2ClientImpl h2client)
! throws IOException, InterruptedException
{
! this(HttpConnection.getConnection(request.getAddress(h2client.client()), h2client.client(), request, true),
h2client,
1,
! keyFor(request.uri(), request.proxy(h2client.client())));
Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());
- // start reading
- AsyncConnection asyncConn = (AsyncConnection)connection;
- asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown, this::getReadBuffer);
- connection.connect();
- checkSSLConfig();
// safe to resume async reading now.
! asyncConn.enableCallback();
sendConnectionPreface();
}
/**
* Throws an IOException if h2 was not negotiated
*/
! private void checkSSLConfig() throws IOException {
! AbstractAsyncSSLConnection aconn = (AbstractAsyncSSLConnection)connection;
SSLEngine engine = aconn.getEngine();
! String alpn = engine.getApplicationProtocol();
if (alpn == null || !alpn.equals("h2")) {
String msg;
if (alpn == null) {
Log.logSSL("ALPN not supported");
msg = "ALPN not supported";
! } else switch (alpn) {
case "":
! Log.logSSL("No ALPN returned");
! msg = "No ALPN negotiated";
break;
case "http/1.1":
! Log.logSSL("HTTP/1.1 ALPN returned");
! msg = "HTTP/1.1 ALPN returned";
break;
default:
! Log.logSSL("unknown ALPN returned");
! msg = "Unexpected ALPN: " + alpn;
! throw new IOException(msg);
}
- throw new ALPNException(msg, aconn);
}
}
static String keyFor(HttpConnection connection) {
boolean isProxy = connection.isProxied();
boolean isSecure = connection.isSecure();
--- 241,400 ----
private Http2Connection(HttpConnection connection,
Http2ClientImpl client2,
int nextstreamid,
String key) {
this.connection = connection;
this.client2 = client2;
this.nextstreamid = nextstreamid;
this.key = key;
this.clientSettings = this.client2.getClientSettings();
this.framesDecoder = new FramesDecoder(this::processFrame, clientSettings.getParameter(SettingsFrame.MAX_FRAME_SIZE));
// serverSettings will be updated by server
this.serverSettings = SettingsFrame.getDefaultSettings();
this.hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE));
this.hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE));
! debugHpack.log(Level.DEBUG, () -> "For the record:" + super.toString());
! debugHpack.log(Level.DEBUG, "Decoder created: %s", hpackIn);
! debugHpack.log(Level.DEBUG, "Encoder created: %s", hpackOut);
! this.windowUpdater = new ConnectionWindowUpdateSender(this, client().getReceiveBufferSize());
}
/**
* Case 1) Create from upgraded HTTP/1.1 connection.
! * Is ready to use. Can be SSL. exchange is the Exchange
* that initiated the connection, whose response will be delivered
* on a Stream.
*/
! private Http2Connection(HttpConnection connection,
Http2ClientImpl client2,
Exchange<?> exchange,
! Supplier<ByteBuffer> initial)
throws IOException, InterruptedException
{
this(connection,
client2,
3, // stream 1 is registered during the upgrade
keyFor(connection));
Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());
Stream<?> initialStream = createStream(exchange);
initialStream.registerStream(1);
windowController.registerStream(1, getInitialSendWindowSize());
initialStream.requestSent();
+ // Upgrading:
+ // set callbacks before sending preface - makes sure anything that
+ // might be sent by the server will come our way.
+ this.initial = initial;
+ connectFlows(connection);
sendConnectionPreface();
}
! // Used when upgrading an HTTP/1.1 connection to HTTP/2 after receiving
! // agreement from the server. Async style but completes immediately, because
! // the connection is already connected.
static CompletableFuture<Http2Connection> createAsync(HttpConnection connection,
Http2ClientImpl client2,
Exchange<?> exchange,
! Supplier<ByteBuffer> initial)
! {
return MinimalFuture.supply(() -> new Http2Connection(connection, client2, exchange, initial));
}
+ // Requires TLS handshake. So, is really async
+ static CompletableFuture<Http2Connection> createAsync(HttpRequestImpl request,
+ Http2ClientImpl h2client) {
+ assert request.secure();
+ AbstractAsyncSSLConnection connection = (AbstractAsyncSSLConnection)
+ HttpConnection.getConnection(request.getAddress(),
+ h2client.client(),
+ request,
+ HttpClient.Version.HTTP_2);
+
+ return connection.connectAsync()
+ .thenCompose(unused -> checkSSLConfig(connection))
+ .thenCompose(notused-> {
+ CompletableFuture<Http2Connection> cf = new MinimalFuture<>();
+ try {
+ Http2Connection hc = new Http2Connection(request, h2client, connection);
+ cf.complete(hc);
+ } catch (IOException e) {
+ cf.completeExceptionally(e);
+ }
+ return cf; } );
+ }
+
/**
* Cases 2) 3)
*
* request is request to be sent.
*/
! private Http2Connection(HttpRequestImpl request,
! Http2ClientImpl h2client,
! HttpConnection connection)
! throws IOException
{
! this(connection,
h2client,
1,
! keyFor(request.uri(), request.proxy()));
!
Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());
// safe to resume async reading now.
! connectFlows(connection);
sendConnectionPreface();
}
+ private void connectFlows(HttpConnection connection) {
+ FlowTube tube = connection.getConnectionFlow();
+ // Connect the flow to our Http2TubeSubscriber:
+ tube.connectFlows(connection.publisher(), subscriber);
+ }
+
+ final HttpClientImpl client() {
+ return client2.client();
+ }
+
/**
* Throws an IOException if h2 was not negotiated
*/
! private static CompletableFuture<?> checkSSLConfig(AbstractAsyncSSLConnection aconn) {
! assert aconn.isSecure();
!
! Function<String, CompletableFuture<Void>> checkAlpnCF = (alpn) -> {
! CompletableFuture<Void> cf = new MinimalFuture<>();
SSLEngine engine = aconn.getEngine();
! assert Objects.equals(alpn, engine.getApplicationProtocol());
!
! DEBUG_LOGGER.log(Level.DEBUG, "checkSSLConfig: alpn: %s", alpn );
!
if (alpn == null || !alpn.equals("h2")) {
String msg;
if (alpn == null) {
Log.logSSL("ALPN not supported");
msg = "ALPN not supported";
! } else {
! switch (alpn) {
case "":
! Log.logSSL(msg = "No ALPN negotiated");
break;
case "http/1.1":
! Log.logSSL( msg = "HTTP/1.1 ALPN returned");
break;
default:
! Log.logSSL(msg = "Unexpected ALPN: " + alpn);
! cf.completeExceptionally(new IOException(msg));
}
}
+ cf.completeExceptionally(new ALPNException(msg, aconn));
+ return cf;
+ }
+ cf.complete(null);
+ return cf;
+ };
+
+ return aconn.getALPN().thenCompose(checkAlpnCF);
}
static String keyFor(HttpConnection connection) {
boolean isProxy = connection.isProxied();
boolean isSecure = connection.isSecure();
*** 320,330 ****
boolean isProxy = proxy != null;
String host;
int port;
! if (isProxy) {
host = proxy.getHostString();
port = proxy.getPort();
} else {
host = uri.getHost();
port = uri.getPort();
--- 408,418 ----
boolean isProxy = proxy != null;
String host;
int port;
! if (proxy != null) {
host = proxy.getHostString();
port = proxy.getPort();
} else {
host = uri.getHost();
port = uri.getPort();
*** 348,443 ****
void putConnection() {
client2.putConnection(this);
}
! private static String toHexdump1(ByteBuffer bb) {
! bb.mark();
! StringBuilder sb = new StringBuilder(512);
! Formatter f = new Formatter(sb);
!
! while (bb.hasRemaining()) {
! int i = Byte.toUnsignedInt(bb.get());
! f.format("%02x:", i);
! }
! sb.deleteCharAt(sb.length()-1);
! bb.reset();
! return sb.toString();
! }
!
! private static String toHexdump(ByteBuffer bb) {
! List<String> words = new ArrayList<>();
! int i = 0;
! bb.mark();
! while (bb.hasRemaining()) {
! if (i % 2 == 0) {
! words.add("");
! }
! byte b = bb.get();
! String hex = Integer.toHexString(256 + Byte.toUnsignedInt(b)).substring(1);
! words.set(i / 2, words.get(i / 2) + hex);
! i++;
! }
! bb.reset();
! return words.stream().collect(Collectors.joining(" "));
}
! private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder) {
boolean endOfHeaders = frame.getFlag(HeaderFrame.END_HEADERS);
! ByteBufferReference[] buffers = frame.getHeaderBlock();
! for (int i = 0; i < buffers.length; i++) {
! hpackIn.decode(buffers[i].get(), endOfHeaders && (i == buffers.length - 1), decoder);
}
}
! int getInitialSendWindowSize() {
return serverSettings.getParameter(INITIAL_WINDOW_SIZE);
}
void close() {
GoAwayFrame f = new GoAwayFrame(0, ErrorFrame.NO_ERROR, "Requested by user".getBytes());
// TODO: set last stream. For now zero ok.
sendFrame(f);
}
! private ByteBufferPool readBufferPool = new ByteBufferPool();
!
! // provides buffer to read data (default size)
! public ByteBufferReference getReadBuffer() {
! return readBufferPool.get(getMaxReceiveFrameSize() + Http2Frame.FRAME_HEADER_SIZE);
! }
!
! private final Object readlock = new Object();
!
! public void asyncReceive(ByteBufferReference buffer) {
// We don't need to read anything and
// we don't want to send anything back to the server
// until the connection preface has been sent.
// Therefore we're going to wait if needed before reading
// (and thus replying) to anything.
// Starting to reply to something (e.g send an ACK to a
// SettingsFrame sent by the server) before the connection
// preface is fully sent might result in the server
// sending a GOAWAY frame with 'invalid_preface'.
! synchronized (readlock) {
try {
! // the readlock ensures that the order of incoming buffers
! // is preserved.
framesController.processReceivedData(framesDecoder, buffer);
} catch (Throwable e) {
String msg = Utils.stackTrace(e);
Log.logTrace(msg);
shutdown(e);
}
}
- }
void shutdown(Throwable t) {
! Log.logError(t);
closed = true;
client2.deleteConnection(this);
List<Stream<?>> c = new LinkedList<>(streams.values());
for (Stream<?> s : c) {
s.cancelImpl(t);
}
--- 436,540 ----
void putConnection() {
client2.putConnection(this);
}
! private HttpPublisher publisher() {
! return connection.publisher();
}
! private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder)
! throws IOException
! {
! debugHpack.log(Level.DEBUG, "decodeHeaders(%s)", decoder);
!
boolean endOfHeaders = frame.getFlag(HeaderFrame.END_HEADERS);
! List<ByteBuffer> buffers = frame.getHeaderBlock();
! int len = buffers.size();
! for (int i = 0; i < len; i++) {
! ByteBuffer b = buffers.get(i);
! hpackIn.decode(b, endOfHeaders && (i == len - 1), decoder);
}
}
! final int getInitialSendWindowSize() {
return serverSettings.getParameter(INITIAL_WINDOW_SIZE);
}
void close() {
GoAwayFrame f = new GoAwayFrame(0, ErrorFrame.NO_ERROR, "Requested by user".getBytes());
// TODO: set last stream. For now zero ok.
sendFrame(f);
}
! long count;
! final void asyncReceive(ByteBuffer buffer) {
// We don't need to read anything and
// we don't want to send anything back to the server
// until the connection preface has been sent.
// Therefore we're going to wait if needed before reading
// (and thus replying) to anything.
// Starting to reply to something (e.g send an ACK to a
// SettingsFrame sent by the server) before the connection
// preface is fully sent might result in the server
// sending a GOAWAY frame with 'invalid_preface'.
! //
! // Note: asyncReceive is only called from the Http2TubeSubscriber
! // sequential scheduler.
try {
! Supplier<ByteBuffer> bs = initial;
! // ensure that we always handle the initial buffer first,
! // if any.
! if (bs != null) {
! initial = null;
! ByteBuffer b = bs.get();
! if (b.hasRemaining()) {
! long c = ++count;
! debug.log(Level.DEBUG, () -> "H2 Receiving Initial("
! + c +"): " + b.remaining());
! framesController.processReceivedData(framesDecoder, b);
! }
! }
! ByteBuffer b = buffer;
! // the Http2TubeSubscriber scheduler ensures that the order of incoming
! // buffers is preserved.
! if (b == EMPTY_TRIGGER) {
! debug.log(Level.DEBUG, "H2 Received EMPTY_TRIGGER");
! boolean prefaceSent = framesController.prefaceSent;
! assert prefaceSent;
! // call framesController.processReceivedData to potentially
! // trigger the processing of all the data buffered there.
! framesController.processReceivedData(framesDecoder, buffer);
! debug.log(Level.DEBUG, "H2 processed buffered data");
! } else {
! long c = ++count;
! debug.log(Level.DEBUG, "H2 Receiving(%d): %d", c, b.remaining());
framesController.processReceivedData(framesDecoder, buffer);
+ debug.log(Level.DEBUG, "H2 processed(%d)", c);
+ }
} catch (Throwable e) {
String msg = Utils.stackTrace(e);
Log.logTrace(msg);
shutdown(e);
}
}
+ Throwable getRecordedCause() {
+ return cause;
+ }
void shutdown(Throwable t) {
! debug.log(Level.DEBUG, () -> "Shutting down h2c (closed="+closed+"): " + t);
! if (closed == true) return;
! synchronized (this) {
! if (closed == true) return;
closed = true;
+ }
+ Log.logError(t);
+ Throwable initialCause = this.cause;
+ if (initialCause == null) this.cause = t;
client2.deleteConnection(this);
List<Stream<?>> c = new LinkedList<>(streams.values());
for (Stream<?> s : c) {
s.cancelImpl(t);
}
*** 455,466 ****
Log.logFrames(frame, "IN");
int streamid = frame.streamid();
if (frame instanceof MalformedFrame) {
Log.logError(((MalformedFrame) frame).getMessage());
if (streamid == 0) {
! protocolError(((MalformedFrame) frame).getErrorCode());
} else {
resetStream(streamid, ((MalformedFrame) frame).getErrorCode());
}
return;
}
if (streamid == 0) {
--- 552,566 ----
Log.logFrames(frame, "IN");
int streamid = frame.streamid();
if (frame instanceof MalformedFrame) {
Log.logError(((MalformedFrame) frame).getMessage());
if (streamid == 0) {
! protocolError(((MalformedFrame) frame).getErrorCode(),
! ((MalformedFrame) frame).getMessage());
} else {
+ debug.log(Level.DEBUG, () -> "Reset stream: "
+ + ((MalformedFrame) frame).getMessage());
resetStream(streamid, ((MalformedFrame) frame).getErrorCode());
}
return;
}
if (streamid == 0) {
*** 474,486 ****
Stream<?> stream = getStream(streamid);
if (stream == null) {
// Should never receive a frame with unknown stream id
! // To avoid looping, an endpoint MUST NOT send a RST_STREAM in
! // response to a RST_STREAM frame.
! if (!(frame instanceof ResetFrame)) {
resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
}
return;
}
if (frame instanceof PushPromiseFrame) {
--- 574,593 ----
Stream<?> stream = getStream(streamid);
if (stream == null) {
// Should never receive a frame with unknown stream id
! if (frame instanceof HeaderFrame) {
! // always decode the headers as they may affect
! // connection-level HPACK decoding state
! HeaderDecoder decoder = new LoggingHeaderDecoder(new HeaderDecoder());
! decodeHeaders((HeaderFrame) frame, decoder);
! }
!
! int sid = frame.streamid();
! if (sid >= nextstreamid && !(frame instanceof ResetFrame)) {
! // otherwise the stream has already been reset/closed
resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
}
return;
}
if (frame instanceof PushPromiseFrame) {
*** 497,516 ****
}
private <T> void handlePushPromise(Stream<T> parent, PushPromiseFrame pp)
throws IOException
{
HttpRequestImpl parentReq = parent.request;
int promisedStreamid = pp.getPromisedStream();
if (promisedStreamid != nextPushStream) {
resetStream(promisedStreamid, ResetFrame.PROTOCOL_ERROR);
return;
} else {
nextPushStream += 2;
}
! HeaderDecoder decoder = new HeaderDecoder();
! decodeHeaders(pp, decoder);
HttpHeadersImpl headers = decoder.headers();
HttpRequestImpl pushReq = HttpRequestImpl.createPushRequest(parentReq, headers);
Exchange<T> pushExch = new Exchange<>(pushReq, parent.exchange.multi);
Stream.PushedStream<?,T> pushStream = createPushStream(parent, pushExch);
pushExch.exchImpl = pushStream;
--- 604,627 ----
}
private <T> void handlePushPromise(Stream<T> parent, PushPromiseFrame pp)
throws IOException
{
+ // always decode the headers as they may affect connection-level HPACK
+ // decoding state
+ HeaderDecoder decoder = new LoggingHeaderDecoder(new HeaderDecoder());
+ decodeHeaders(pp, decoder);
+
HttpRequestImpl parentReq = parent.request;
int promisedStreamid = pp.getPromisedStream();
if (promisedStreamid != nextPushStream) {
resetStream(promisedStreamid, ResetFrame.PROTOCOL_ERROR);
return;
} else {
nextPushStream += 2;
}
!
HttpHeadersImpl headers = decoder.headers();
HttpRequestImpl pushReq = HttpRequestImpl.createPushRequest(parentReq, headers);
Exchange<T> pushExch = new Exchange<>(pushReq, parent.exchange.multi);
Stream.PushedStream<?,T> pushStream = createPushStream(parent, pushExch);
pushExch.exchImpl = pushStream;
*** 547,557 ****
--- 658,676 ----
sendFrame(frame);
closeStream(streamid);
}
void closeStream(int streamid) {
+ debug.log(Level.DEBUG, "Closed stream %d", streamid);
Stream<?> s = streams.remove(streamid);
+ if (s != null) {
+ // decrement the reference count on the HttpClientImpl
+ // to allow the SelectorManager thread to exit if no
+ // other operation is pending and the facade is no
+ // longer referenced.
+ client().unreference();
+ }
// ## Remove s != null. It is a hack for delayed cancellation,reset
if (s != null && !(s instanceof Stream.PushedStream)) {
// Since PushStreams have no request body, then they have no
// corresponding entry in the window controller.
windowController.removeStream(streamid);
*** 577,589 ****
}
private void protocolError(int errorCode)
throws IOException
{
GoAwayFrame frame = new GoAwayFrame(0, errorCode);
sendFrame(frame);
! shutdown(new IOException("protocol error"));
}
private void handleSettings(SettingsFrame frame)
throws IOException
{
--- 696,714 ----
}
private void protocolError(int errorCode)
throws IOException
{
+ protocolError(errorCode, null);
+ }
+
+ private void protocolError(int errorCode, String msg)
+ throws IOException
+ {
GoAwayFrame frame = new GoAwayFrame(0, errorCode);
sendFrame(frame);
! shutdown(new IOException("protocol error" + (msg == null?"":(": " + msg))));
}
private void handleSettings(SettingsFrame frame)
throws IOException
{
*** 631,645 ****
*/
public int getMaxReceiveFrameSize() {
return clientSettings.getParameter(MAX_FRAME_SIZE);
}
- // Not sure how useful this is.
- public int getMaxHeadersSize() {
- return serverSettings.getParameter(MAX_HEADER_LIST_SIZE);
- }
-
private static final String CLIENT_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
private static final byte[] PREFACE_BYTES =
CLIENT_PREFACE.getBytes(StandardCharsets.ISO_8859_1);
--- 756,765 ----
*** 650,663 ****
private void sendConnectionPreface() throws IOException {
Log.logTrace("{0}: start sending connection preface to {1}",
connection.channel().getLocalAddress(),
connection.address());
SettingsFrame sf = client2.getClientSettings();
! ByteBufferReference ref = framesEncoder.encodeConnectionPreface(PREFACE_BYTES, sf);
Log.logFrames(sf, "OUT");
// send preface bytes and SettingsFrame together
! connection.write(ref.get());
// mark preface sent.
framesController.markPrefaceSent();
Log.logTrace("PREFACE_BYTES sent");
Log.logTrace("Settings Frame sent");
--- 770,785 ----
private void sendConnectionPreface() throws IOException {
Log.logTrace("{0}: start sending connection preface to {1}",
connection.channel().getLocalAddress(),
connection.address());
SettingsFrame sf = client2.getClientSettings();
! ByteBuffer buf = framesEncoder.encodeConnectionPreface(PREFACE_BYTES, sf);
Log.logFrames(sf, "OUT");
// send preface bytes and SettingsFrame together
! HttpPublisher publisher = publisher();
! publisher.enqueue(List.of(buf));
! publisher.signalEnqueued();
// mark preface sent.
framesController.markPrefaceSent();
Log.logTrace("PREFACE_BYTES sent");
Log.logTrace("Settings Frame sent");
*** 667,676 ****
--- 789,801 ----
windowUpdater.sendWindowUpdate(len);
// there will be an ACK to the windows update - which should
// cause any pending data stored before the preface was sent to be
// flushed (see PrefaceController).
Log.logTrace("finished sending connection preface");
+ debug.log(Level.DEBUG, "Triggering processing of buffered data"
+ + " after sending connection preface");
+ subscriber.onNext(List.of(EMPTY_TRIGGER));
}
/**
* Returns an existing Stream with given id, or null if doesn't exist
*/
*** 680,721 ****
}
/**
* Creates Stream with given id.
*/
! <T> Stream<T> createStream(Exchange<T> exchange) {
! Stream<T> stream = new Stream<>(client, this, exchange, windowController);
return stream;
}
<T> Stream.PushedStream<?,T> createPushStream(Stream<T> parent, Exchange<T> pushEx) {
PushGroup<?,T> pg = parent.exchange.getPushGroup();
! return new Stream.PushedStream<>(pg, client, this, parent, pushEx);
}
<T> void putStream(Stream<T> stream, int streamid) {
streams.put(streamid, stream);
}
- void deleteStream(int streamid) {
- streams.remove(streamid);
- windowController.removeStream(streamid);
- }
-
/**
* Encode the headers into a List<ByteBuffer> and then create HEADERS
* and CONTINUATION frames from the list and return the List<Http2Frame>.
*/
private List<HeaderFrame> encodeHeaders(OutgoingHeaders<Stream<?>> frame) {
! List<ByteBufferReference> buffers = encodeHeadersImpl(
getMaxSendFrameSize(),
frame.getAttachment().getRequestPseudoHeaders(),
frame.getUserHeaders(),
frame.getSystemHeaders());
List<HeaderFrame> frames = new ArrayList<>(buffers.size());
! Iterator<ByteBufferReference> bufIterator = buffers.iterator();
HeaderFrame oframe = new HeadersFrame(frame.streamid(), frame.getFlags(), bufIterator.next());
frames.add(oframe);
while(bufIterator.hasNext()) {
oframe = new ContinuationFrame(frame.streamid(), bufIterator.next());
frames.add(oframe);
--- 805,845 ----
}
/**
* Creates Stream with given id.
*/
! final <T> Stream<T> createStream(Exchange<T> exchange) {
! Stream<T> stream = new Stream<>(this, exchange, windowController);
return stream;
}
<T> Stream.PushedStream<?,T> createPushStream(Stream<T> parent, Exchange<T> pushEx) {
PushGroup<?,T> pg = parent.exchange.getPushGroup();
! return new Stream.PushedStream<>(pg, this, pushEx);
}
<T> void putStream(Stream<T> stream, int streamid) {
+ // increment the reference count on the HttpClientImpl
+ // to prevent the SelectorManager thread from exiting until
+ // the stream is closed.
+ client().reference();
streams.put(streamid, stream);
}
/**
* Encode the headers into a List<ByteBuffer> and then create HEADERS
* and CONTINUATION frames from the list and return the List<Http2Frame>.
*/
private List<HeaderFrame> encodeHeaders(OutgoingHeaders<Stream<?>> frame) {
! List<ByteBuffer> buffers = encodeHeadersImpl(
getMaxSendFrameSize(),
frame.getAttachment().getRequestPseudoHeaders(),
frame.getUserHeaders(),
frame.getSystemHeaders());
List<HeaderFrame> frames = new ArrayList<>(buffers.size());
! Iterator<ByteBuffer> bufIterator = buffers.iterator();
HeaderFrame oframe = new HeadersFrame(frame.streamid(), frame.getFlags(), bufIterator.next());
frames.add(oframe);
while(bufIterator.hasNext()) {
oframe = new ContinuationFrame(frame.streamid(), bufIterator.next());
frames.add(oframe);
*** 726,741 ****
// Dedicated cache for headers encoding ByteBuffer.
// There can be no concurrent access to this buffer as all access to this buffer
// and its content happen within a single critical code block section protected
// by the sendLock. / (see sendFrame())
! private ByteBufferPool headerEncodingPool = new ByteBufferPool();
! private ByteBufferReference getHeaderBuffer(int maxFrameSize) {
! ByteBufferReference ref = headerEncodingPool.get(maxFrameSize);
! ref.get().limit(maxFrameSize);
! return ref;
}
/*
* Encodes all the headers from the given HttpHeaders into the given List
* of buffers.
--- 850,865 ----
// Dedicated cache for headers encoding ByteBuffer.
// There can be no concurrent access to this buffer as all access to this buffer
// and its content happen within a single critical code block section protected
// by the sendLock. / (see sendFrame())
! // private final ByteBufferPool headerEncodingPool = new ByteBufferPool();
! private ByteBuffer getHeaderBuffer(int maxFrameSize) {
! ByteBuffer buf = ByteBuffer.allocate(maxFrameSize);
! buf.limit(maxFrameSize);
! return buf;
}
/*
* Encodes all the headers from the given HttpHeaders into the given List
* of buffers.
*** 745,777 ****
* ...Just as in HTTP/1.x, header field names are strings of ASCII
* characters that are compared in a case-insensitive fashion. However,
* header field names MUST be converted to lowercase prior to their
* encoding in HTTP/2...
*/
! private List<ByteBufferReference> encodeHeadersImpl(int maxFrameSize, HttpHeaders... headers) {
! ByteBufferReference buffer = getHeaderBuffer(maxFrameSize);
! List<ByteBufferReference> buffers = new ArrayList<>();
for(HttpHeaders header : headers) {
for (Map.Entry<String, List<String>> e : header.map().entrySet()) {
String lKey = e.getKey().toLowerCase();
List<String> values = e.getValue();
for (String value : values) {
hpackOut.header(lKey, value);
! while (!hpackOut.encode(buffer.get())) {
! buffer.get().flip();
buffers.add(buffer);
buffer = getHeaderBuffer(maxFrameSize);
}
}
}
}
! buffer.get().flip();
buffers.add(buffer);
return buffers;
}
! private ByteBufferReference[] encodeHeaders(OutgoingHeaders<Stream<?>> oh, Stream<?> stream) {
oh.streamid(stream.streamid);
if (Log.headers()) {
StringBuilder sb = new StringBuilder("HEADERS FRAME (stream=");
sb.append(stream.streamid).append(")\n");
Log.dumpHeaders(sb, " ", oh.getAttachment().getRequestPseudoHeaders());
--- 869,901 ----
* ...Just as in HTTP/1.x, header field names are strings of ASCII
* characters that are compared in a case-insensitive fashion. However,
* header field names MUST be converted to lowercase prior to their
* encoding in HTTP/2...
*/
! private List<ByteBuffer> encodeHeadersImpl(int maxFrameSize, HttpHeaders... headers) {
! ByteBuffer buffer = getHeaderBuffer(maxFrameSize);
! List<ByteBuffer> buffers = new ArrayList<>();
for(HttpHeaders header : headers) {
for (Map.Entry<String, List<String>> e : header.map().entrySet()) {
String lKey = e.getKey().toLowerCase();
List<String> values = e.getValue();
for (String value : values) {
hpackOut.header(lKey, value);
! while (!hpackOut.encode(buffer)) {
! buffer.flip();
buffers.add(buffer);
buffer = getHeaderBuffer(maxFrameSize);
}
}
}
}
! buffer.flip();
buffers.add(buffer);
return buffers;
}
! private List<ByteBuffer> encodeHeaders(OutgoingHeaders<Stream<?>> oh, Stream<?> stream) {
oh.streamid(stream.streamid);
if (Log.headers()) {
StringBuilder sb = new StringBuilder("HEADERS FRAME (stream=");
sb.append(stream.streamid).append(")\n");
Log.dumpHeaders(sb, " ", oh.getAttachment().getRequestPseudoHeaders());
*** 781,810 ****
}
List<HeaderFrame> frames = encodeHeaders(oh);
return encodeFrames(frames);
}
! private ByteBufferReference[] encodeFrames(List<HeaderFrame> frames) {
if (Log.frames()) {
frames.forEach(f -> Log.logFrames(f, "OUT"));
}
return framesEncoder.encodeFrames(frames);
}
- static Throwable getExceptionFrom(CompletableFuture<?> cf) {
- try {
- cf.get();
- return null;
- } catch (Throwable e) {
- if (e.getCause() != null) {
- return e.getCause();
- } else {
- return e;
- }
- }
- }
-
private Stream<?> registerNewStream(OutgoingHeaders<Stream<?>> oh) {
Stream<?> stream = oh.getAttachment();
int streamid = nextstreamid;
nextstreamid += 2;
stream.registerStream(streamid);
--- 905,921 ----
}
List<HeaderFrame> frames = encodeHeaders(oh);
return encodeFrames(frames);
}
! private List<ByteBuffer> encodeFrames(List<HeaderFrame> frames) {
if (Log.frames()) {
frames.forEach(f -> Log.logFrames(f, "OUT"));
}
return framesEncoder.encodeFrames(frames);
}
private Stream<?> registerNewStream(OutgoingHeaders<Stream<?>> oh) {
Stream<?> stream = oh.getAttachment();
int streamid = nextstreamid;
nextstreamid += 2;
stream.registerStream(streamid);
*** 816,854 ****
private final Object sendlock = new Object();
void sendFrame(Http2Frame frame) {
try {
synchronized (sendlock) {
if (frame instanceof OutgoingHeaders) {
@SuppressWarnings("unchecked")
OutgoingHeaders<Stream<?>> oh = (OutgoingHeaders<Stream<?>>) frame;
Stream<?> stream = registerNewStream(oh);
// provide protection from inserting unordered frames between Headers and Continuation
! connection.writeAsync(encodeHeaders(oh, stream));
} else {
! connection.writeAsync(encodeFrame(frame));
}
}
! connection.flushAsync();
} catch (IOException e) {
if (!closed) {
Log.logError(e);
shutdown(e);
}
}
}
! private ByteBufferReference[] encodeFrame(Http2Frame frame) {
Log.logFrames(frame, "OUT");
return framesEncoder.encodeFrame(frame);
}
void sendDataFrame(DataFrame frame) {
try {
! connection.writeAsync(encodeFrame(frame));
! connection.flushAsync();
} catch (IOException e) {
if (!closed) {
Log.logError(e);
shutdown(e);
}
--- 927,967 ----
private final Object sendlock = new Object();
void sendFrame(Http2Frame frame) {
try {
+ HttpPublisher publisher = publisher();
synchronized (sendlock) {
if (frame instanceof OutgoingHeaders) {
@SuppressWarnings("unchecked")
OutgoingHeaders<Stream<?>> oh = (OutgoingHeaders<Stream<?>>) frame;
Stream<?> stream = registerNewStream(oh);
// provide protection from inserting unordered frames between Headers and Continuation
! publisher.enqueue(encodeHeaders(oh, stream));
} else {
! publisher.enqueue(encodeFrame(frame));
}
}
! publisher.signalEnqueued();
} catch (IOException e) {
if (!closed) {
Log.logError(e);
shutdown(e);
}
}
}
! private List<ByteBuffer> encodeFrame(Http2Frame frame) {
Log.logFrames(frame, "OUT");
return framesEncoder.encodeFrame(frame);
}
void sendDataFrame(DataFrame frame) {
try {
! HttpPublisher publisher = publisher();
! publisher.enqueue(encodeFrame(frame));
! publisher.signalEnqueued();
} catch (IOException e) {
if (!closed) {
Log.logError(e);
shutdown(e);
}
*** 860,879 ****
* allowed only of control frames: WindowUpdateFrame, PingFrame and etc.
* prohibited for such frames as DataFrame, HeadersFrame, ContinuationFrame.
*/
void sendUnorderedFrame(Http2Frame frame) {
try {
! connection.writeAsyncUnordered(encodeFrame(frame));
! connection.flushAsync();
} catch (IOException e) {
if (!closed) {
Log.logError(e);
shutdown(e);
}
}
}
static class HeaderDecoder implements DecodingCallback {
HttpHeadersImpl headers;
HeaderDecoder() {
this.headers = new HttpHeadersImpl();
--- 973,1187 ----
* allowed only of control frames: WindowUpdateFrame, PingFrame and etc.
* prohibited for such frames as DataFrame, HeadersFrame, ContinuationFrame.
*/
void sendUnorderedFrame(Http2Frame frame) {
try {
! HttpPublisher publisher = publisher();
! publisher.enqueueUnordered(encodeFrame(frame));
! publisher.signalEnqueued();
} catch (IOException e) {
if (!closed) {
Log.logError(e);
shutdown(e);
}
}
}
+ /**
+ * A simple tube subscriber for reading from the connection flow.
+ */
+ final class Http2TubeSubscriber implements TubeSubscriber {
+ volatile Flow.Subscription subscription;
+ volatile boolean completed;
+ volatile boolean dropped;
+ volatile Throwable error;
+ final ConcurrentLinkedQueue<ByteBuffer> queue
+ = new ConcurrentLinkedQueue<>();
+ final SequentialScheduler scheduler =
+ SequentialScheduler.synchronizedScheduler(this::processQueue);
+
+ final void processQueue() {
+ try {
+ while (!queue.isEmpty() && !scheduler.isStopped()) {
+ ByteBuffer buffer = queue.poll();
+ debug.log(Level.DEBUG,
+ "sending %d to Http2Connection.asyncReceive",
+ buffer.remaining());
+ asyncReceive(buffer);
+ }
+ } catch (Throwable t) {
+ Throwable x = error;
+ if (x == null) error = t;
+ } finally {
+ Throwable x = error;
+ if (x != null) {
+ debug.log(Level.DEBUG, "Stopping scheduler", x);
+ scheduler.stop();
+ Http2Connection.this.shutdown(x);
+ }
+ }
+ }
+
+
+ public void onSubscribe(Flow.Subscription subscription) {
+ // supports being called multiple time.
+ // doesn't cancel the previous subscription, since that is
+ // most probably the same as the new subscription.
+ assert this.subscription == null || dropped == false;
+ this.subscription = subscription;
+ dropped = false;
+ // TODO FIXME: request(1) should be done by the delegate.
+ if (!completed) {
+ debug.log(Level.DEBUG, "onSubscribe: requesting Long.MAX_VALUE for reading");
+ subscription.request(Long.MAX_VALUE);
+ } else {
+ debug.log(Level.DEBUG, "onSubscribe: already completed");
+ }
+ }
+
+ @Override
+ public void onNext(List<ByteBuffer> item) {
+ debug.log(Level.DEBUG, () -> "onNext: got " + Utils.remaining(item)
+ + " bytes in " + item.size() + " buffers");
+ queue.addAll(item);
+ scheduler.deferOrSchedule(client().theExecutor());
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ debug.log(Level.DEBUG, () -> "onError: " + throwable);
+ error = throwable;
+ completed = true;
+ scheduler.deferOrSchedule(client().theExecutor());
+ }
+
+ @Override
+ public void onComplete() {
+ debug.log(Level.DEBUG, "EOF");
+ error = new EOFException("EOF reached while reading");
+ completed = true;
+ scheduler.deferOrSchedule(client().theExecutor());
+ }
+
+ public void dropSubscription() {
+ debug.log(Level.DEBUG, "dropSubscription");
+ // we could probably set subscription to null here...
+ // then we might not need the 'dropped' boolean?
+ dropped = true;
+ }
+ }
+
+ @Override
+ public final String toString() {
+ return dbgString();
+ }
+
+ final String dbgString() {
+ return "Http2Connection("
+ + connection.getConnectionFlow() + ")";
+ }
+
+ final class LoggingHeaderDecoder extends HeaderDecoder {
+
+ private final HeaderDecoder delegate;
+ private final System.Logger debugHpack =
+ Utils.getHpackLogger(this::dbgString, DEBUG_HPACK);
+
+ LoggingHeaderDecoder(HeaderDecoder delegate) {
+ this.delegate = delegate;
+ }
+
+ String dbgString() {
+ return Http2Connection.this.dbgString() + "/LoggingHeaderDecoder";
+ }
+
+ @Override
+ public void onDecoded(CharSequence name, CharSequence value) {
+ delegate.onDecoded(name, value);
+ }
+
+ @Override
+ public void onIndexed(int index,
+ CharSequence name,
+ CharSequence value) {
+ debugHpack.log(Level.DEBUG, "onIndexed(%s, %s, %s)%n",
+ index, name, value);
+ delegate.onIndexed(index, name, value);
+ }
+
+ @Override
+ public void onLiteral(int index,
+ CharSequence name,
+ CharSequence value,
+ boolean valueHuffman) {
+ debugHpack.log(Level.DEBUG, "onLiteral(%s, %s, %s, %s)%n",
+ index, name, value, valueHuffman);
+ delegate.onLiteral(index, name, value, valueHuffman);
+ }
+
+ @Override
+ public void onLiteral(CharSequence name,
+ boolean nameHuffman,
+ CharSequence value,
+ boolean valueHuffman) {
+ debugHpack.log(Level.DEBUG, "onLiteral(%s, %s, %s, %s)%n",
+ name, nameHuffman, value, valueHuffman);
+ delegate.onLiteral(name, nameHuffman, value, valueHuffman);
+ }
+
+ @Override
+ public void onLiteralNeverIndexed(int index,
+ CharSequence name,
+ CharSequence value,
+ boolean valueHuffman) {
+ debugHpack.log(Level.DEBUG, "onLiteralNeverIndexed(%s, %s, %s, %s)%n",
+ index, name, value, valueHuffman);
+ delegate.onLiteralNeverIndexed(index, name, value, valueHuffman);
+ }
+
+ @Override
+ public void onLiteralNeverIndexed(CharSequence name,
+ boolean nameHuffman,
+ CharSequence value,
+ boolean valueHuffman) {
+ debugHpack.log(Level.DEBUG, "onLiteralNeverIndexed(%s, %s, %s, %s)%n",
+ name, nameHuffman, value, valueHuffman);
+ delegate.onLiteralNeverIndexed(name, nameHuffman, value, valueHuffman);
+ }
+
+ @Override
+ public void onLiteralWithIndexing(int index,
+ CharSequence name,
+ CharSequence value,
+ boolean valueHuffman) {
+ debugHpack.log(Level.DEBUG, "onLiteralWithIndexing(%s, %s, %s, %s)%n",
+ index, name, value, valueHuffman);
+ delegate.onLiteralWithIndexing(index, name, value, valueHuffman);
+ }
+
+ @Override
+ public void onLiteralWithIndexing(CharSequence name,
+ boolean nameHuffman,
+ CharSequence value,
+ boolean valueHuffman) {
+ debugHpack.log(Level.DEBUG, "onLiteralWithIndexing(%s, %s, %s, %s)%n",
+ name, nameHuffman, value, valueHuffman);
+ delegate.onLiteralWithIndexing(name, nameHuffman, value, valueHuffman);
+ }
+
+ @Override
+ public void onSizeUpdate(int capacity) {
+ debugHpack.log(Level.DEBUG, "onSizeUpdate(%s)%n", capacity);
+ delegate.onSizeUpdate(capacity);
+ }
+
+ @Override
+ HttpHeadersImpl headers() {
+ return delegate.headers();
+ }
+ }
+
static class HeaderDecoder implements DecodingCallback {
HttpHeadersImpl headers;
HeaderDecoder() {
this.headers = new HttpHeadersImpl();
< prev index next >