< prev index next >

src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java

Print this page

        

*** 23,53 **** * questions. */ package jdk.incubator.http; import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; - import jdk.incubator.http.HttpConnection.Mode; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; - import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.ArrayList; ! import java.util.Collections; ! import java.util.Formatter; import java.util.concurrent.ConcurrentHashMap; ! import java.util.concurrent.CountDownLatch; ! import java.util.stream.Collectors; import javax.net.ssl.SSLEngine; ! import jdk.incubator.http.internal.common.*; ! import jdk.incubator.http.internal.frame.*; import jdk.incubator.http.internal.hpack.Encoder; import jdk.incubator.http.internal.hpack.Decoder; import jdk.incubator.http.internal.hpack.DecodingCallback; import static jdk.incubator.http.internal.frame.SettingsFrame.*; --- 23,76 ---- * questions. */ package jdk.incubator.http; + import java.io.EOFException; import java.io.IOException; + import java.lang.System.Logger.Level; import java.net.InetSocketAddress; import java.net.URI; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.ArrayList; ! import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; ! import java.util.concurrent.ConcurrentLinkedQueue; ! import java.util.concurrent.Flow; ! import java.util.function.Function; ! import java.util.function.Supplier; import javax.net.ssl.SSLEngine; ! import jdk.incubator.http.HttpConnection.HttpPublisher; ! import jdk.incubator.http.internal.common.FlowTube; ! import jdk.incubator.http.internal.common.FlowTube.TubeSubscriber; ! import jdk.incubator.http.internal.common.HttpHeadersImpl; ! import jdk.incubator.http.internal.common.Log; ! import jdk.incubator.http.internal.common.MinimalFuture; ! import jdk.incubator.http.internal.common.SequentialScheduler; ! import jdk.incubator.http.internal.common.Utils; ! import jdk.incubator.http.internal.frame.ContinuationFrame; ! import jdk.incubator.http.internal.frame.DataFrame; ! import jdk.incubator.http.internal.frame.ErrorFrame; ! import jdk.incubator.http.internal.frame.FramesDecoder; ! import jdk.incubator.http.internal.frame.FramesEncoder; ! import jdk.incubator.http.internal.frame.GoAwayFrame; ! import jdk.incubator.http.internal.frame.HeaderFrame; ! import jdk.incubator.http.internal.frame.HeadersFrame; ! import jdk.incubator.http.internal.frame.Http2Frame; ! import jdk.incubator.http.internal.frame.MalformedFrame; ! import jdk.incubator.http.internal.frame.OutgoingHeaders; ! import jdk.incubator.http.internal.frame.PingFrame; ! import jdk.incubator.http.internal.frame.PushPromiseFrame; ! import jdk.incubator.http.internal.frame.ResetFrame; ! import jdk.incubator.http.internal.frame.SettingsFrame; ! import jdk.incubator.http.internal.frame.WindowUpdateFrame; import jdk.incubator.http.internal.hpack.Encoder; import jdk.incubator.http.internal.hpack.Decoder; import jdk.incubator.http.internal.hpack.DecodingCallback; import static jdk.incubator.http.internal.frame.SettingsFrame.*;
*** 81,95 **** * or handles them directly itself. This thread performs hpack decompression * and incoming stream creation (Server push). Incoming frames destined for a * stream are provided by calling Stream.incoming(). */ class Http2Connection { /* * ByteBuffer pooling strategy for HTTP/2 protocol: * * In general there are 4 points where ByteBuffers are used: ! * - incoming/outgoing frames from/to ByteBufers plus incoming/outgoing encrypted data * in case of SSL connection. * * 1. Outgoing frames encoded to ByteBuffers. * Outgoing ByteBuffers are created with requited size and frequently small (except DataFrames, etc) * At this place no pools at all. All outgoing buffers should be collected by GC. --- 104,128 ---- * or handles them directly itself. This thread performs hpack decompression * and incoming stream creation (Server push). Incoming frames destined for a * stream are provided by calling Stream.incoming(). */ class Http2Connection { + + static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. + static final boolean DEBUG_HPACK = Utils.DEBUG_HPACK; // Revisit: temporary dev flag. + final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); + final static System.Logger DEBUG_LOGGER = + Utils.getDebugLogger("Http2Connection"::toString, DEBUG); + private final System.Logger debugHpack = + Utils.getHpackLogger(this::dbgString, DEBUG_HPACK); + static final ByteBuffer EMPTY_TRIGGER = ByteBuffer.allocate(0); + /* * ByteBuffer pooling strategy for HTTP/2 protocol: * * In general there are 4 points where ByteBuffers are used: ! * - incoming/outgoing frames from/to ByteBuffers plus incoming/outgoing encrypted data * in case of SSL connection. * * 1. Outgoing frames encoded to ByteBuffers. * Outgoing ByteBuffers are created with requited size and frequently small (except DataFrames, etc) * At this place no pools at all. All outgoing buffers should be collected by GC.
*** 114,157 **** // A small class that allows to control frames with respect to the state of // the connection preface. Any data received before the connection // preface is sent will be buffered. private final class FramesController { volatile boolean prefaceSent; ! volatile List<ByteBufferReference> pending; ! boolean processReceivedData(FramesDecoder decoder, ByteBufferReference buf) throws IOException { // if preface is not sent, buffers data in the pending list if (!prefaceSent) { synchronized (this) { if (!prefaceSent) { if (pending == null) pending = new ArrayList<>(); pending.add(buf); return false; } } } // Preface is sent. Checks for pending data and flush it. ! // We rely on this method being called from within the readlock, ! // so we know that no other thread could execute this method // concurrently while we're here. // This ensures that later incoming buffers will not // be processed before we have flushed the pending queue. // No additional synchronization is therefore necessary here. ! List<ByteBufferReference> pending = this.pending; this.pending = null; if (pending != null) { // flush pending data ! for (ByteBufferReference b : pending) { decoder.decode(b); } } - // push the received buffer to the frames decoder. decoder.decode(buf); return true; } // Mark that the connection preface is sent void markPrefaceSent() { --- 147,199 ---- // A small class that allows to control frames with respect to the state of // the connection preface. Any data received before the connection // preface is sent will be buffered. private final class FramesController { volatile boolean prefaceSent; ! volatile List<ByteBuffer> pending; ! boolean processReceivedData(FramesDecoder decoder, ByteBuffer buf) throws IOException { // if preface is not sent, buffers data in the pending list if (!prefaceSent) { + debug.log(Level.DEBUG, "Preface is not sent: buffering %d", + buf.remaining()); synchronized (this) { if (!prefaceSent) { if (pending == null) pending = new ArrayList<>(); pending.add(buf); + debug.log(Level.DEBUG, () -> "there are now " + + Utils.remaining(pending) + + " bytes buffered waiting for preface to be sent"); return false; } } } // Preface is sent. Checks for pending data and flush it. ! // We rely on this method being called from within the Http2TubeSubscriber ! // scheduler, so we know that no other thread could execute this method // concurrently while we're here. // This ensures that later incoming buffers will not // be processed before we have flushed the pending queue. // No additional synchronization is therefore necessary here. ! List<ByteBuffer> pending = this.pending; this.pending = null; if (pending != null) { // flush pending data ! debug.log(Level.DEBUG, () -> "Processing buffered data: " ! + Utils.remaining(pending)); ! for (ByteBuffer b : pending) { decoder.decode(b); } } // push the received buffer to the frames decoder. + if (buf != EMPTY_TRIGGER) { + debug.log(Level.DEBUG, "Processing %d", buf.remaining()); decoder.decode(buf); + } return true; } // Mark that the connection preface is sent void markPrefaceSent() {
*** 165,175 **** volatile boolean closed; //------------------------------------- final HttpConnection connection; - private final HttpClientImpl client; private final Http2ClientImpl client2; private final Map<Integer,Stream<?>> streams = new ConcurrentHashMap<>(); private int nextstreamid; private int nextPushStream = 2; private final Encoder hpackOut; --- 207,216 ----
*** 184,194 **** --- 225,238 ---- * Send Window controller for both connection and stream windows. * Each of this connection's Streams MUST use this controller. */ private final WindowController windowController = new WindowController(); private final FramesController framesController = new FramesController(); + private final Http2TubeSubscriber subscriber = new Http2TubeSubscriber(); final WindowUpdateSender windowUpdater; + private volatile Throwable cause; + private volatile Supplier<ByteBuffer> initial; static final int DEFAULT_FRAME_SIZE = 16 * 1024; // TODO: need list of control frames from other threads
*** 197,312 **** private Http2Connection(HttpConnection connection, Http2ClientImpl client2, int nextstreamid, String key) { this.connection = connection; - this.client = client2.client(); this.client2 = client2; this.nextstreamid = nextstreamid; this.key = key; this.clientSettings = this.client2.getClientSettings(); this.framesDecoder = new FramesDecoder(this::processFrame, clientSettings.getParameter(SettingsFrame.MAX_FRAME_SIZE)); // serverSettings will be updated by server this.serverSettings = SettingsFrame.getDefaultSettings(); this.hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE)); this.hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE)); ! this.windowUpdater = new ConnectionWindowUpdateSender(this, client.getReceiveBufferSize()); } /** * Case 1) Create from upgraded HTTP/1.1 connection. ! * Is ready to use. Will not be SSL. exchange is the Exchange * that initiated the connection, whose response will be delivered * on a Stream. */ ! Http2Connection(HttpConnection connection, Http2ClientImpl client2, Exchange<?> exchange, ! ByteBuffer initial) throws IOException, InterruptedException { this(connection, client2, 3, // stream 1 is registered during the upgrade keyFor(connection)); - assert !(connection instanceof SSLConnection); Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize()); Stream<?> initialStream = createStream(exchange); initialStream.registerStream(1); windowController.registerStream(1, getInitialSendWindowSize()); initialStream.requestSent(); sendConnectionPreface(); - // start reading and writing - // start reading - AsyncConnection asyncConn = (AsyncConnection)connection; - asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown, this::getReadBuffer); - connection.configureMode(Mode.ASYNC); // set mode only AFTER setAsyncCallbacks to provide visibility. - asyncReceive(ByteBufferReference.of(initial)); - asyncConn.startReading(); } ! // async style but completes immediately static CompletableFuture<Http2Connection> createAsync(HttpConnection connection, Http2ClientImpl client2, Exchange<?> exchange, ! ByteBuffer initial) { return MinimalFuture.supply(() -> new Http2Connection(connection, client2, exchange, initial)); } /** * Cases 2) 3) * * request is request to be sent. */ ! Http2Connection(HttpRequestImpl request, Http2ClientImpl h2client) ! throws IOException, InterruptedException { ! this(HttpConnection.getConnection(request.getAddress(h2client.client()), h2client.client(), request, true), h2client, 1, ! keyFor(request.uri(), request.proxy(h2client.client()))); Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize()); - // start reading - AsyncConnection asyncConn = (AsyncConnection)connection; - asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown, this::getReadBuffer); - connection.connect(); - checkSSLConfig(); // safe to resume async reading now. ! asyncConn.enableCallback(); sendConnectionPreface(); } /** * Throws an IOException if h2 was not negotiated */ ! private void checkSSLConfig() throws IOException { ! AbstractAsyncSSLConnection aconn = (AbstractAsyncSSLConnection)connection; SSLEngine engine = aconn.getEngine(); ! String alpn = engine.getApplicationProtocol(); if (alpn == null || !alpn.equals("h2")) { String msg; if (alpn == null) { Log.logSSL("ALPN not supported"); msg = "ALPN not supported"; ! } else switch (alpn) { case "": ! Log.logSSL("No ALPN returned"); ! msg = "No ALPN negotiated"; break; case "http/1.1": ! Log.logSSL("HTTP/1.1 ALPN returned"); ! msg = "HTTP/1.1 ALPN returned"; break; default: ! Log.logSSL("unknown ALPN returned"); ! msg = "Unexpected ALPN: " + alpn; ! throw new IOException(msg); } - throw new ALPNException(msg, aconn); } } static String keyFor(HttpConnection connection) { boolean isProxy = connection.isProxied(); boolean isSecure = connection.isSecure(); --- 241,400 ---- private Http2Connection(HttpConnection connection, Http2ClientImpl client2, int nextstreamid, String key) { this.connection = connection; this.client2 = client2; this.nextstreamid = nextstreamid; this.key = key; this.clientSettings = this.client2.getClientSettings(); this.framesDecoder = new FramesDecoder(this::processFrame, clientSettings.getParameter(SettingsFrame.MAX_FRAME_SIZE)); // serverSettings will be updated by server this.serverSettings = SettingsFrame.getDefaultSettings(); this.hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE)); this.hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE)); ! debugHpack.log(Level.DEBUG, () -> "For the record:" + super.toString()); ! debugHpack.log(Level.DEBUG, "Decoder created: %s", hpackIn); ! debugHpack.log(Level.DEBUG, "Encoder created: %s", hpackOut); ! this.windowUpdater = new ConnectionWindowUpdateSender(this, client().getReceiveBufferSize()); } /** * Case 1) Create from upgraded HTTP/1.1 connection. ! * Is ready to use. Can be SSL. exchange is the Exchange * that initiated the connection, whose response will be delivered * on a Stream. */ ! private Http2Connection(HttpConnection connection, Http2ClientImpl client2, Exchange<?> exchange, ! Supplier<ByteBuffer> initial) throws IOException, InterruptedException { this(connection, client2, 3, // stream 1 is registered during the upgrade keyFor(connection)); Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize()); Stream<?> initialStream = createStream(exchange); initialStream.registerStream(1); windowController.registerStream(1, getInitialSendWindowSize()); initialStream.requestSent(); + // Upgrading: + // set callbacks before sending preface - makes sure anything that + // might be sent by the server will come our way. + this.initial = initial; + connectFlows(connection); sendConnectionPreface(); } ! // Used when upgrading an HTTP/1.1 connection to HTTP/2 after receiving ! // agreement from the server. Async style but completes immediately, because ! // the connection is already connected. static CompletableFuture<Http2Connection> createAsync(HttpConnection connection, Http2ClientImpl client2, Exchange<?> exchange, ! Supplier<ByteBuffer> initial) ! { return MinimalFuture.supply(() -> new Http2Connection(connection, client2, exchange, initial)); } + // Requires TLS handshake. So, is really async + static CompletableFuture<Http2Connection> createAsync(HttpRequestImpl request, + Http2ClientImpl h2client) { + assert request.secure(); + AbstractAsyncSSLConnection connection = (AbstractAsyncSSLConnection) + HttpConnection.getConnection(request.getAddress(), + h2client.client(), + request, + HttpClient.Version.HTTP_2); + + return connection.connectAsync() + .thenCompose(unused -> checkSSLConfig(connection)) + .thenCompose(notused-> { + CompletableFuture<Http2Connection> cf = new MinimalFuture<>(); + try { + Http2Connection hc = new Http2Connection(request, h2client, connection); + cf.complete(hc); + } catch (IOException e) { + cf.completeExceptionally(e); + } + return cf; } ); + } + /** * Cases 2) 3) * * request is request to be sent. */ ! private Http2Connection(HttpRequestImpl request, ! Http2ClientImpl h2client, ! HttpConnection connection) ! throws IOException { ! this(connection, h2client, 1, ! keyFor(request.uri(), request.proxy())); ! Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize()); // safe to resume async reading now. ! connectFlows(connection); sendConnectionPreface(); } + private void connectFlows(HttpConnection connection) { + FlowTube tube = connection.getConnectionFlow(); + // Connect the flow to our Http2TubeSubscriber: + tube.connectFlows(connection.publisher(), subscriber); + } + + final HttpClientImpl client() { + return client2.client(); + } + /** * Throws an IOException if h2 was not negotiated */ ! private static CompletableFuture<?> checkSSLConfig(AbstractAsyncSSLConnection aconn) { ! assert aconn.isSecure(); ! ! Function<String, CompletableFuture<Void>> checkAlpnCF = (alpn) -> { ! CompletableFuture<Void> cf = new MinimalFuture<>(); SSLEngine engine = aconn.getEngine(); ! assert Objects.equals(alpn, engine.getApplicationProtocol()); ! ! DEBUG_LOGGER.log(Level.DEBUG, "checkSSLConfig: alpn: %s", alpn ); ! if (alpn == null || !alpn.equals("h2")) { String msg; if (alpn == null) { Log.logSSL("ALPN not supported"); msg = "ALPN not supported"; ! } else { ! switch (alpn) { case "": ! Log.logSSL(msg = "No ALPN negotiated"); break; case "http/1.1": ! Log.logSSL( msg = "HTTP/1.1 ALPN returned"); break; default: ! Log.logSSL(msg = "Unexpected ALPN: " + alpn); ! cf.completeExceptionally(new IOException(msg)); } } + cf.completeExceptionally(new ALPNException(msg, aconn)); + return cf; + } + cf.complete(null); + return cf; + }; + + return aconn.getALPN().thenCompose(checkAlpnCF); } static String keyFor(HttpConnection connection) { boolean isProxy = connection.isProxied(); boolean isSecure = connection.isSecure();
*** 320,330 **** boolean isProxy = proxy != null; String host; int port; ! if (isProxy) { host = proxy.getHostString(); port = proxy.getPort(); } else { host = uri.getHost(); port = uri.getPort(); --- 408,418 ---- boolean isProxy = proxy != null; String host; int port; ! if (proxy != null) { host = proxy.getHostString(); port = proxy.getPort(); } else { host = uri.getHost(); port = uri.getPort();
*** 348,443 **** void putConnection() { client2.putConnection(this); } ! private static String toHexdump1(ByteBuffer bb) { ! bb.mark(); ! StringBuilder sb = new StringBuilder(512); ! Formatter f = new Formatter(sb); ! ! while (bb.hasRemaining()) { ! int i = Byte.toUnsignedInt(bb.get()); ! f.format("%02x:", i); ! } ! sb.deleteCharAt(sb.length()-1); ! bb.reset(); ! return sb.toString(); ! } ! ! private static String toHexdump(ByteBuffer bb) { ! List<String> words = new ArrayList<>(); ! int i = 0; ! bb.mark(); ! while (bb.hasRemaining()) { ! if (i % 2 == 0) { ! words.add(""); ! } ! byte b = bb.get(); ! String hex = Integer.toHexString(256 + Byte.toUnsignedInt(b)).substring(1); ! words.set(i / 2, words.get(i / 2) + hex); ! i++; ! } ! bb.reset(); ! return words.stream().collect(Collectors.joining(" ")); } ! private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder) { boolean endOfHeaders = frame.getFlag(HeaderFrame.END_HEADERS); ! ByteBufferReference[] buffers = frame.getHeaderBlock(); ! for (int i = 0; i < buffers.length; i++) { ! hpackIn.decode(buffers[i].get(), endOfHeaders && (i == buffers.length - 1), decoder); } } ! int getInitialSendWindowSize() { return serverSettings.getParameter(INITIAL_WINDOW_SIZE); } void close() { GoAwayFrame f = new GoAwayFrame(0, ErrorFrame.NO_ERROR, "Requested by user".getBytes()); // TODO: set last stream. For now zero ok. sendFrame(f); } ! private ByteBufferPool readBufferPool = new ByteBufferPool(); ! ! // provides buffer to read data (default size) ! public ByteBufferReference getReadBuffer() { ! return readBufferPool.get(getMaxReceiveFrameSize() + Http2Frame.FRAME_HEADER_SIZE); ! } ! ! private final Object readlock = new Object(); ! ! public void asyncReceive(ByteBufferReference buffer) { // We don't need to read anything and // we don't want to send anything back to the server // until the connection preface has been sent. // Therefore we're going to wait if needed before reading // (and thus replying) to anything. // Starting to reply to something (e.g send an ACK to a // SettingsFrame sent by the server) before the connection // preface is fully sent might result in the server // sending a GOAWAY frame with 'invalid_preface'. ! synchronized (readlock) { try { ! // the readlock ensures that the order of incoming buffers ! // is preserved. framesController.processReceivedData(framesDecoder, buffer); } catch (Throwable e) { String msg = Utils.stackTrace(e); Log.logTrace(msg); shutdown(e); } } - } void shutdown(Throwable t) { ! Log.logError(t); closed = true; client2.deleteConnection(this); List<Stream<?>> c = new LinkedList<>(streams.values()); for (Stream<?> s : c) { s.cancelImpl(t); } --- 436,540 ---- void putConnection() { client2.putConnection(this); } ! private HttpPublisher publisher() { ! return connection.publisher(); } ! private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder) ! throws IOException ! { ! debugHpack.log(Level.DEBUG, "decodeHeaders(%s)", decoder); ! boolean endOfHeaders = frame.getFlag(HeaderFrame.END_HEADERS); ! List<ByteBuffer> buffers = frame.getHeaderBlock(); ! int len = buffers.size(); ! for (int i = 0; i < len; i++) { ! ByteBuffer b = buffers.get(i); ! hpackIn.decode(b, endOfHeaders && (i == len - 1), decoder); } } ! final int getInitialSendWindowSize() { return serverSettings.getParameter(INITIAL_WINDOW_SIZE); } void close() { GoAwayFrame f = new GoAwayFrame(0, ErrorFrame.NO_ERROR, "Requested by user".getBytes()); // TODO: set last stream. For now zero ok. sendFrame(f); } ! long count; ! final void asyncReceive(ByteBuffer buffer) { // We don't need to read anything and // we don't want to send anything back to the server // until the connection preface has been sent. // Therefore we're going to wait if needed before reading // (and thus replying) to anything. // Starting to reply to something (e.g send an ACK to a // SettingsFrame sent by the server) before the connection // preface is fully sent might result in the server // sending a GOAWAY frame with 'invalid_preface'. ! // ! // Note: asyncReceive is only called from the Http2TubeSubscriber ! // sequential scheduler. try { ! Supplier<ByteBuffer> bs = initial; ! // ensure that we always handle the initial buffer first, ! // if any. ! if (bs != null) { ! initial = null; ! ByteBuffer b = bs.get(); ! if (b.hasRemaining()) { ! long c = ++count; ! debug.log(Level.DEBUG, () -> "H2 Receiving Initial(" ! + c +"): " + b.remaining()); ! framesController.processReceivedData(framesDecoder, b); ! } ! } ! ByteBuffer b = buffer; ! // the Http2TubeSubscriber scheduler ensures that the order of incoming ! // buffers is preserved. ! if (b == EMPTY_TRIGGER) { ! debug.log(Level.DEBUG, "H2 Received EMPTY_TRIGGER"); ! boolean prefaceSent = framesController.prefaceSent; ! assert prefaceSent; ! // call framesController.processReceivedData to potentially ! // trigger the processing of all the data buffered there. ! framesController.processReceivedData(framesDecoder, buffer); ! debug.log(Level.DEBUG, "H2 processed buffered data"); ! } else { ! long c = ++count; ! debug.log(Level.DEBUG, "H2 Receiving(%d): %d", c, b.remaining()); framesController.processReceivedData(framesDecoder, buffer); + debug.log(Level.DEBUG, "H2 processed(%d)", c); + } } catch (Throwable e) { String msg = Utils.stackTrace(e); Log.logTrace(msg); shutdown(e); } } + Throwable getRecordedCause() { + return cause; + } void shutdown(Throwable t) { ! debug.log(Level.DEBUG, () -> "Shutting down h2c (closed="+closed+"): " + t); ! if (closed == true) return; ! synchronized (this) { ! if (closed == true) return; closed = true; + } + Log.logError(t); + Throwable initialCause = this.cause; + if (initialCause == null) this.cause = t; client2.deleteConnection(this); List<Stream<?>> c = new LinkedList<>(streams.values()); for (Stream<?> s : c) { s.cancelImpl(t); }
*** 455,466 **** Log.logFrames(frame, "IN"); int streamid = frame.streamid(); if (frame instanceof MalformedFrame) { Log.logError(((MalformedFrame) frame).getMessage()); if (streamid == 0) { ! protocolError(((MalformedFrame) frame).getErrorCode()); } else { resetStream(streamid, ((MalformedFrame) frame).getErrorCode()); } return; } if (streamid == 0) { --- 552,566 ---- Log.logFrames(frame, "IN"); int streamid = frame.streamid(); if (frame instanceof MalformedFrame) { Log.logError(((MalformedFrame) frame).getMessage()); if (streamid == 0) { ! protocolError(((MalformedFrame) frame).getErrorCode(), ! ((MalformedFrame) frame).getMessage()); } else { + debug.log(Level.DEBUG, () -> "Reset stream: " + + ((MalformedFrame) frame).getMessage()); resetStream(streamid, ((MalformedFrame) frame).getErrorCode()); } return; } if (streamid == 0) {
*** 474,486 **** Stream<?> stream = getStream(streamid); if (stream == null) { // Should never receive a frame with unknown stream id ! // To avoid looping, an endpoint MUST NOT send a RST_STREAM in ! // response to a RST_STREAM frame. ! if (!(frame instanceof ResetFrame)) { resetStream(streamid, ResetFrame.PROTOCOL_ERROR); } return; } if (frame instanceof PushPromiseFrame) { --- 574,593 ---- Stream<?> stream = getStream(streamid); if (stream == null) { // Should never receive a frame with unknown stream id ! if (frame instanceof HeaderFrame) { ! // always decode the headers as they may affect ! // connection-level HPACK decoding state ! HeaderDecoder decoder = new LoggingHeaderDecoder(new HeaderDecoder()); ! decodeHeaders((HeaderFrame) frame, decoder); ! } ! ! int sid = frame.streamid(); ! if (sid >= nextstreamid && !(frame instanceof ResetFrame)) { ! // otherwise the stream has already been reset/closed resetStream(streamid, ResetFrame.PROTOCOL_ERROR); } return; } if (frame instanceof PushPromiseFrame) {
*** 497,516 **** } private <T> void handlePushPromise(Stream<T> parent, PushPromiseFrame pp) throws IOException { HttpRequestImpl parentReq = parent.request; int promisedStreamid = pp.getPromisedStream(); if (promisedStreamid != nextPushStream) { resetStream(promisedStreamid, ResetFrame.PROTOCOL_ERROR); return; } else { nextPushStream += 2; } ! HeaderDecoder decoder = new HeaderDecoder(); ! decodeHeaders(pp, decoder); HttpHeadersImpl headers = decoder.headers(); HttpRequestImpl pushReq = HttpRequestImpl.createPushRequest(parentReq, headers); Exchange<T> pushExch = new Exchange<>(pushReq, parent.exchange.multi); Stream.PushedStream<?,T> pushStream = createPushStream(parent, pushExch); pushExch.exchImpl = pushStream; --- 604,627 ---- } private <T> void handlePushPromise(Stream<T> parent, PushPromiseFrame pp) throws IOException { + // always decode the headers as they may affect connection-level HPACK + // decoding state + HeaderDecoder decoder = new LoggingHeaderDecoder(new HeaderDecoder()); + decodeHeaders(pp, decoder); + HttpRequestImpl parentReq = parent.request; int promisedStreamid = pp.getPromisedStream(); if (promisedStreamid != nextPushStream) { resetStream(promisedStreamid, ResetFrame.PROTOCOL_ERROR); return; } else { nextPushStream += 2; } ! HttpHeadersImpl headers = decoder.headers(); HttpRequestImpl pushReq = HttpRequestImpl.createPushRequest(parentReq, headers); Exchange<T> pushExch = new Exchange<>(pushReq, parent.exchange.multi); Stream.PushedStream<?,T> pushStream = createPushStream(parent, pushExch); pushExch.exchImpl = pushStream;
*** 547,557 **** --- 658,676 ---- sendFrame(frame); closeStream(streamid); } void closeStream(int streamid) { + debug.log(Level.DEBUG, "Closed stream %d", streamid); Stream<?> s = streams.remove(streamid); + if (s != null) { + // decrement the reference count on the HttpClientImpl + // to allow the SelectorManager thread to exit if no + // other operation is pending and the facade is no + // longer referenced. + client().unreference(); + } // ## Remove s != null. It is a hack for delayed cancellation,reset if (s != null && !(s instanceof Stream.PushedStream)) { // Since PushStreams have no request body, then they have no // corresponding entry in the window controller. windowController.removeStream(streamid);
*** 577,589 **** } private void protocolError(int errorCode) throws IOException { GoAwayFrame frame = new GoAwayFrame(0, errorCode); sendFrame(frame); ! shutdown(new IOException("protocol error")); } private void handleSettings(SettingsFrame frame) throws IOException { --- 696,714 ---- } private void protocolError(int errorCode) throws IOException { + protocolError(errorCode, null); + } + + private void protocolError(int errorCode, String msg) + throws IOException + { GoAwayFrame frame = new GoAwayFrame(0, errorCode); sendFrame(frame); ! shutdown(new IOException("protocol error" + (msg == null?"":(": " + msg)))); } private void handleSettings(SettingsFrame frame) throws IOException {
*** 631,645 **** */ public int getMaxReceiveFrameSize() { return clientSettings.getParameter(MAX_FRAME_SIZE); } - // Not sure how useful this is. - public int getMaxHeadersSize() { - return serverSettings.getParameter(MAX_HEADER_LIST_SIZE); - } - private static final String CLIENT_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; private static final byte[] PREFACE_BYTES = CLIENT_PREFACE.getBytes(StandardCharsets.ISO_8859_1); --- 756,765 ----
*** 650,663 **** private void sendConnectionPreface() throws IOException { Log.logTrace("{0}: start sending connection preface to {1}", connection.channel().getLocalAddress(), connection.address()); SettingsFrame sf = client2.getClientSettings(); ! ByteBufferReference ref = framesEncoder.encodeConnectionPreface(PREFACE_BYTES, sf); Log.logFrames(sf, "OUT"); // send preface bytes and SettingsFrame together ! connection.write(ref.get()); // mark preface sent. framesController.markPrefaceSent(); Log.logTrace("PREFACE_BYTES sent"); Log.logTrace("Settings Frame sent"); --- 770,785 ---- private void sendConnectionPreface() throws IOException { Log.logTrace("{0}: start sending connection preface to {1}", connection.channel().getLocalAddress(), connection.address()); SettingsFrame sf = client2.getClientSettings(); ! ByteBuffer buf = framesEncoder.encodeConnectionPreface(PREFACE_BYTES, sf); Log.logFrames(sf, "OUT"); // send preface bytes and SettingsFrame together ! HttpPublisher publisher = publisher(); ! publisher.enqueue(List.of(buf)); ! publisher.signalEnqueued(); // mark preface sent. framesController.markPrefaceSent(); Log.logTrace("PREFACE_BYTES sent"); Log.logTrace("Settings Frame sent");
*** 667,676 **** --- 789,801 ---- windowUpdater.sendWindowUpdate(len); // there will be an ACK to the windows update - which should // cause any pending data stored before the preface was sent to be // flushed (see PrefaceController). Log.logTrace("finished sending connection preface"); + debug.log(Level.DEBUG, "Triggering processing of buffered data" + + " after sending connection preface"); + subscriber.onNext(List.of(EMPTY_TRIGGER)); } /** * Returns an existing Stream with given id, or null if doesn't exist */
*** 680,721 **** } /** * Creates Stream with given id. */ ! <T> Stream<T> createStream(Exchange<T> exchange) { ! Stream<T> stream = new Stream<>(client, this, exchange, windowController); return stream; } <T> Stream.PushedStream<?,T> createPushStream(Stream<T> parent, Exchange<T> pushEx) { PushGroup<?,T> pg = parent.exchange.getPushGroup(); ! return new Stream.PushedStream<>(pg, client, this, parent, pushEx); } <T> void putStream(Stream<T> stream, int streamid) { streams.put(streamid, stream); } - void deleteStream(int streamid) { - streams.remove(streamid); - windowController.removeStream(streamid); - } - /** * Encode the headers into a List<ByteBuffer> and then create HEADERS * and CONTINUATION frames from the list and return the List<Http2Frame>. */ private List<HeaderFrame> encodeHeaders(OutgoingHeaders<Stream<?>> frame) { ! List<ByteBufferReference> buffers = encodeHeadersImpl( getMaxSendFrameSize(), frame.getAttachment().getRequestPseudoHeaders(), frame.getUserHeaders(), frame.getSystemHeaders()); List<HeaderFrame> frames = new ArrayList<>(buffers.size()); ! Iterator<ByteBufferReference> bufIterator = buffers.iterator(); HeaderFrame oframe = new HeadersFrame(frame.streamid(), frame.getFlags(), bufIterator.next()); frames.add(oframe); while(bufIterator.hasNext()) { oframe = new ContinuationFrame(frame.streamid(), bufIterator.next()); frames.add(oframe); --- 805,845 ---- } /** * Creates Stream with given id. */ ! final <T> Stream<T> createStream(Exchange<T> exchange) { ! Stream<T> stream = new Stream<>(this, exchange, windowController); return stream; } <T> Stream.PushedStream<?,T> createPushStream(Stream<T> parent, Exchange<T> pushEx) { PushGroup<?,T> pg = parent.exchange.getPushGroup(); ! return new Stream.PushedStream<>(pg, this, pushEx); } <T> void putStream(Stream<T> stream, int streamid) { + // increment the reference count on the HttpClientImpl + // to prevent the SelectorManager thread from exiting until + // the stream is closed. + client().reference(); streams.put(streamid, stream); } /** * Encode the headers into a List<ByteBuffer> and then create HEADERS * and CONTINUATION frames from the list and return the List<Http2Frame>. */ private List<HeaderFrame> encodeHeaders(OutgoingHeaders<Stream<?>> frame) { ! List<ByteBuffer> buffers = encodeHeadersImpl( getMaxSendFrameSize(), frame.getAttachment().getRequestPseudoHeaders(), frame.getUserHeaders(), frame.getSystemHeaders()); List<HeaderFrame> frames = new ArrayList<>(buffers.size()); ! Iterator<ByteBuffer> bufIterator = buffers.iterator(); HeaderFrame oframe = new HeadersFrame(frame.streamid(), frame.getFlags(), bufIterator.next()); frames.add(oframe); while(bufIterator.hasNext()) { oframe = new ContinuationFrame(frame.streamid(), bufIterator.next()); frames.add(oframe);
*** 726,741 **** // Dedicated cache for headers encoding ByteBuffer. // There can be no concurrent access to this buffer as all access to this buffer // and its content happen within a single critical code block section protected // by the sendLock. / (see sendFrame()) ! private ByteBufferPool headerEncodingPool = new ByteBufferPool(); ! private ByteBufferReference getHeaderBuffer(int maxFrameSize) { ! ByteBufferReference ref = headerEncodingPool.get(maxFrameSize); ! ref.get().limit(maxFrameSize); ! return ref; } /* * Encodes all the headers from the given HttpHeaders into the given List * of buffers. --- 850,865 ---- // Dedicated cache for headers encoding ByteBuffer. // There can be no concurrent access to this buffer as all access to this buffer // and its content happen within a single critical code block section protected // by the sendLock. / (see sendFrame()) ! // private final ByteBufferPool headerEncodingPool = new ByteBufferPool(); ! private ByteBuffer getHeaderBuffer(int maxFrameSize) { ! ByteBuffer buf = ByteBuffer.allocate(maxFrameSize); ! buf.limit(maxFrameSize); ! return buf; } /* * Encodes all the headers from the given HttpHeaders into the given List * of buffers.
*** 745,777 **** * ...Just as in HTTP/1.x, header field names are strings of ASCII * characters that are compared in a case-insensitive fashion. However, * header field names MUST be converted to lowercase prior to their * encoding in HTTP/2... */ ! private List<ByteBufferReference> encodeHeadersImpl(int maxFrameSize, HttpHeaders... headers) { ! ByteBufferReference buffer = getHeaderBuffer(maxFrameSize); ! List<ByteBufferReference> buffers = new ArrayList<>(); for(HttpHeaders header : headers) { for (Map.Entry<String, List<String>> e : header.map().entrySet()) { String lKey = e.getKey().toLowerCase(); List<String> values = e.getValue(); for (String value : values) { hpackOut.header(lKey, value); ! while (!hpackOut.encode(buffer.get())) { ! buffer.get().flip(); buffers.add(buffer); buffer = getHeaderBuffer(maxFrameSize); } } } } ! buffer.get().flip(); buffers.add(buffer); return buffers; } ! private ByteBufferReference[] encodeHeaders(OutgoingHeaders<Stream<?>> oh, Stream<?> stream) { oh.streamid(stream.streamid); if (Log.headers()) { StringBuilder sb = new StringBuilder("HEADERS FRAME (stream="); sb.append(stream.streamid).append(")\n"); Log.dumpHeaders(sb, " ", oh.getAttachment().getRequestPseudoHeaders()); --- 869,901 ---- * ...Just as in HTTP/1.x, header field names are strings of ASCII * characters that are compared in a case-insensitive fashion. However, * header field names MUST be converted to lowercase prior to their * encoding in HTTP/2... */ ! private List<ByteBuffer> encodeHeadersImpl(int maxFrameSize, HttpHeaders... headers) { ! ByteBuffer buffer = getHeaderBuffer(maxFrameSize); ! List<ByteBuffer> buffers = new ArrayList<>(); for(HttpHeaders header : headers) { for (Map.Entry<String, List<String>> e : header.map().entrySet()) { String lKey = e.getKey().toLowerCase(); List<String> values = e.getValue(); for (String value : values) { hpackOut.header(lKey, value); ! while (!hpackOut.encode(buffer)) { ! buffer.flip(); buffers.add(buffer); buffer = getHeaderBuffer(maxFrameSize); } } } } ! buffer.flip(); buffers.add(buffer); return buffers; } ! private List<ByteBuffer> encodeHeaders(OutgoingHeaders<Stream<?>> oh, Stream<?> stream) { oh.streamid(stream.streamid); if (Log.headers()) { StringBuilder sb = new StringBuilder("HEADERS FRAME (stream="); sb.append(stream.streamid).append(")\n"); Log.dumpHeaders(sb, " ", oh.getAttachment().getRequestPseudoHeaders());
*** 781,810 **** } List<HeaderFrame> frames = encodeHeaders(oh); return encodeFrames(frames); } ! private ByteBufferReference[] encodeFrames(List<HeaderFrame> frames) { if (Log.frames()) { frames.forEach(f -> Log.logFrames(f, "OUT")); } return framesEncoder.encodeFrames(frames); } - static Throwable getExceptionFrom(CompletableFuture<?> cf) { - try { - cf.get(); - return null; - } catch (Throwable e) { - if (e.getCause() != null) { - return e.getCause(); - } else { - return e; - } - } - } - private Stream<?> registerNewStream(OutgoingHeaders<Stream<?>> oh) { Stream<?> stream = oh.getAttachment(); int streamid = nextstreamid; nextstreamid += 2; stream.registerStream(streamid); --- 905,921 ---- } List<HeaderFrame> frames = encodeHeaders(oh); return encodeFrames(frames); } ! private List<ByteBuffer> encodeFrames(List<HeaderFrame> frames) { if (Log.frames()) { frames.forEach(f -> Log.logFrames(f, "OUT")); } return framesEncoder.encodeFrames(frames); } private Stream<?> registerNewStream(OutgoingHeaders<Stream<?>> oh) { Stream<?> stream = oh.getAttachment(); int streamid = nextstreamid; nextstreamid += 2; stream.registerStream(streamid);
*** 816,854 **** private final Object sendlock = new Object(); void sendFrame(Http2Frame frame) { try { synchronized (sendlock) { if (frame instanceof OutgoingHeaders) { @SuppressWarnings("unchecked") OutgoingHeaders<Stream<?>> oh = (OutgoingHeaders<Stream<?>>) frame; Stream<?> stream = registerNewStream(oh); // provide protection from inserting unordered frames between Headers and Continuation ! connection.writeAsync(encodeHeaders(oh, stream)); } else { ! connection.writeAsync(encodeFrame(frame)); } } ! connection.flushAsync(); } catch (IOException e) { if (!closed) { Log.logError(e); shutdown(e); } } } ! private ByteBufferReference[] encodeFrame(Http2Frame frame) { Log.logFrames(frame, "OUT"); return framesEncoder.encodeFrame(frame); } void sendDataFrame(DataFrame frame) { try { ! connection.writeAsync(encodeFrame(frame)); ! connection.flushAsync(); } catch (IOException e) { if (!closed) { Log.logError(e); shutdown(e); } --- 927,967 ---- private final Object sendlock = new Object(); void sendFrame(Http2Frame frame) { try { + HttpPublisher publisher = publisher(); synchronized (sendlock) { if (frame instanceof OutgoingHeaders) { @SuppressWarnings("unchecked") OutgoingHeaders<Stream<?>> oh = (OutgoingHeaders<Stream<?>>) frame; Stream<?> stream = registerNewStream(oh); // provide protection from inserting unordered frames between Headers and Continuation ! publisher.enqueue(encodeHeaders(oh, stream)); } else { ! publisher.enqueue(encodeFrame(frame)); } } ! publisher.signalEnqueued(); } catch (IOException e) { if (!closed) { Log.logError(e); shutdown(e); } } } ! private List<ByteBuffer> encodeFrame(Http2Frame frame) { Log.logFrames(frame, "OUT"); return framesEncoder.encodeFrame(frame); } void sendDataFrame(DataFrame frame) { try { ! HttpPublisher publisher = publisher(); ! publisher.enqueue(encodeFrame(frame)); ! publisher.signalEnqueued(); } catch (IOException e) { if (!closed) { Log.logError(e); shutdown(e); }
*** 860,879 **** * allowed only of control frames: WindowUpdateFrame, PingFrame and etc. * prohibited for such frames as DataFrame, HeadersFrame, ContinuationFrame. */ void sendUnorderedFrame(Http2Frame frame) { try { ! connection.writeAsyncUnordered(encodeFrame(frame)); ! connection.flushAsync(); } catch (IOException e) { if (!closed) { Log.logError(e); shutdown(e); } } } static class HeaderDecoder implements DecodingCallback { HttpHeadersImpl headers; HeaderDecoder() { this.headers = new HttpHeadersImpl(); --- 973,1187 ---- * allowed only of control frames: WindowUpdateFrame, PingFrame and etc. * prohibited for such frames as DataFrame, HeadersFrame, ContinuationFrame. */ void sendUnorderedFrame(Http2Frame frame) { try { ! HttpPublisher publisher = publisher(); ! publisher.enqueueUnordered(encodeFrame(frame)); ! publisher.signalEnqueued(); } catch (IOException e) { if (!closed) { Log.logError(e); shutdown(e); } } } + /** + * A simple tube subscriber for reading from the connection flow. + */ + final class Http2TubeSubscriber implements TubeSubscriber { + volatile Flow.Subscription subscription; + volatile boolean completed; + volatile boolean dropped; + volatile Throwable error; + final ConcurrentLinkedQueue<ByteBuffer> queue + = new ConcurrentLinkedQueue<>(); + final SequentialScheduler scheduler = + SequentialScheduler.synchronizedScheduler(this::processQueue); + + final void processQueue() { + try { + while (!queue.isEmpty() && !scheduler.isStopped()) { + ByteBuffer buffer = queue.poll(); + debug.log(Level.DEBUG, + "sending %d to Http2Connection.asyncReceive", + buffer.remaining()); + asyncReceive(buffer); + } + } catch (Throwable t) { + Throwable x = error; + if (x == null) error = t; + } finally { + Throwable x = error; + if (x != null) { + debug.log(Level.DEBUG, "Stopping scheduler", x); + scheduler.stop(); + Http2Connection.this.shutdown(x); + } + } + } + + + public void onSubscribe(Flow.Subscription subscription) { + // supports being called multiple time. + // doesn't cancel the previous subscription, since that is + // most probably the same as the new subscription. + assert this.subscription == null || dropped == false; + this.subscription = subscription; + dropped = false; + // TODO FIXME: request(1) should be done by the delegate. + if (!completed) { + debug.log(Level.DEBUG, "onSubscribe: requesting Long.MAX_VALUE for reading"); + subscription.request(Long.MAX_VALUE); + } else { + debug.log(Level.DEBUG, "onSubscribe: already completed"); + } + } + + @Override + public void onNext(List<ByteBuffer> item) { + debug.log(Level.DEBUG, () -> "onNext: got " + Utils.remaining(item) + + " bytes in " + item.size() + " buffers"); + queue.addAll(item); + scheduler.deferOrSchedule(client().theExecutor()); + } + + @Override + public void onError(Throwable throwable) { + debug.log(Level.DEBUG, () -> "onError: " + throwable); + error = throwable; + completed = true; + scheduler.deferOrSchedule(client().theExecutor()); + } + + @Override + public void onComplete() { + debug.log(Level.DEBUG, "EOF"); + error = new EOFException("EOF reached while reading"); + completed = true; + scheduler.deferOrSchedule(client().theExecutor()); + } + + public void dropSubscription() { + debug.log(Level.DEBUG, "dropSubscription"); + // we could probably set subscription to null here... + // then we might not need the 'dropped' boolean? + dropped = true; + } + } + + @Override + public final String toString() { + return dbgString(); + } + + final String dbgString() { + return "Http2Connection(" + + connection.getConnectionFlow() + ")"; + } + + final class LoggingHeaderDecoder extends HeaderDecoder { + + private final HeaderDecoder delegate; + private final System.Logger debugHpack = + Utils.getHpackLogger(this::dbgString, DEBUG_HPACK); + + LoggingHeaderDecoder(HeaderDecoder delegate) { + this.delegate = delegate; + } + + String dbgString() { + return Http2Connection.this.dbgString() + "/LoggingHeaderDecoder"; + } + + @Override + public void onDecoded(CharSequence name, CharSequence value) { + delegate.onDecoded(name, value); + } + + @Override + public void onIndexed(int index, + CharSequence name, + CharSequence value) { + debugHpack.log(Level.DEBUG, "onIndexed(%s, %s, %s)%n", + index, name, value); + delegate.onIndexed(index, name, value); + } + + @Override + public void onLiteral(int index, + CharSequence name, + CharSequence value, + boolean valueHuffman) { + debugHpack.log(Level.DEBUG, "onLiteral(%s, %s, %s, %s)%n", + index, name, value, valueHuffman); + delegate.onLiteral(index, name, value, valueHuffman); + } + + @Override + public void onLiteral(CharSequence name, + boolean nameHuffman, + CharSequence value, + boolean valueHuffman) { + debugHpack.log(Level.DEBUG, "onLiteral(%s, %s, %s, %s)%n", + name, nameHuffman, value, valueHuffman); + delegate.onLiteral(name, nameHuffman, value, valueHuffman); + } + + @Override + public void onLiteralNeverIndexed(int index, + CharSequence name, + CharSequence value, + boolean valueHuffman) { + debugHpack.log(Level.DEBUG, "onLiteralNeverIndexed(%s, %s, %s, %s)%n", + index, name, value, valueHuffman); + delegate.onLiteralNeverIndexed(index, name, value, valueHuffman); + } + + @Override + public void onLiteralNeverIndexed(CharSequence name, + boolean nameHuffman, + CharSequence value, + boolean valueHuffman) { + debugHpack.log(Level.DEBUG, "onLiteralNeverIndexed(%s, %s, %s, %s)%n", + name, nameHuffman, value, valueHuffman); + delegate.onLiteralNeverIndexed(name, nameHuffman, value, valueHuffman); + } + + @Override + public void onLiteralWithIndexing(int index, + CharSequence name, + CharSequence value, + boolean valueHuffman) { + debugHpack.log(Level.DEBUG, "onLiteralWithIndexing(%s, %s, %s, %s)%n", + index, name, value, valueHuffman); + delegate.onLiteralWithIndexing(index, name, value, valueHuffman); + } + + @Override + public void onLiteralWithIndexing(CharSequence name, + boolean nameHuffman, + CharSequence value, + boolean valueHuffman) { + debugHpack.log(Level.DEBUG, "onLiteralWithIndexing(%s, %s, %s, %s)%n", + name, nameHuffman, value, valueHuffman); + delegate.onLiteralWithIndexing(name, nameHuffman, value, valueHuffman); + } + + @Override + public void onSizeUpdate(int capacity) { + debugHpack.log(Level.DEBUG, "onSizeUpdate(%s)%n", capacity); + delegate.onSizeUpdate(capacity); + } + + @Override + HttpHeadersImpl headers() { + return delegate.headers(); + } + } + static class HeaderDecoder implements DecodingCallback { HttpHeadersImpl headers; HeaderDecoder() { this.headers = new HttpHeadersImpl();
< prev index next >