--- old/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java 2017-11-30 04:03:56.616845797 -0800 +++ new/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java 2017-11-30 04:03:56.415828224 -0800 @@ -25,27 +25,50 @@ package jdk.incubator.http; +import java.io.EOFException; import java.io.IOException; +import java.lang.System.Logger.Level; import java.net.InetSocketAddress; import java.net.URI; -import jdk.incubator.http.HttpConnection.Mode; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.ArrayList; -import java.util.Collections; -import java.util.Formatter; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.stream.Collectors; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Flow; +import java.util.function.Function; +import java.util.function.Supplier; import javax.net.ssl.SSLEngine; -import jdk.incubator.http.internal.common.*; -import jdk.incubator.http.internal.frame.*; +import jdk.incubator.http.HttpConnection.HttpPublisher; +import jdk.incubator.http.internal.common.FlowTube; +import jdk.incubator.http.internal.common.FlowTube.TubeSubscriber; +import jdk.incubator.http.internal.common.HttpHeadersImpl; +import jdk.incubator.http.internal.common.Log; +import jdk.incubator.http.internal.common.MinimalFuture; +import jdk.incubator.http.internal.common.SequentialScheduler; +import jdk.incubator.http.internal.common.Utils; +import jdk.incubator.http.internal.frame.ContinuationFrame; +import jdk.incubator.http.internal.frame.DataFrame; +import jdk.incubator.http.internal.frame.ErrorFrame; +import jdk.incubator.http.internal.frame.FramesDecoder; +import jdk.incubator.http.internal.frame.FramesEncoder; +import jdk.incubator.http.internal.frame.GoAwayFrame; +import jdk.incubator.http.internal.frame.HeaderFrame; +import jdk.incubator.http.internal.frame.HeadersFrame; +import jdk.incubator.http.internal.frame.Http2Frame; +import jdk.incubator.http.internal.frame.MalformedFrame; +import jdk.incubator.http.internal.frame.OutgoingHeaders; +import jdk.incubator.http.internal.frame.PingFrame; +import jdk.incubator.http.internal.frame.PushPromiseFrame; +import jdk.incubator.http.internal.frame.ResetFrame; +import jdk.incubator.http.internal.frame.SettingsFrame; +import jdk.incubator.http.internal.frame.WindowUpdateFrame; import jdk.incubator.http.internal.hpack.Encoder; import jdk.incubator.http.internal.hpack.Decoder; import jdk.incubator.http.internal.hpack.DecodingCallback; @@ -83,11 +106,21 @@ * stream are provided by calling Stream.incoming(). */ class Http2Connection { + + static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. + static final boolean DEBUG_HPACK = Utils.DEBUG_HPACK; // Revisit: temporary dev flag. + final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); + final static System.Logger DEBUG_LOGGER = + Utils.getDebugLogger("Http2Connection"::toString, DEBUG); + private final System.Logger debugHpack = + Utils.getHpackLogger(this::dbgString, DEBUG_HPACK); + static final ByteBuffer EMPTY_TRIGGER = ByteBuffer.allocate(0); + /* * ByteBuffer pooling strategy for HTTP/2 protocol: * * In general there are 4 points where ByteBuffers are used: - * - incoming/outgoing frames from/to ByteBufers plus incoming/outgoing encrypted data + * - incoming/outgoing frames from/to ByteBuffers plus incoming/outgoing encrypted data * in case of SSL connection. * * 1. Outgoing frames encoded to ByteBuffers. @@ -116,40 +149,49 @@ // preface is sent will be buffered. private final class FramesController { volatile boolean prefaceSent; - volatile List pending; + volatile List pending; - boolean processReceivedData(FramesDecoder decoder, ByteBufferReference buf) + boolean processReceivedData(FramesDecoder decoder, ByteBuffer buf) throws IOException { // if preface is not sent, buffers data in the pending list if (!prefaceSent) { + debug.log(Level.DEBUG, "Preface is not sent: buffering %d", + buf.remaining()); synchronized (this) { if (!prefaceSent) { if (pending == null) pending = new ArrayList<>(); pending.add(buf); + debug.log(Level.DEBUG, () -> "there are now " + + Utils.remaining(pending) + + " bytes buffered waiting for preface to be sent"); return false; } } } // Preface is sent. Checks for pending data and flush it. - // We rely on this method being called from within the readlock, - // so we know that no other thread could execute this method + // We rely on this method being called from within the Http2TubeSubscriber + // scheduler, so we know that no other thread could execute this method // concurrently while we're here. // This ensures that later incoming buffers will not // be processed before we have flushed the pending queue. // No additional synchronization is therefore necessary here. - List pending = this.pending; + List pending = this.pending; this.pending = null; if (pending != null) { // flush pending data - for (ByteBufferReference b : pending) { + debug.log(Level.DEBUG, () -> "Processing buffered data: " + + Utils.remaining(pending)); + for (ByteBuffer b : pending) { decoder.decode(b); } } - // push the received buffer to the frames decoder. - decoder.decode(buf); + if (buf != EMPTY_TRIGGER) { + debug.log(Level.DEBUG, "Processing %d", buf.remaining()); + decoder.decode(buf); + } return true; } @@ -167,7 +209,6 @@ //------------------------------------- final HttpConnection connection; - private final HttpClientImpl client; private final Http2ClientImpl client2; private final Map> streams = new ConcurrentHashMap<>(); private int nextstreamid; @@ -186,7 +227,10 @@ */ private final WindowController windowController = new WindowController(); private final FramesController framesController = new FramesController(); + private final Http2TubeSubscriber subscriber = new Http2TubeSubscriber(); final WindowUpdateSender windowUpdater; + private volatile Throwable cause; + private volatile Supplier initial; static final int DEFAULT_FRAME_SIZE = 16 * 1024; @@ -199,7 +243,6 @@ int nextstreamid, String key) { this.connection = connection; - this.client = client2.client(); this.client2 = client2; this.nextstreamid = nextstreamid; this.key = key; @@ -209,102 +252,147 @@ this.serverSettings = SettingsFrame.getDefaultSettings(); this.hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE)); this.hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE)); - this.windowUpdater = new ConnectionWindowUpdateSender(this, client.getReceiveBufferSize()); + debugHpack.log(Level.DEBUG, () -> "For the record:" + super.toString()); + debugHpack.log(Level.DEBUG, "Decoder created: %s", hpackIn); + debugHpack.log(Level.DEBUG, "Encoder created: %s", hpackOut); + this.windowUpdater = new ConnectionWindowUpdateSender(this, client().getReceiveBufferSize()); } /** * Case 1) Create from upgraded HTTP/1.1 connection. - * Is ready to use. Will not be SSL. exchange is the Exchange + * Is ready to use. Can be SSL. exchange is the Exchange * that initiated the connection, whose response will be delivered * on a Stream. */ - Http2Connection(HttpConnection connection, + private Http2Connection(HttpConnection connection, Http2ClientImpl client2, Exchange exchange, - ByteBuffer initial) + Supplier initial) throws IOException, InterruptedException { this(connection, client2, 3, // stream 1 is registered during the upgrade keyFor(connection)); - assert !(connection instanceof SSLConnection); Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize()); Stream initialStream = createStream(exchange); initialStream.registerStream(1); windowController.registerStream(1, getInitialSendWindowSize()); initialStream.requestSent(); + // Upgrading: + // set callbacks before sending preface - makes sure anything that + // might be sent by the server will come our way. + this.initial = initial; + connectFlows(connection); sendConnectionPreface(); - // start reading and writing - // start reading - AsyncConnection asyncConn = (AsyncConnection)connection; - asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown, this::getReadBuffer); - connection.configureMode(Mode.ASYNC); // set mode only AFTER setAsyncCallbacks to provide visibility. - asyncReceive(ByteBufferReference.of(initial)); - asyncConn.startReading(); } - // async style but completes immediately + // Used when upgrading an HTTP/1.1 connection to HTTP/2 after receiving + // agreement from the server. Async style but completes immediately, because + // the connection is already connected. static CompletableFuture createAsync(HttpConnection connection, Http2ClientImpl client2, Exchange exchange, - ByteBuffer initial) { + Supplier initial) + { return MinimalFuture.supply(() -> new Http2Connection(connection, client2, exchange, initial)); } + // Requires TLS handshake. So, is really async + static CompletableFuture createAsync(HttpRequestImpl request, + Http2ClientImpl h2client) { + assert request.secure(); + AbstractAsyncSSLConnection connection = (AbstractAsyncSSLConnection) + HttpConnection.getConnection(request.getAddress(), + h2client.client(), + request, + HttpClient.Version.HTTP_2); + + return connection.connectAsync() + .thenCompose(unused -> checkSSLConfig(connection)) + .thenCompose(notused-> { + CompletableFuture cf = new MinimalFuture<>(); + try { + Http2Connection hc = new Http2Connection(request, h2client, connection); + cf.complete(hc); + } catch (IOException e) { + cf.completeExceptionally(e); + } + return cf; } ); + } + /** * Cases 2) 3) * * request is request to be sent. */ - Http2Connection(HttpRequestImpl request, Http2ClientImpl h2client) - throws IOException, InterruptedException + private Http2Connection(HttpRequestImpl request, + Http2ClientImpl h2client, + HttpConnection connection) + throws IOException { - this(HttpConnection.getConnection(request.getAddress(h2client.client()), h2client.client(), request, true), - h2client, - 1, - keyFor(request.uri(), request.proxy(h2client.client()))); + this(connection, + h2client, + 1, + keyFor(request.uri(), request.proxy())); + Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize()); - // start reading - AsyncConnection asyncConn = (AsyncConnection)connection; - asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown, this::getReadBuffer); - connection.connect(); - checkSSLConfig(); // safe to resume async reading now. - asyncConn.enableCallback(); + connectFlows(connection); sendConnectionPreface(); } + private void connectFlows(HttpConnection connection) { + FlowTube tube = connection.getConnectionFlow(); + // Connect the flow to our Http2TubeSubscriber: + tube.connectFlows(connection.publisher(), subscriber); + } + + final HttpClientImpl client() { + return client2.client(); + } + /** * Throws an IOException if h2 was not negotiated */ - private void checkSSLConfig() throws IOException { - AbstractAsyncSSLConnection aconn = (AbstractAsyncSSLConnection)connection; - SSLEngine engine = aconn.getEngine(); - String alpn = engine.getApplicationProtocol(); - if (alpn == null || !alpn.equals("h2")) { - String msg; - if (alpn == null) { - Log.logSSL("ALPN not supported"); - msg = "ALPN not supported"; - } else switch (alpn) { - case "": - Log.logSSL("No ALPN returned"); - msg = "No ALPN negotiated"; - break; - case "http/1.1": - Log.logSSL("HTTP/1.1 ALPN returned"); - msg = "HTTP/1.1 ALPN returned"; - break; - default: - Log.logSSL("unknown ALPN returned"); - msg = "Unexpected ALPN: " + alpn; - throw new IOException(msg); + private static CompletableFuture checkSSLConfig(AbstractAsyncSSLConnection aconn) { + assert aconn.isSecure(); + + Function> checkAlpnCF = (alpn) -> { + CompletableFuture cf = new MinimalFuture<>(); + SSLEngine engine = aconn.getEngine(); + assert Objects.equals(alpn, engine.getApplicationProtocol()); + + DEBUG_LOGGER.log(Level.DEBUG, "checkSSLConfig: alpn: %s", alpn ); + + if (alpn == null || !alpn.equals("h2")) { + String msg; + if (alpn == null) { + Log.logSSL("ALPN not supported"); + msg = "ALPN not supported"; + } else { + switch (alpn) { + case "": + Log.logSSL(msg = "No ALPN negotiated"); + break; + case "http/1.1": + Log.logSSL( msg = "HTTP/1.1 ALPN returned"); + break; + default: + Log.logSSL(msg = "Unexpected ALPN: " + alpn); + cf.completeExceptionally(new IOException(msg)); + } + } + cf.completeExceptionally(new ALPNException(msg, aconn)); + return cf; } - throw new ALPNException(msg, aconn); - } + cf.complete(null); + return cf; + }; + + return aconn.getALPN().thenCompose(checkAlpnCF); } static String keyFor(HttpConnection connection) { @@ -322,7 +410,7 @@ String host; int port; - if (isProxy) { + if (proxy != null) { host = proxy.getHostString(); port = proxy.getPort(); } else { @@ -350,47 +438,26 @@ client2.putConnection(this); } - private static String toHexdump1(ByteBuffer bb) { - bb.mark(); - StringBuilder sb = new StringBuilder(512); - Formatter f = new Formatter(sb); - - while (bb.hasRemaining()) { - int i = Byte.toUnsignedInt(bb.get()); - f.format("%02x:", i); - } - sb.deleteCharAt(sb.length()-1); - bb.reset(); - return sb.toString(); - } - - private static String toHexdump(ByteBuffer bb) { - List words = new ArrayList<>(); - int i = 0; - bb.mark(); - while (bb.hasRemaining()) { - if (i % 2 == 0) { - words.add(""); - } - byte b = bb.get(); - String hex = Integer.toHexString(256 + Byte.toUnsignedInt(b)).substring(1); - words.set(i / 2, words.get(i / 2) + hex); - i++; - } - bb.reset(); - return words.stream().collect(Collectors.joining(" ")); + private HttpPublisher publisher() { + return connection.publisher(); } - private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder) { + private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder) + throws IOException + { + debugHpack.log(Level.DEBUG, "decodeHeaders(%s)", decoder); + boolean endOfHeaders = frame.getFlag(HeaderFrame.END_HEADERS); - ByteBufferReference[] buffers = frame.getHeaderBlock(); - for (int i = 0; i < buffers.length; i++) { - hpackIn.decode(buffers[i].get(), endOfHeaders && (i == buffers.length - 1), decoder); + List buffers = frame.getHeaderBlock(); + int len = buffers.size(); + for (int i = 0; i < len; i++) { + ByteBuffer b = buffers.get(i); + hpackIn.decode(b, endOfHeaders && (i == len - 1), decoder); } } - int getInitialSendWindowSize() { + final int getInitialSendWindowSize() { return serverSettings.getParameter(INITIAL_WINDOW_SIZE); } @@ -400,16 +467,8 @@ sendFrame(f); } - private ByteBufferPool readBufferPool = new ByteBufferPool(); - - // provides buffer to read data (default size) - public ByteBufferReference getReadBuffer() { - return readBufferPool.get(getMaxReceiveFrameSize() + Http2Frame.FRAME_HEADER_SIZE); - } - - private final Object readlock = new Object(); - - public void asyncReceive(ByteBufferReference buffer) { + long count; + final void asyncReceive(ByteBuffer buffer) { // We don't need to read anything and // we don't want to send anything back to the server // until the connection preface has been sent. @@ -419,23 +478,61 @@ // SettingsFrame sent by the server) before the connection // preface is fully sent might result in the server // sending a GOAWAY frame with 'invalid_preface'. - synchronized (readlock) { - try { - // the readlock ensures that the order of incoming buffers - // is preserved. + // + // Note: asyncReceive is only called from the Http2TubeSubscriber + // sequential scheduler. + try { + Supplier bs = initial; + // ensure that we always handle the initial buffer first, + // if any. + if (bs != null) { + initial = null; + ByteBuffer b = bs.get(); + if (b.hasRemaining()) { + long c = ++count; + debug.log(Level.DEBUG, () -> "H2 Receiving Initial(" + + c +"): " + b.remaining()); + framesController.processReceivedData(framesDecoder, b); + } + } + ByteBuffer b = buffer; + // the Http2TubeSubscriber scheduler ensures that the order of incoming + // buffers is preserved. + if (b == EMPTY_TRIGGER) { + debug.log(Level.DEBUG, "H2 Received EMPTY_TRIGGER"); + boolean prefaceSent = framesController.prefaceSent; + assert prefaceSent; + // call framesController.processReceivedData to potentially + // trigger the processing of all the data buffered there. framesController.processReceivedData(framesDecoder, buffer); - } catch (Throwable e) { - String msg = Utils.stackTrace(e); - Log.logTrace(msg); - shutdown(e); + debug.log(Level.DEBUG, "H2 processed buffered data"); + } else { + long c = ++count; + debug.log(Level.DEBUG, "H2 Receiving(%d): %d", c, b.remaining()); + framesController.processReceivedData(framesDecoder, buffer); + debug.log(Level.DEBUG, "H2 processed(%d)", c); } + } catch (Throwable e) { + String msg = Utils.stackTrace(e); + Log.logTrace(msg); + shutdown(e); } } + Throwable getRecordedCause() { + return cause; + } void shutdown(Throwable t) { + debug.log(Level.DEBUG, () -> "Shutting down h2c (closed="+closed+"): " + t); + if (closed == true) return; + synchronized (this) { + if (closed == true) return; + closed = true; + } Log.logError(t); - closed = true; + Throwable initialCause = this.cause; + if (initialCause == null) this.cause = t; client2.deleteConnection(this); List> c = new LinkedList<>(streams.values()); for (Stream s : c) { @@ -457,8 +554,11 @@ if (frame instanceof MalformedFrame) { Log.logError(((MalformedFrame) frame).getMessage()); if (streamid == 0) { - protocolError(((MalformedFrame) frame).getErrorCode()); + protocolError(((MalformedFrame) frame).getErrorCode(), + ((MalformedFrame) frame).getMessage()); } else { + debug.log(Level.DEBUG, () -> "Reset stream: " + + ((MalformedFrame) frame).getMessage()); resetStream(streamid, ((MalformedFrame) frame).getErrorCode()); } return; @@ -476,9 +576,16 @@ if (stream == null) { // Should never receive a frame with unknown stream id - // To avoid looping, an endpoint MUST NOT send a RST_STREAM in - // response to a RST_STREAM frame. - if (!(frame instanceof ResetFrame)) { + if (frame instanceof HeaderFrame) { + // always decode the headers as they may affect + // connection-level HPACK decoding state + HeaderDecoder decoder = new LoggingHeaderDecoder(new HeaderDecoder()); + decodeHeaders((HeaderFrame) frame, decoder); + } + + int sid = frame.streamid(); + if (sid >= nextstreamid && !(frame instanceof ResetFrame)) { + // otherwise the stream has already been reset/closed resetStream(streamid, ResetFrame.PROTOCOL_ERROR); } return; @@ -499,6 +606,11 @@ private void handlePushPromise(Stream parent, PushPromiseFrame pp) throws IOException { + // always decode the headers as they may affect connection-level HPACK + // decoding state + HeaderDecoder decoder = new LoggingHeaderDecoder(new HeaderDecoder()); + decodeHeaders(pp, decoder); + HttpRequestImpl parentReq = parent.request; int promisedStreamid = pp.getPromisedStream(); if (promisedStreamid != nextPushStream) { @@ -507,8 +619,7 @@ } else { nextPushStream += 2; } - HeaderDecoder decoder = new HeaderDecoder(); - decodeHeaders(pp, decoder); + HttpHeadersImpl headers = decoder.headers(); HttpRequestImpl pushReq = HttpRequestImpl.createPushRequest(parentReq, headers); Exchange pushExch = new Exchange<>(pushReq, parent.exchange.multi); @@ -549,7 +660,15 @@ } void closeStream(int streamid) { + debug.log(Level.DEBUG, "Closed stream %d", streamid); Stream s = streams.remove(streamid); + if (s != null) { + // decrement the reference count on the HttpClientImpl + // to allow the SelectorManager thread to exit if no + // other operation is pending and the facade is no + // longer referenced. + client().unreference(); + } // ## Remove s != null. It is a hack for delayed cancellation,reset if (s != null && !(s instanceof Stream.PushedStream)) { // Since PushStreams have no request body, then they have no @@ -579,9 +698,15 @@ private void protocolError(int errorCode) throws IOException { + protocolError(errorCode, null); + } + + private void protocolError(int errorCode, String msg) + throws IOException + { GoAwayFrame frame = new GoAwayFrame(0, errorCode); sendFrame(frame); - shutdown(new IOException("protocol error")); + shutdown(new IOException("protocol error" + (msg == null?"":(": " + msg)))); } private void handleSettings(SettingsFrame frame) @@ -633,11 +758,6 @@ return clientSettings.getParameter(MAX_FRAME_SIZE); } - // Not sure how useful this is. - public int getMaxHeadersSize() { - return serverSettings.getParameter(MAX_HEADER_LIST_SIZE); - } - private static final String CLIENT_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; private static final byte[] PREFACE_BYTES = @@ -652,10 +772,12 @@ connection.channel().getLocalAddress(), connection.address()); SettingsFrame sf = client2.getClientSettings(); - ByteBufferReference ref = framesEncoder.encodeConnectionPreface(PREFACE_BYTES, sf); + ByteBuffer buf = framesEncoder.encodeConnectionPreface(PREFACE_BYTES, sf); Log.logFrames(sf, "OUT"); // send preface bytes and SettingsFrame together - connection.write(ref.get()); + HttpPublisher publisher = publisher(); + publisher.enqueue(List.of(buf)); + publisher.signalEnqueued(); // mark preface sent. framesController.markPrefaceSent(); Log.logTrace("PREFACE_BYTES sent"); @@ -669,6 +791,9 @@ // cause any pending data stored before the preface was sent to be // flushed (see PrefaceController). Log.logTrace("finished sending connection preface"); + debug.log(Level.DEBUG, "Triggering processing of buffered data" + + " after sending connection preface"); + subscriber.onNext(List.of(EMPTY_TRIGGER)); } /** @@ -682,38 +807,37 @@ /** * Creates Stream with given id. */ - Stream createStream(Exchange exchange) { - Stream stream = new Stream<>(client, this, exchange, windowController); + final Stream createStream(Exchange exchange) { + Stream stream = new Stream<>(this, exchange, windowController); return stream; } Stream.PushedStream createPushStream(Stream parent, Exchange pushEx) { PushGroup pg = parent.exchange.getPushGroup(); - return new Stream.PushedStream<>(pg, client, this, parent, pushEx); + return new Stream.PushedStream<>(pg, this, pushEx); } void putStream(Stream stream, int streamid) { + // increment the reference count on the HttpClientImpl + // to prevent the SelectorManager thread from exiting until + // the stream is closed. + client().reference(); streams.put(streamid, stream); } - void deleteStream(int streamid) { - streams.remove(streamid); - windowController.removeStream(streamid); - } - /** * Encode the headers into a List and then create HEADERS * and CONTINUATION frames from the list and return the List. */ private List encodeHeaders(OutgoingHeaders> frame) { - List buffers = encodeHeadersImpl( + List buffers = encodeHeadersImpl( getMaxSendFrameSize(), frame.getAttachment().getRequestPseudoHeaders(), frame.getUserHeaders(), frame.getSystemHeaders()); List frames = new ArrayList<>(buffers.size()); - Iterator bufIterator = buffers.iterator(); + Iterator bufIterator = buffers.iterator(); HeaderFrame oframe = new HeadersFrame(frame.streamid(), frame.getFlags(), bufIterator.next()); frames.add(oframe); while(bufIterator.hasNext()) { @@ -728,12 +852,12 @@ // There can be no concurrent access to this buffer as all access to this buffer // and its content happen within a single critical code block section protected // by the sendLock. / (see sendFrame()) - private ByteBufferPool headerEncodingPool = new ByteBufferPool(); + // private final ByteBufferPool headerEncodingPool = new ByteBufferPool(); - private ByteBufferReference getHeaderBuffer(int maxFrameSize) { - ByteBufferReference ref = headerEncodingPool.get(maxFrameSize); - ref.get().limit(maxFrameSize); - return ref; + private ByteBuffer getHeaderBuffer(int maxFrameSize) { + ByteBuffer buf = ByteBuffer.allocate(maxFrameSize); + buf.limit(maxFrameSize); + return buf; } /* @@ -747,29 +871,29 @@ * header field names MUST be converted to lowercase prior to their * encoding in HTTP/2... */ - private List encodeHeadersImpl(int maxFrameSize, HttpHeaders... headers) { - ByteBufferReference buffer = getHeaderBuffer(maxFrameSize); - List buffers = new ArrayList<>(); + private List encodeHeadersImpl(int maxFrameSize, HttpHeaders... headers) { + ByteBuffer buffer = getHeaderBuffer(maxFrameSize); + List buffers = new ArrayList<>(); for(HttpHeaders header : headers) { for (Map.Entry> e : header.map().entrySet()) { String lKey = e.getKey().toLowerCase(); List values = e.getValue(); for (String value : values) { hpackOut.header(lKey, value); - while (!hpackOut.encode(buffer.get())) { - buffer.get().flip(); + while (!hpackOut.encode(buffer)) { + buffer.flip(); buffers.add(buffer); buffer = getHeaderBuffer(maxFrameSize); } } } } - buffer.get().flip(); + buffer.flip(); buffers.add(buffer); return buffers; } - private ByteBufferReference[] encodeHeaders(OutgoingHeaders> oh, Stream stream) { + private List encodeHeaders(OutgoingHeaders> oh, Stream stream) { oh.streamid(stream.streamid); if (Log.headers()) { StringBuilder sb = new StringBuilder("HEADERS FRAME (stream="); @@ -783,26 +907,13 @@ return encodeFrames(frames); } - private ByteBufferReference[] encodeFrames(List frames) { + private List encodeFrames(List frames) { if (Log.frames()) { frames.forEach(f -> Log.logFrames(f, "OUT")); } return framesEncoder.encodeFrames(frames); } - static Throwable getExceptionFrom(CompletableFuture cf) { - try { - cf.get(); - return null; - } catch (Throwable e) { - if (e.getCause() != null) { - return e.getCause(); - } else { - return e; - } - } - } - private Stream registerNewStream(OutgoingHeaders> oh) { Stream stream = oh.getAttachment(); int streamid = nextstreamid; @@ -818,18 +929,19 @@ void sendFrame(Http2Frame frame) { try { + HttpPublisher publisher = publisher(); synchronized (sendlock) { if (frame instanceof OutgoingHeaders) { @SuppressWarnings("unchecked") OutgoingHeaders> oh = (OutgoingHeaders>) frame; Stream stream = registerNewStream(oh); // provide protection from inserting unordered frames between Headers and Continuation - connection.writeAsync(encodeHeaders(oh, stream)); + publisher.enqueue(encodeHeaders(oh, stream)); } else { - connection.writeAsync(encodeFrame(frame)); + publisher.enqueue(encodeFrame(frame)); } } - connection.flushAsync(); + publisher.signalEnqueued(); } catch (IOException e) { if (!closed) { Log.logError(e); @@ -838,15 +950,16 @@ } } - private ByteBufferReference[] encodeFrame(Http2Frame frame) { + private List encodeFrame(Http2Frame frame) { Log.logFrames(frame, "OUT"); return framesEncoder.encodeFrame(frame); } void sendDataFrame(DataFrame frame) { try { - connection.writeAsync(encodeFrame(frame)); - connection.flushAsync(); + HttpPublisher publisher = publisher(); + publisher.enqueue(encodeFrame(frame)); + publisher.signalEnqueued(); } catch (IOException e) { if (!closed) { Log.logError(e); @@ -862,8 +975,9 @@ */ void sendUnorderedFrame(Http2Frame frame) { try { - connection.writeAsyncUnordered(encodeFrame(frame)); - connection.flushAsync(); + HttpPublisher publisher = publisher(); + publisher.enqueueUnordered(encodeFrame(frame)); + publisher.signalEnqueued(); } catch (IOException e) { if (!closed) { Log.logError(e); @@ -872,6 +986,200 @@ } } + /** + * A simple tube subscriber for reading from the connection flow. + */ + final class Http2TubeSubscriber implements TubeSubscriber { + volatile Flow.Subscription subscription; + volatile boolean completed; + volatile boolean dropped; + volatile Throwable error; + final ConcurrentLinkedQueue queue + = new ConcurrentLinkedQueue<>(); + final SequentialScheduler scheduler = + SequentialScheduler.synchronizedScheduler(this::processQueue); + + final void processQueue() { + try { + while (!queue.isEmpty() && !scheduler.isStopped()) { + ByteBuffer buffer = queue.poll(); + debug.log(Level.DEBUG, + "sending %d to Http2Connection.asyncReceive", + buffer.remaining()); + asyncReceive(buffer); + } + } catch (Throwable t) { + Throwable x = error; + if (x == null) error = t; + } finally { + Throwable x = error; + if (x != null) { + debug.log(Level.DEBUG, "Stopping scheduler", x); + scheduler.stop(); + Http2Connection.this.shutdown(x); + } + } + } + + + public void onSubscribe(Flow.Subscription subscription) { + // supports being called multiple time. + // doesn't cancel the previous subscription, since that is + // most probably the same as the new subscription. + assert this.subscription == null || dropped == false; + this.subscription = subscription; + dropped = false; + // TODO FIXME: request(1) should be done by the delegate. + if (!completed) { + debug.log(Level.DEBUG, "onSubscribe: requesting Long.MAX_VALUE for reading"); + subscription.request(Long.MAX_VALUE); + } else { + debug.log(Level.DEBUG, "onSubscribe: already completed"); + } + } + + @Override + public void onNext(List item) { + debug.log(Level.DEBUG, () -> "onNext: got " + Utils.remaining(item) + + " bytes in " + item.size() + " buffers"); + queue.addAll(item); + scheduler.deferOrSchedule(client().theExecutor()); + } + + @Override + public void onError(Throwable throwable) { + debug.log(Level.DEBUG, () -> "onError: " + throwable); + error = throwable; + completed = true; + scheduler.deferOrSchedule(client().theExecutor()); + } + + @Override + public void onComplete() { + debug.log(Level.DEBUG, "EOF"); + error = new EOFException("EOF reached while reading"); + completed = true; + scheduler.deferOrSchedule(client().theExecutor()); + } + + public void dropSubscription() { + debug.log(Level.DEBUG, "dropSubscription"); + // we could probably set subscription to null here... + // then we might not need the 'dropped' boolean? + dropped = true; + } + } + + @Override + public final String toString() { + return dbgString(); + } + + final String dbgString() { + return "Http2Connection(" + + connection.getConnectionFlow() + ")"; + } + + final class LoggingHeaderDecoder extends HeaderDecoder { + + private final HeaderDecoder delegate; + private final System.Logger debugHpack = + Utils.getHpackLogger(this::dbgString, DEBUG_HPACK); + + LoggingHeaderDecoder(HeaderDecoder delegate) { + this.delegate = delegate; + } + + String dbgString() { + return Http2Connection.this.dbgString() + "/LoggingHeaderDecoder"; + } + + @Override + public void onDecoded(CharSequence name, CharSequence value) { + delegate.onDecoded(name, value); + } + + @Override + public void onIndexed(int index, + CharSequence name, + CharSequence value) { + debugHpack.log(Level.DEBUG, "onIndexed(%s, %s, %s)%n", + index, name, value); + delegate.onIndexed(index, name, value); + } + + @Override + public void onLiteral(int index, + CharSequence name, + CharSequence value, + boolean valueHuffman) { + debugHpack.log(Level.DEBUG, "onLiteral(%s, %s, %s, %s)%n", + index, name, value, valueHuffman); + delegate.onLiteral(index, name, value, valueHuffman); + } + + @Override + public void onLiteral(CharSequence name, + boolean nameHuffman, + CharSequence value, + boolean valueHuffman) { + debugHpack.log(Level.DEBUG, "onLiteral(%s, %s, %s, %s)%n", + name, nameHuffman, value, valueHuffman); + delegate.onLiteral(name, nameHuffman, value, valueHuffman); + } + + @Override + public void onLiteralNeverIndexed(int index, + CharSequence name, + CharSequence value, + boolean valueHuffman) { + debugHpack.log(Level.DEBUG, "onLiteralNeverIndexed(%s, %s, %s, %s)%n", + index, name, value, valueHuffman); + delegate.onLiteralNeverIndexed(index, name, value, valueHuffman); + } + + @Override + public void onLiteralNeverIndexed(CharSequence name, + boolean nameHuffman, + CharSequence value, + boolean valueHuffman) { + debugHpack.log(Level.DEBUG, "onLiteralNeverIndexed(%s, %s, %s, %s)%n", + name, nameHuffman, value, valueHuffman); + delegate.onLiteralNeverIndexed(name, nameHuffman, value, valueHuffman); + } + + @Override + public void onLiteralWithIndexing(int index, + CharSequence name, + CharSequence value, + boolean valueHuffman) { + debugHpack.log(Level.DEBUG, "onLiteralWithIndexing(%s, %s, %s, %s)%n", + index, name, value, valueHuffman); + delegate.onLiteralWithIndexing(index, name, value, valueHuffman); + } + + @Override + public void onLiteralWithIndexing(CharSequence name, + boolean nameHuffman, + CharSequence value, + boolean valueHuffman) { + debugHpack.log(Level.DEBUG, "onLiteralWithIndexing(%s, %s, %s, %s)%n", + name, nameHuffman, value, valueHuffman); + delegate.onLiteralWithIndexing(name, nameHuffman, value, valueHuffman); + } + + @Override + public void onSizeUpdate(int capacity) { + debugHpack.log(Level.DEBUG, "onSizeUpdate(%s)%n", capacity); + delegate.onSizeUpdate(capacity); + } + + @Override + HttpHeadersImpl headers() { + return delegate.headers(); + } + } + static class HeaderDecoder implements DecodingCallback { HttpHeadersImpl headers;