1 /*
   2  * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.io.EOFException;
  29 import java.io.IOException;
  30 import java.lang.System.Logger.Level;
  31 import java.net.InetSocketAddress;
  32 import java.net.URI;
  33 import java.nio.ByteBuffer;
  34 import java.nio.charset.StandardCharsets;
  35 import java.util.Iterator;
  36 import java.util.LinkedList;
  37 import java.util.List;
  38 import java.util.Map;
  39 import java.util.concurrent.CompletableFuture;
  40 import java.util.ArrayList;
  41 import java.util.Objects;
  42 import java.util.concurrent.ConcurrentHashMap;
  43 import java.util.concurrent.ConcurrentLinkedQueue;
  44 import java.util.concurrent.Flow;
  45 import java.util.function.Function;
  46 import java.util.function.Supplier;
  47 import javax.net.ssl.SSLEngine;
  48 import jdk.incubator.http.HttpConnection.HttpPublisher;
  49 import jdk.incubator.http.internal.common.FlowTube;
  50 import jdk.incubator.http.internal.common.FlowTube.TubeSubscriber;
  51 import jdk.incubator.http.internal.common.HttpHeadersImpl;
  52 import jdk.incubator.http.internal.common.Log;
  53 import jdk.incubator.http.internal.common.MinimalFuture;
  54 import jdk.incubator.http.internal.common.SequentialScheduler;
  55 import jdk.incubator.http.internal.common.Utils;
  56 import jdk.incubator.http.internal.frame.ContinuationFrame;
  57 import jdk.incubator.http.internal.frame.DataFrame;
  58 import jdk.incubator.http.internal.frame.ErrorFrame;
  59 import jdk.incubator.http.internal.frame.FramesDecoder;
  60 import jdk.incubator.http.internal.frame.FramesEncoder;
  61 import jdk.incubator.http.internal.frame.GoAwayFrame;
  62 import jdk.incubator.http.internal.frame.HeaderFrame;
  63 import jdk.incubator.http.internal.frame.HeadersFrame;
  64 import jdk.incubator.http.internal.frame.Http2Frame;
  65 import jdk.incubator.http.internal.frame.MalformedFrame;
  66 import jdk.incubator.http.internal.frame.OutgoingHeaders;
  67 import jdk.incubator.http.internal.frame.PingFrame;
  68 import jdk.incubator.http.internal.frame.PushPromiseFrame;
  69 import jdk.incubator.http.internal.frame.ResetFrame;
  70 import jdk.incubator.http.internal.frame.SettingsFrame;
  71 import jdk.incubator.http.internal.frame.WindowUpdateFrame;
  72 import jdk.incubator.http.internal.hpack.Encoder;
  73 import jdk.incubator.http.internal.hpack.Decoder;
  74 import jdk.incubator.http.internal.hpack.DecodingCallback;
  75 
  76 import static jdk.incubator.http.internal.frame.SettingsFrame.*;
  77 
  78 
  79 /**
  80  * An Http2Connection. Encapsulates the socket(channel) and any SSLEngine used
  81  * over it. Contains an HttpConnection which hides the SocketChannel SSL stuff.
  82  *
  83  * Http2Connections belong to a Http2ClientImpl, (one of) which belongs
  84  * to a HttpClientImpl.
  85  *
  86  * Creation cases:
  87  * 1) upgraded HTTP/1.1 plain tcp connection
  88  * 2) prior knowledge directly created plain tcp connection
  89  * 3) directly created HTTP/2 SSL connection which uses ALPN.
  90  *
  91  * Sending is done by writing directly to underlying HttpConnection object which
  92  * is operating in async mode. No flow control applies on output at this level
  93  * and all writes are just executed as puts to an output Q belonging to HttpConnection
  94  * Flow control is implemented by HTTP/2 protocol itself.
  95  *
  96  * Hpack header compression
  97  * and outgoing stream creation is also done here, because these operations
  98  * must be synchronized at the socket level. Stream objects send frames simply
  99  * by placing them on the connection's output Queue. sendFrame() is called
 100  * from a higher level (Stream) thread.
 101  *
 102  * asyncReceive(ByteBuffer) is always called from the selector thread. It assembles
 103  * incoming Http2Frames, and directs them to the appropriate Stream.incoming()
 104  * or handles them directly itself. This thread performs hpack decompression
 105  * and incoming stream creation (Server push). Incoming frames destined for a
 106  * stream are provided by calling Stream.incoming().
 107  */
 108 class Http2Connection  {
 109 
 110     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
 111     static final boolean DEBUG_HPACK = Utils.DEBUG_HPACK; // Revisit: temporary dev flag.
 112     final System.Logger  debug = Utils.getDebugLogger(this::dbgString, DEBUG);
 113     final static System.Logger  DEBUG_LOGGER =
 114             Utils.getDebugLogger("Http2Connection"::toString, DEBUG);
 115     private final System.Logger debugHpack =
 116                   Utils.getHpackLogger(this::dbgString, DEBUG_HPACK);
 117     static final ByteBuffer EMPTY_TRIGGER = ByteBuffer.allocate(0);
 118 
 119     private boolean singleStream; // used only for stream 1, then closed
 120 
 121     /*
 122      *  ByteBuffer pooling strategy for HTTP/2 protocol:
 123      *
 124      * In general there are 4 points where ByteBuffers are used:
 125      *  - incoming/outgoing frames from/to ByteBuffers plus incoming/outgoing encrypted data
 126      *    in case of SSL connection.
 127      *
 128      * 1. Outgoing frames encoded to ByteBuffers.
 129      *    Outgoing ByteBuffers are created with requited size and frequently small (except DataFrames, etc)
 130      *    At this place no pools at all. All outgoing buffers should be collected by GC.
 131      *
 132      * 2. Incoming ByteBuffers (decoded to frames).
 133      *    Here, total elimination of BB pool is not a good idea.
 134      *    We don't know how many bytes we will receive through network.
 135      * So here we allocate buffer of reasonable size. The following life of the BB:
 136      * - If all frames decoded from the BB are other than DataFrame and HeaderFrame (and HeaderFrame subclasses)
 137      *     BB is returned to pool,
 138      * - If we decoded DataFrame from the BB. In that case DataFrame refers to subbuffer obtained by slice() method.
 139      *     Such BB is never returned to pool and will be GCed.
 140      * - If we decoded HeadersFrame from the BB. Then header decoding is performed inside processFrame method and
 141      *     the buffer could be release to pool.
 142      *
 143      * 3. SLL encrypted buffers. Here another pool was introduced and all net buffers are to/from the pool,
 144      *    because of we can't predict size encrypted packets.
 145      *
 146      */
 147 
 148 
 149     // A small class that allows to control frames with respect to the state of
 150     // the connection preface. Any data received before the connection
 151     // preface is sent will be buffered.
 152     private final class FramesController {
 153         volatile boolean prefaceSent;
 154         volatile List<ByteBuffer> pending;
 155 
 156         boolean processReceivedData(FramesDecoder decoder, ByteBuffer buf)
 157                 throws IOException
 158         {
 159             // if preface is not sent, buffers data in the pending list
 160             if (!prefaceSent) {
 161                 debug.log(Level.DEBUG, "Preface is not sent: buffering %d",
 162                           buf.remaining());
 163                 synchronized (this) {
 164                     if (!prefaceSent) {
 165                         if (pending == null) pending = new ArrayList<>();
 166                         pending.add(buf);
 167                         debug.log(Level.DEBUG, () -> "there are now "
 168                               + Utils.remaining(pending)
 169                               + " bytes buffered waiting for preface to be sent");
 170                         return false;
 171                     }
 172                 }
 173             }
 174 
 175             // Preface is sent. Checks for pending data and flush it.
 176             // We rely on this method being called from within the Http2TubeSubscriber
 177             // scheduler, so we know that no other thread could execute this method
 178             // concurrently while we're here.
 179             // This ensures that later incoming buffers will not
 180             // be processed before we have flushed the pending queue.
 181             // No additional synchronization is therefore necessary here.
 182             List<ByteBuffer> pending = this.pending;
 183             this.pending = null;
 184             if (pending != null) {
 185                 // flush pending data
 186                 debug.log(Level.DEBUG, () -> "Processing buffered data: "
 187                       + Utils.remaining(pending));
 188                 for (ByteBuffer b : pending) {
 189                     decoder.decode(b);
 190                 }
 191             }
 192             // push the received buffer to the frames decoder.
 193             if (buf != EMPTY_TRIGGER) {
 194                 debug.log(Level.DEBUG, "Processing %d", buf.remaining());
 195                 decoder.decode(buf);
 196             }
 197             return true;
 198         }
 199 
 200         // Mark that the connection preface is sent
 201         void markPrefaceSent() {
 202             assert !prefaceSent;
 203             synchronized (this) {
 204                 prefaceSent = true;
 205             }
 206         }
 207     }
 208 
 209     volatile boolean closed;
 210 
 211     //-------------------------------------
 212     final HttpConnection connection;
 213     private final Http2ClientImpl client2;
 214     private final Map<Integer,Stream<?>> streams = new ConcurrentHashMap<>();
 215     private int nextstreamid;
 216     private int nextPushStream = 2;
 217     private final Encoder hpackOut;
 218     private final Decoder hpackIn;
 219     final SettingsFrame clientSettings;
 220     private volatile SettingsFrame serverSettings;
 221     private final String key; // for HttpClientImpl.connections map
 222     private final FramesDecoder framesDecoder;
 223     private final FramesEncoder framesEncoder = new FramesEncoder();
 224 
 225     /**
 226      * Send Window controller for both connection and stream windows.
 227      * Each of this connection's Streams MUST use this controller.
 228      */
 229     private final WindowController windowController = new WindowController();
 230     private final FramesController framesController = new FramesController();
 231     private final Http2TubeSubscriber subscriber = new Http2TubeSubscriber();
 232     final ConnectionWindowUpdateSender windowUpdater;
 233     private volatile Throwable cause;
 234     private volatile Supplier<ByteBuffer> initial;
 235 
 236     static final int DEFAULT_FRAME_SIZE = 16 * 1024;
 237 
 238 
 239     // TODO: need list of control frames from other threads
 240     // that need to be sent
 241 
 242     private Http2Connection(HttpConnection connection,
 243                             Http2ClientImpl client2,
 244                             int nextstreamid,
 245                             String key) {
 246         this.connection = connection;
 247         this.client2 = client2;
 248         this.nextstreamid = nextstreamid;
 249         this.key = key;
 250         this.clientSettings = this.client2.getClientSettings();
 251         this.framesDecoder = new FramesDecoder(this::processFrame,
 252                 clientSettings.getParameter(SettingsFrame.MAX_FRAME_SIZE));
 253         // serverSettings will be updated by server
 254         this.serverSettings = SettingsFrame.getDefaultSettings();
 255         this.hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE));
 256         this.hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE));
 257         debugHpack.log(Level.DEBUG, () -> "For the record:" + super.toString());
 258         debugHpack.log(Level.DEBUG, "Decoder created: %s", hpackIn);
 259         debugHpack.log(Level.DEBUG, "Encoder created: %s", hpackOut);
 260         this.windowUpdater = new ConnectionWindowUpdateSender(this,
 261                 client2.getConnectionWindowSize(clientSettings));
 262     }
 263 
 264     /**
 265      * Case 1) Create from upgraded HTTP/1.1 connection.
 266      * Is ready to use. Can be SSL. exchange is the Exchange
 267      * that initiated the connection, whose response will be delivered
 268      * on a Stream.
 269      */
 270     private Http2Connection(HttpConnection connection,
 271                     Http2ClientImpl client2,
 272                     Exchange<?> exchange,
 273                     Supplier<ByteBuffer> initial)
 274         throws IOException, InterruptedException
 275     {
 276         this(connection,
 277                 client2,
 278                 3, // stream 1 is registered during the upgrade
 279                 keyFor(connection));
 280         Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());
 281 
 282         Stream<?> initialStream = createStream(exchange);
 283         initialStream.registerStream(1);
 284         windowController.registerStream(1, getInitialSendWindowSize());
 285         initialStream.requestSent();
 286         // Upgrading:
 287         //    set callbacks before sending preface - makes sure anything that
 288         //    might be sent by the server will come our way.
 289         this.initial = initial;
 290         connectFlows(connection);
 291         sendConnectionPreface();
 292     }
 293 
 294     // Used when upgrading an HTTP/1.1 connection to HTTP/2 after receiving
 295     // agreement from the server. Async style but completes immediately, because
 296     // the connection is already connected.
 297     static CompletableFuture<Http2Connection> createAsync(HttpConnection connection,
 298                                                           Http2ClientImpl client2,
 299                                                           Exchange<?> exchange,
 300                                                           Supplier<ByteBuffer> initial)
 301     {
 302         return MinimalFuture.supply(() -> new Http2Connection(connection, client2, exchange, initial));
 303     }
 304 
 305     // Requires TLS handshake. So, is really async
 306     static CompletableFuture<Http2Connection> createAsync(HttpRequestImpl request,
 307                                                           Http2ClientImpl h2client) {
 308         assert request.secure();
 309         AbstractAsyncSSLConnection connection = (AbstractAsyncSSLConnection)
 310         HttpConnection.getConnection(request.getAddress(),
 311                                      h2client.client(),
 312                                      request,
 313                                      HttpClient.Version.HTTP_2);
 314 
 315         return connection.connectAsync()
 316                   .thenCompose(unused -> checkSSLConfig(connection))
 317                   .thenCompose(notused-> {
 318                       CompletableFuture<Http2Connection> cf = new MinimalFuture<>();
 319                       try {
 320                           Http2Connection hc = new Http2Connection(request, h2client, connection);
 321                           cf.complete(hc);
 322                       } catch (IOException e) {
 323                           cf.completeExceptionally(e);
 324                       }
 325                       return cf; } );
 326     }
 327 
 328     /**
 329      * Cases 2) 3)
 330      *
 331      * request is request to be sent.
 332      */
 333     private Http2Connection(HttpRequestImpl request,
 334                             Http2ClientImpl h2client,
 335                             HttpConnection connection)
 336         throws IOException
 337     {
 338         this(connection,
 339              h2client,
 340              1,
 341              keyFor(request.uri(), request.proxy()));
 342 
 343         Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());
 344 
 345         // safe to resume async reading now.
 346         connectFlows(connection);
 347         sendConnectionPreface();
 348     }
 349 
 350     private void connectFlows(HttpConnection connection) {
 351         FlowTube tube =  connection.getConnectionFlow();
 352         // Connect the flow to our Http2TubeSubscriber:
 353         tube.connectFlows(connection.publisher(), subscriber);
 354     }
 355 
 356     final HttpClientImpl client() {
 357         return client2.client();
 358     }
 359 
 360     /**
 361      * Throws an IOException if h2 was not negotiated
 362      */
 363     private static CompletableFuture<?> checkSSLConfig(AbstractAsyncSSLConnection aconn) {
 364         assert aconn.isSecure();
 365 
 366         Function<String, CompletableFuture<Void>> checkAlpnCF = (alpn) -> {
 367             CompletableFuture<Void> cf = new MinimalFuture<>();
 368             SSLEngine engine = aconn.getEngine();
 369             assert Objects.equals(alpn, engine.getApplicationProtocol());
 370 
 371             DEBUG_LOGGER.log(Level.DEBUG, "checkSSLConfig: alpn: %s", alpn );
 372 
 373             if (alpn == null || !alpn.equals("h2")) {
 374                 String msg;
 375                 if (alpn == null) {
 376                     Log.logSSL("ALPN not supported");
 377                     msg = "ALPN not supported";
 378                 } else {
 379                     switch (alpn) {
 380                         case "":
 381                             Log.logSSL(msg = "No ALPN negotiated");
 382                             break;
 383                         case "http/1.1":
 384                             Log.logSSL( msg = "HTTP/1.1 ALPN returned");
 385                             break;
 386                         default:
 387                             Log.logSSL(msg = "Unexpected ALPN: " + alpn);
 388                             cf.completeExceptionally(new IOException(msg));
 389                     }
 390                 }
 391                 cf.completeExceptionally(new ALPNException(msg, aconn));
 392                 return cf;
 393             }
 394             cf.complete(null);
 395             return cf;
 396         };
 397 
 398         return aconn.getALPN().thenCompose(checkAlpnCF);
 399     }
 400 
 401     synchronized boolean singleStream() {
 402         return singleStream;
 403     }
 404 
 405     synchronized void setSingleStream(boolean use) {
 406         singleStream = use;
 407     }
 408 
 409     static String keyFor(HttpConnection connection) {
 410         boolean isProxy = connection.isProxied();
 411         boolean isSecure = connection.isSecure();
 412         InetSocketAddress addr = connection.address();
 413 
 414         return keyString(isSecure, isProxy, addr.getHostString(), addr.getPort());
 415     }
 416 
 417     static String keyFor(URI uri, InetSocketAddress proxy) {
 418         boolean isSecure = uri.getScheme().equalsIgnoreCase("https");
 419         boolean isProxy = proxy != null;
 420 
 421         String host;
 422         int port;
 423 
 424         if (proxy != null) {
 425             host = proxy.getHostString();
 426             port = proxy.getPort();
 427         } else {
 428             host = uri.getHost();
 429             port = uri.getPort();
 430         }
 431         return keyString(isSecure, isProxy, host, port);
 432     }
 433 
 434     // {C,S}:{H:P}:host:port
 435     // C indicates clear text connection "http"
 436     // S indicates secure "https"
 437     // H indicates host (direct) connection
 438     // P indicates proxy
 439     // Eg: "S:H:foo.com:80"
 440     static String keyString(boolean secure, boolean proxy, String host, int port) {
 441         if (secure && port == -1)
 442             port = 443;
 443         else if (!secure && port == -1)
 444             port = 80;
 445         return (secure ? "S:" : "C:") + (proxy ? "P:" : "H:") + host + ":" + port;
 446     }
 447 
 448     String key() {
 449         return this.key;
 450     }
 451 
 452     boolean offerConnection() {
 453         return client2.offerConnection(this);
 454     }
 455 
 456     private HttpPublisher publisher() {
 457         return connection.publisher();
 458     }
 459 
 460     private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder)
 461             throws IOException
 462     {
 463         debugHpack.log(Level.DEBUG, "decodeHeaders(%s)", decoder);
 464 
 465         boolean endOfHeaders = frame.getFlag(HeaderFrame.END_HEADERS);
 466 
 467         List<ByteBuffer> buffers = frame.getHeaderBlock();
 468         int len = buffers.size();
 469         for (int i = 0; i < len; i++) {
 470             ByteBuffer b = buffers.get(i);
 471             hpackIn.decode(b, endOfHeaders && (i == len - 1), decoder);
 472         }
 473     }
 474 
 475     final int getInitialSendWindowSize() {
 476         return serverSettings.getParameter(INITIAL_WINDOW_SIZE);
 477     }
 478 
 479     void close() {
 480         Log.logTrace("Closing HTTP/2 connection: to {0}", connection.address());
 481         GoAwayFrame f = new GoAwayFrame(0, ErrorFrame.NO_ERROR, "Requested by user".getBytes());
 482         // TODO: set last stream. For now zero ok.
 483         sendFrame(f);
 484     }
 485 
 486     long count;
 487     final void asyncReceive(ByteBuffer buffer) {
 488         // We don't need to read anything and
 489         // we don't want to send anything back to the server
 490         // until the connection preface has been sent.
 491         // Therefore we're going to wait if needed before reading
 492         // (and thus replying) to anything.
 493         // Starting to reply to something (e.g send an ACK to a
 494         // SettingsFrame sent by the server) before the connection
 495         // preface is fully sent might result in the server
 496         // sending a GOAWAY frame with 'invalid_preface'.
 497         //
 498         // Note: asyncReceive is only called from the Http2TubeSubscriber
 499         //       sequential scheduler.
 500         try {
 501             Supplier<ByteBuffer> bs = initial;
 502             // ensure that we always handle the initial buffer first,
 503             // if any.
 504             if (bs != null) {
 505                 initial = null;
 506                 ByteBuffer b = bs.get();
 507                 if (b.hasRemaining()) {
 508                     long c = ++count;
 509                     debug.log(Level.DEBUG, () -> "H2 Receiving Initial("
 510                         + c +"): " + b.remaining());
 511                     framesController.processReceivedData(framesDecoder, b);
 512                 }
 513             }
 514             ByteBuffer b = buffer;
 515             // the Http2TubeSubscriber scheduler ensures that the order of incoming
 516             // buffers is preserved.
 517             if (b == EMPTY_TRIGGER) {
 518                 debug.log(Level.DEBUG, "H2 Received EMPTY_TRIGGER");
 519                 boolean prefaceSent = framesController.prefaceSent;
 520                 assert prefaceSent;
 521                 // call framesController.processReceivedData to potentially
 522                 // trigger the processing of all the data buffered there.
 523                 framesController.processReceivedData(framesDecoder, buffer);
 524                 debug.log(Level.DEBUG, "H2 processed buffered data");
 525             } else {
 526                 long c = ++count;
 527                 debug.log(Level.DEBUG, "H2 Receiving(%d): %d", c, b.remaining());
 528                 framesController.processReceivedData(framesDecoder, buffer);
 529                 debug.log(Level.DEBUG, "H2 processed(%d)", c);
 530             }
 531         } catch (Throwable e) {
 532             String msg = Utils.stackTrace(e);
 533             Log.logTrace(msg);
 534             shutdown(e);
 535         }
 536     }
 537 
 538     Throwable getRecordedCause() {
 539         return cause;
 540     }
 541 
 542     void shutdown(Throwable t) {
 543         debug.log(Level.DEBUG, () -> "Shutting down h2c (closed="+closed+"): " + t);
 544         if (closed == true) return;
 545         synchronized (this) {
 546             if (closed == true) return;
 547             closed = true;
 548         }
 549         Log.logError(t);
 550         Throwable initialCause = this.cause;
 551         if (initialCause == null) this.cause = t;
 552         client2.deleteConnection(this);
 553         List<Stream<?>> c = new LinkedList<>(streams.values());
 554         for (Stream<?> s : c) {
 555             s.cancelImpl(t);
 556         }
 557         connection.close();
 558     }
 559 
 560     /**
 561      * Handles stream 0 (common) frames that apply to whole connection and passes
 562      * other stream specific frames to that Stream object.
 563      *
 564      * Invokes Stream.incoming() which is expected to process frame without
 565      * blocking.
 566      */
 567     void processFrame(Http2Frame frame) throws IOException {
 568         Log.logFrames(frame, "IN");
 569         int streamid = frame.streamid();
 570         if (frame instanceof MalformedFrame) {
 571             Log.logError(((MalformedFrame) frame).getMessage());
 572             if (streamid == 0) {
 573                 framesDecoder.close("Malformed frame on stream 0");
 574                 protocolError(((MalformedFrame) frame).getErrorCode(),
 575                         ((MalformedFrame) frame).getMessage());
 576             } else {
 577                 debug.log(Level.DEBUG, () -> "Reset stream: "
 578                           + ((MalformedFrame) frame).getMessage());
 579                 resetStream(streamid, ((MalformedFrame) frame).getErrorCode());
 580             }
 581             return;
 582         }
 583         if (streamid == 0) {
 584             handleConnectionFrame(frame);
 585         } else {
 586             if (frame instanceof SettingsFrame) {
 587                 // The stream identifier for a SETTINGS frame MUST be zero
 588                 framesDecoder.close(
 589                         "The stream identifier for a SETTINGS frame MUST be zero");
 590                 protocolError(GoAwayFrame.PROTOCOL_ERROR);
 591                 return;
 592             }
 593 
 594             Stream<?> stream = getStream(streamid);
 595             if (stream == null) {
 596                 // Should never receive a frame with unknown stream id
 597 
 598                 if (frame instanceof HeaderFrame) {
 599                     // always decode the headers as they may affect
 600                     // connection-level HPACK decoding state
 601                     HeaderDecoder decoder = new LoggingHeaderDecoder(new HeaderDecoder());
 602                     decodeHeaders((HeaderFrame) frame, decoder);
 603                 }
 604 
 605                 int sid = frame.streamid();
 606                 if (sid >= nextstreamid && !(frame instanceof ResetFrame)) {
 607                     // otherwise the stream has already been reset/closed
 608                     resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
 609                 }
 610                 return;
 611             }
 612             if (frame instanceof PushPromiseFrame) {
 613                 PushPromiseFrame pp = (PushPromiseFrame)frame;
 614                 handlePushPromise(stream, pp);
 615             } else if (frame instanceof HeaderFrame) {
 616                 // decode headers (or continuation)
 617                 decodeHeaders((HeaderFrame) frame, stream.rspHeadersConsumer());
 618                 stream.incoming(frame);
 619             } else {
 620                 stream.incoming(frame);
 621             }
 622         }
 623     }
 624 
 625     private <T> void handlePushPromise(Stream<T> parent, PushPromiseFrame pp)
 626         throws IOException
 627     {
 628         // always decode the headers as they may affect connection-level HPACK
 629         // decoding state
 630         HeaderDecoder decoder = new LoggingHeaderDecoder(new HeaderDecoder());
 631         decodeHeaders(pp, decoder);
 632 
 633         HttpRequestImpl parentReq = parent.request;
 634         int promisedStreamid = pp.getPromisedStream();
 635         if (promisedStreamid != nextPushStream) {
 636             resetStream(promisedStreamid, ResetFrame.PROTOCOL_ERROR);
 637             return;
 638         } else {
 639             nextPushStream += 2;
 640         }
 641 
 642         HttpHeadersImpl headers = decoder.headers();
 643         HttpRequestImpl pushReq = HttpRequestImpl.createPushRequest(parentReq, headers);
 644         Exchange<T> pushExch = new Exchange<>(pushReq, parent.exchange.multi);
 645         Stream.PushedStream<?,T> pushStream = createPushStream(parent, pushExch);
 646         pushExch.exchImpl = pushStream;
 647         pushStream.registerStream(promisedStreamid);
 648         parent.incoming_pushPromise(pushReq, pushStream);
 649     }
 650 
 651     private void handleConnectionFrame(Http2Frame frame)
 652         throws IOException
 653     {
 654         switch (frame.type()) {
 655           case SettingsFrame.TYPE:
 656               handleSettings((SettingsFrame)frame);
 657               break;
 658           case PingFrame.TYPE:
 659               handlePing((PingFrame)frame);
 660               break;
 661           case GoAwayFrame.TYPE:
 662               handleGoAway((GoAwayFrame)frame);
 663               break;
 664           case WindowUpdateFrame.TYPE:
 665               handleWindowUpdate((WindowUpdateFrame)frame);
 666               break;
 667           default:
 668             protocolError(ErrorFrame.PROTOCOL_ERROR);
 669         }
 670     }
 671 
 672     void resetStream(int streamid, int code) throws IOException {
 673         Log.logError(
 674             "Resetting stream {0,number,integer} with error code {1,number,integer}",
 675             streamid, code);
 676         ResetFrame frame = new ResetFrame(streamid, code);
 677         sendFrame(frame);
 678         closeStream(streamid);
 679     }
 680 
 681     void closeStream(int streamid) {
 682         debug.log(Level.DEBUG, "Closed stream %d", streamid);
 683         Stream<?> s = streams.remove(streamid);
 684         if (s != null) {
 685             // decrement the reference count on the HttpClientImpl
 686             // to allow the SelectorManager thread to exit if no
 687             // other operation is pending and the facade is no
 688             // longer referenced.
 689             client().unreference();
 690         }
 691         // ## Remove s != null. It is a hack for delayed cancellation,reset
 692         if (s != null && !(s instanceof Stream.PushedStream)) {
 693             // Since PushStreams have no request body, then they have no
 694             // corresponding entry in the window controller.
 695             windowController.removeStream(streamid);
 696         }
 697         if (singleStream() && streams.isEmpty()) {
 698             // should be only 1 stream, but there might be more if server push
 699             close();
 700         }
 701     }
 702 
 703     /**
 704      * Increments this connection's send Window by the amount in the given frame.
 705      */
 706     private void handleWindowUpdate(WindowUpdateFrame f)
 707         throws IOException
 708     {
 709         int amount = f.getUpdate();
 710         if (amount <= 0) {
 711             // ## temporarily disable to workaround a bug in Jetty where it
 712             // ## sends Window updates with a 0 update value.
 713             //protocolError(ErrorFrame.PROTOCOL_ERROR);
 714         } else {
 715             boolean success = windowController.increaseConnectionWindow(amount);
 716             if (!success) {
 717                 protocolError(ErrorFrame.FLOW_CONTROL_ERROR);  // overflow
 718             }
 719         }
 720     }
 721 
 722     private void protocolError(int errorCode)
 723         throws IOException
 724     {
 725         protocolError(errorCode, null);
 726     }
 727 
 728     private void protocolError(int errorCode, String msg)
 729         throws IOException
 730     {
 731         GoAwayFrame frame = new GoAwayFrame(0, errorCode);
 732         sendFrame(frame);
 733         shutdown(new IOException("protocol error" + (msg == null?"":(": " + msg))));
 734     }
 735 
 736     private void handleSettings(SettingsFrame frame)
 737         throws IOException
 738     {
 739         assert frame.streamid() == 0;
 740         if (!frame.getFlag(SettingsFrame.ACK)) {
 741             int oldWindowSize = serverSettings.getParameter(INITIAL_WINDOW_SIZE);
 742             int newWindowSize = frame.getParameter(INITIAL_WINDOW_SIZE);
 743             int diff = newWindowSize - oldWindowSize;
 744             if (diff != 0) {
 745                 windowController.adjustActiveStreams(diff);
 746             }
 747             serverSettings = frame;
 748             sendFrame(new SettingsFrame(SettingsFrame.ACK));
 749         }
 750     }
 751 
 752     private void handlePing(PingFrame frame)
 753         throws IOException
 754     {
 755         frame.setFlag(PingFrame.ACK);
 756         sendUnorderedFrame(frame);
 757     }
 758 
 759     private void handleGoAway(GoAwayFrame frame)
 760         throws IOException
 761     {
 762         shutdown(new IOException(
 763                         String.valueOf(connection.channel().getLocalAddress())
 764                         +": GOAWAY received"));
 765     }
 766 
 767     /**
 768      * Max frame size we are allowed to send
 769      */
 770     public int getMaxSendFrameSize() {
 771         int param = serverSettings.getParameter(MAX_FRAME_SIZE);
 772         if (param == -1) {
 773             param = DEFAULT_FRAME_SIZE;
 774         }
 775         return param;
 776     }
 777 
 778     /**
 779      * Max frame size we will receive
 780      */
 781     public int getMaxReceiveFrameSize() {
 782         return clientSettings.getParameter(MAX_FRAME_SIZE);
 783     }
 784 
 785     private static final String CLIENT_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
 786 
 787     private static final byte[] PREFACE_BYTES =
 788         CLIENT_PREFACE.getBytes(StandardCharsets.ISO_8859_1);
 789 
 790     /**
 791      * Sends Connection preface and Settings frame with current preferred
 792      * values
 793      */
 794     private void sendConnectionPreface() throws IOException {
 795         Log.logTrace("{0}: start sending connection preface to {1}",
 796                      connection.channel().getLocalAddress(),
 797                      connection.address());
 798         SettingsFrame sf = new SettingsFrame(clientSettings);
 799         int initialWindowSize = sf.getParameter(INITIAL_WINDOW_SIZE);
 800         ByteBuffer buf = framesEncoder.encodeConnectionPreface(PREFACE_BYTES, sf);
 801         Log.logFrames(sf, "OUT");
 802         // send preface bytes and SettingsFrame together
 803         HttpPublisher publisher = publisher();
 804         publisher.enqueue(List.of(buf));
 805         publisher.signalEnqueued();
 806         // mark preface sent.
 807         framesController.markPrefaceSent();
 808         Log.logTrace("PREFACE_BYTES sent");
 809         Log.logTrace("Settings Frame sent");
 810 
 811         // send a Window update for the receive buffer we are using
 812         // minus the initial 64 K specified in protocol
 813         final int len = windowUpdater.initialWindowSize - initialWindowSize;
 814         if (len > 0) {
 815             windowUpdater.sendWindowUpdate(len);
 816         }
 817         // there will be an ACK to the windows update - which should
 818         // cause any pending data stored before the preface was sent to be
 819         // flushed (see PrefaceController).
 820         Log.logTrace("finished sending connection preface");
 821         debug.log(Level.DEBUG, "Triggering processing of buffered data"
 822                   + " after sending connection preface");
 823         subscriber.onNext(List.of(EMPTY_TRIGGER));
 824     }
 825 
 826     /**
 827      * Returns an existing Stream with given id, or null if doesn't exist
 828      */
 829     @SuppressWarnings("unchecked")
 830     <T> Stream<T> getStream(int streamid) {
 831         return (Stream<T>)streams.get(streamid);
 832     }
 833 
 834     /**
 835      * Creates Stream with given id.
 836      */
 837     final <T> Stream<T> createStream(Exchange<T> exchange) {
 838         Stream<T> stream = new Stream<>(this, exchange, windowController);
 839         return stream;
 840     }
 841 
 842     <T> Stream.PushedStream<?,T> createPushStream(Stream<T> parent, Exchange<T> pushEx) {
 843         PushGroup<?,T> pg = parent.exchange.getPushGroup();
 844         return new Stream.PushedStream<>(pg, this, pushEx);
 845     }
 846 
 847     <T> void putStream(Stream<T> stream, int streamid) {
 848         // increment the reference count on the HttpClientImpl
 849         // to prevent the SelectorManager thread from exiting until
 850         // the stream is closed.
 851         client().reference();
 852         streams.put(streamid, stream);
 853     }
 854 
 855     /**
 856      * Encode the headers into a List<ByteBuffer> and then create HEADERS
 857      * and CONTINUATION frames from the list and return the List<Http2Frame>.
 858      */
 859     private List<HeaderFrame> encodeHeaders(OutgoingHeaders<Stream<?>> frame) {
 860         List<ByteBuffer> buffers = encodeHeadersImpl(
 861                 getMaxSendFrameSize(),
 862                 frame.getAttachment().getRequestPseudoHeaders(),
 863                 frame.getUserHeaders(),
 864                 frame.getSystemHeaders());
 865 
 866         List<HeaderFrame> frames = new ArrayList<>(buffers.size());
 867         Iterator<ByteBuffer> bufIterator = buffers.iterator();
 868         HeaderFrame oframe = new HeadersFrame(frame.streamid(), frame.getFlags(), bufIterator.next());
 869         frames.add(oframe);
 870         while(bufIterator.hasNext()) {
 871             oframe = new ContinuationFrame(frame.streamid(), bufIterator.next());
 872             frames.add(oframe);
 873         }
 874         oframe.setFlag(HeaderFrame.END_HEADERS);
 875         return frames;
 876     }
 877 
 878     // Dedicated cache for headers encoding ByteBuffer.
 879     // There can be no concurrent access to this  buffer as all access to this buffer
 880     // and its content happen within a single critical code block section protected
 881     // by the sendLock. / (see sendFrame())
 882     // private final ByteBufferPool headerEncodingPool = new ByteBufferPool();
 883 
 884     private ByteBuffer getHeaderBuffer(int maxFrameSize) {
 885         ByteBuffer buf = ByteBuffer.allocate(maxFrameSize);
 886         buf.limit(maxFrameSize);
 887         return buf;
 888     }
 889 
 890     /*
 891      * Encodes all the headers from the given HttpHeaders into the given List
 892      * of buffers.
 893      *
 894      * From https://tools.ietf.org/html/rfc7540#section-8.1.2 :
 895      *
 896      *     ...Just as in HTTP/1.x, header field names are strings of ASCII
 897      *     characters that are compared in a case-insensitive fashion.  However,
 898      *     header field names MUST be converted to lowercase prior to their
 899      *     encoding in HTTP/2...
 900      */
 901     private List<ByteBuffer> encodeHeadersImpl(int maxFrameSize, HttpHeaders... headers) {
 902         ByteBuffer buffer = getHeaderBuffer(maxFrameSize);
 903         List<ByteBuffer> buffers = new ArrayList<>();
 904         for(HttpHeaders header : headers) {
 905             for (Map.Entry<String, List<String>> e : header.map().entrySet()) {
 906                 String lKey = e.getKey().toLowerCase();
 907                 List<String> values = e.getValue();
 908                 for (String value : values) {
 909                     hpackOut.header(lKey, value);
 910                     while (!hpackOut.encode(buffer)) {
 911                         buffer.flip();
 912                         buffers.add(buffer);
 913                         buffer =  getHeaderBuffer(maxFrameSize);
 914                     }
 915                 }
 916             }
 917         }
 918         buffer.flip();
 919         buffers.add(buffer);
 920         return buffers;
 921     }
 922 
 923     private List<ByteBuffer> encodeHeaders(OutgoingHeaders<Stream<?>> oh, Stream<?> stream) {
 924         oh.streamid(stream.streamid);
 925         if (Log.headers()) {
 926             StringBuilder sb = new StringBuilder("HEADERS FRAME (stream=");
 927             sb.append(stream.streamid).append(")\n");
 928             Log.dumpHeaders(sb, "    ", oh.getAttachment().getRequestPseudoHeaders());
 929             Log.dumpHeaders(sb, "    ", oh.getSystemHeaders());
 930             Log.dumpHeaders(sb, "    ", oh.getUserHeaders());
 931             Log.logHeaders(sb.toString());
 932         }
 933         List<HeaderFrame> frames = encodeHeaders(oh);
 934         return encodeFrames(frames);
 935     }
 936 
 937     private List<ByteBuffer> encodeFrames(List<HeaderFrame> frames) {
 938         if (Log.frames()) {
 939             frames.forEach(f -> Log.logFrames(f, "OUT"));
 940         }
 941         return framesEncoder.encodeFrames(frames);
 942     }
 943 
 944     private Stream<?> registerNewStream(OutgoingHeaders<Stream<?>> oh) {
 945         Stream<?> stream = oh.getAttachment();
 946         int streamid = nextstreamid;
 947         nextstreamid += 2;
 948         stream.registerStream(streamid);
 949         // set outgoing window here. This allows thread sending
 950         // body to proceed.
 951         windowController.registerStream(streamid, getInitialSendWindowSize());
 952         return stream;
 953     }
 954 
 955     private final Object sendlock = new Object();
 956 
 957     void sendFrame(Http2Frame frame) {
 958         try {
 959             HttpPublisher publisher = publisher();
 960             synchronized (sendlock) {
 961                 if (frame instanceof OutgoingHeaders) {
 962                     @SuppressWarnings("unchecked")
 963                     OutgoingHeaders<Stream<?>> oh = (OutgoingHeaders<Stream<?>>) frame;
 964                     Stream<?> stream = registerNewStream(oh);
 965                     // provide protection from inserting unordered frames between Headers and Continuation
 966                     publisher.enqueue(encodeHeaders(oh, stream));
 967                 } else {
 968                     publisher.enqueue(encodeFrame(frame));
 969                 }
 970             }
 971             publisher.signalEnqueued();
 972         } catch (IOException e) {
 973             if (!closed) {
 974                 Log.logError(e);
 975                 shutdown(e);
 976             }
 977         }
 978     }
 979 
 980     private List<ByteBuffer> encodeFrame(Http2Frame frame) {
 981         Log.logFrames(frame, "OUT");
 982         return framesEncoder.encodeFrame(frame);
 983     }
 984 
 985     void sendDataFrame(DataFrame frame) {
 986         try {
 987             HttpPublisher publisher = publisher();
 988             publisher.enqueue(encodeFrame(frame));
 989             publisher.signalEnqueued();
 990         } catch (IOException e) {
 991             if (!closed) {
 992                 Log.logError(e);
 993                 shutdown(e);
 994             }
 995         }
 996     }
 997 
 998     /*
 999      * Direct call of the method bypasses synchronization on "sendlock" and
1000      * allowed only of control frames: WindowUpdateFrame, PingFrame and etc.
1001      * prohibited for such frames as DataFrame, HeadersFrame, ContinuationFrame.
1002      */
1003     void sendUnorderedFrame(Http2Frame frame) {
1004         try {
1005             HttpPublisher publisher = publisher();
1006             publisher.enqueueUnordered(encodeFrame(frame));
1007             publisher.signalEnqueued();
1008         } catch (IOException e) {
1009             if (!closed) {
1010                 Log.logError(e);
1011                 shutdown(e);
1012             }
1013         }
1014     }
1015 
1016     /**
1017      * A simple tube subscriber for reading from the connection flow.
1018      */
1019     final class Http2TubeSubscriber implements TubeSubscriber {
1020         volatile Flow.Subscription subscription;
1021         volatile boolean completed;
1022         volatile boolean dropped;
1023         volatile Throwable error;
1024         final ConcurrentLinkedQueue<ByteBuffer> queue
1025                 = new ConcurrentLinkedQueue<>();
1026         final SequentialScheduler scheduler =
1027                 SequentialScheduler.synchronizedScheduler(this::processQueue);
1028 
1029         final void processQueue() {
1030             try {
1031                 while (!queue.isEmpty() && !scheduler.isStopped()) {
1032                     ByteBuffer buffer = queue.poll();
1033                     debug.log(Level.DEBUG,
1034                               "sending %d to Http2Connection.asyncReceive",
1035                               buffer.remaining());
1036                     asyncReceive(buffer);
1037                 }
1038             } catch (Throwable t) {
1039                 Throwable x = error;
1040                 if (x == null) error = t;
1041             } finally {
1042                 Throwable x = error;
1043                 if (x != null) {
1044                     debug.log(Level.DEBUG, "Stopping scheduler", x);
1045                     scheduler.stop();
1046                     Http2Connection.this.shutdown(x);
1047                 }
1048             }
1049         }
1050 
1051 
1052         public void onSubscribe(Flow.Subscription subscription) {
1053             // supports being called multiple time.
1054             // doesn't cancel the previous subscription, since that is
1055             // most probably the same as the new subscription.
1056             assert this.subscription == null || dropped == false;
1057             this.subscription = subscription;
1058             dropped = false;
1059             // TODO FIXME: request(1) should be done by the delegate.
1060             if (!completed) {
1061                 debug.log(Level.DEBUG, "onSubscribe: requesting Long.MAX_VALUE for reading");
1062                 subscription.request(Long.MAX_VALUE);
1063             } else {
1064                 debug.log(Level.DEBUG, "onSubscribe: already completed");
1065             }
1066         }
1067 
1068         @Override
1069         public void onNext(List<ByteBuffer> item) {
1070             debug.log(Level.DEBUG, () -> "onNext: got " + Utils.remaining(item)
1071                     + " bytes in " + item.size() + " buffers");
1072             queue.addAll(item);
1073             scheduler.deferOrSchedule(client().theExecutor());
1074         }
1075 
1076         @Override
1077         public void onError(Throwable throwable) {
1078             debug.log(Level.DEBUG, () -> "onError: " + throwable);
1079             error = throwable;
1080             completed = true;
1081             scheduler.deferOrSchedule(client().theExecutor());
1082         }
1083 
1084         @Override
1085         public void onComplete() {
1086             debug.log(Level.DEBUG, "EOF");
1087             error = new EOFException("EOF reached while reading");
1088             completed = true;
1089             scheduler.deferOrSchedule(client().theExecutor());
1090         }
1091 
1092         public void dropSubscription() {
1093             debug.log(Level.DEBUG, "dropSubscription");
1094             // we could probably set subscription to null here...
1095             // then we might not need the 'dropped' boolean?
1096             dropped = true;
1097         }
1098     }
1099 
1100     @Override
1101     public final String toString() {
1102         return dbgString();
1103     }
1104 
1105     final String dbgString() {
1106         return "Http2Connection("
1107                     + connection.getConnectionFlow() + ")";
1108     }
1109 
1110     final class LoggingHeaderDecoder extends HeaderDecoder {
1111 
1112         private final HeaderDecoder delegate;
1113         private final System.Logger debugHpack =
1114                 Utils.getHpackLogger(this::dbgString, DEBUG_HPACK);
1115 
1116         LoggingHeaderDecoder(HeaderDecoder delegate) {
1117             this.delegate = delegate;
1118         }
1119 
1120         String dbgString() {
1121             return Http2Connection.this.dbgString() + "/LoggingHeaderDecoder";
1122         }
1123 
1124         @Override
1125         public void onDecoded(CharSequence name, CharSequence value) {
1126             delegate.onDecoded(name, value);
1127         }
1128 
1129         @Override
1130         public void onIndexed(int index,
1131                               CharSequence name,
1132                               CharSequence value) {
1133             debugHpack.log(Level.DEBUG, "onIndexed(%s, %s, %s)%n",
1134                            index, name, value);
1135             delegate.onIndexed(index, name, value);
1136         }
1137 
1138         @Override
1139         public void onLiteral(int index,
1140                               CharSequence name,
1141                               CharSequence value,
1142                               boolean valueHuffman) {
1143             debugHpack.log(Level.DEBUG, "onLiteral(%s, %s, %s, %s)%n",
1144                               index, name, value, valueHuffman);
1145             delegate.onLiteral(index, name, value, valueHuffman);
1146         }
1147 
1148         @Override
1149         public void onLiteral(CharSequence name,
1150                               boolean nameHuffman,
1151                               CharSequence value,
1152                               boolean valueHuffman) {
1153             debugHpack.log(Level.DEBUG, "onLiteral(%s, %s, %s, %s)%n",
1154                            name, nameHuffman, value, valueHuffman);
1155             delegate.onLiteral(name, nameHuffman, value, valueHuffman);
1156         }
1157 
1158         @Override
1159         public void onLiteralNeverIndexed(int index,
1160                                           CharSequence name,
1161                                           CharSequence value,
1162                                           boolean valueHuffman) {
1163             debugHpack.log(Level.DEBUG, "onLiteralNeverIndexed(%s, %s, %s, %s)%n",
1164                            index, name, value, valueHuffman);
1165             delegate.onLiteralNeverIndexed(index, name, value, valueHuffman);
1166         }
1167 
1168         @Override
1169         public void onLiteralNeverIndexed(CharSequence name,
1170                                           boolean nameHuffman,
1171                                           CharSequence value,
1172                                           boolean valueHuffman) {
1173             debugHpack.log(Level.DEBUG, "onLiteralNeverIndexed(%s, %s, %s, %s)%n",
1174                            name, nameHuffman, value, valueHuffman);
1175             delegate.onLiteralNeverIndexed(name, nameHuffman, value, valueHuffman);
1176         }
1177 
1178         @Override
1179         public void onLiteralWithIndexing(int index,
1180                                           CharSequence name,
1181                                           CharSequence value,
1182                                           boolean valueHuffman) {
1183             debugHpack.log(Level.DEBUG, "onLiteralWithIndexing(%s, %s, %s, %s)%n",
1184                            index, name, value, valueHuffman);
1185             delegate.onLiteralWithIndexing(index, name, value, valueHuffman);
1186         }
1187 
1188         @Override
1189         public void onLiteralWithIndexing(CharSequence name,
1190                                           boolean nameHuffman,
1191                                           CharSequence value,
1192                                           boolean valueHuffman) {
1193             debugHpack.log(Level.DEBUG, "onLiteralWithIndexing(%s, %s, %s, %s)%n",
1194                               name, nameHuffman, value, valueHuffman);
1195             delegate.onLiteralWithIndexing(name, nameHuffman, value, valueHuffman);
1196         }
1197 
1198         @Override
1199         public void onSizeUpdate(int capacity) {
1200             debugHpack.log(Level.DEBUG, "onSizeUpdate(%s)%n", capacity);
1201             delegate.onSizeUpdate(capacity);
1202         }
1203 
1204         @Override
1205         HttpHeadersImpl headers() {
1206             return delegate.headers();
1207         }
1208     }
1209 
1210     static class HeaderDecoder implements DecodingCallback {
1211         HttpHeadersImpl headers;
1212 
1213         HeaderDecoder() {
1214             this.headers = new HttpHeadersImpl();
1215         }
1216 
1217         @Override
1218         public void onDecoded(CharSequence name, CharSequence value) {
1219             headers.addHeader(name.toString(), value.toString());
1220         }
1221 
1222         HttpHeadersImpl headers() {
1223             return headers;
1224         }
1225     }
1226 
1227     static final class ConnectionWindowUpdateSender extends WindowUpdateSender {
1228 
1229         final int initialWindowSize;
1230         public ConnectionWindowUpdateSender(Http2Connection connection,
1231                                             int initialWindowSize) {
1232             super(connection, initialWindowSize);
1233             this.initialWindowSize = initialWindowSize;
1234         }
1235 
1236         @Override
1237         int getStreamId() {
1238             return 0;
1239         }
1240     }
1241 
1242     /**
1243      * Thrown when https handshake negotiates http/1.1 alpn instead of h2
1244      */
1245     static final class ALPNException extends IOException {
1246         private static final long serialVersionUID = 23138275393635783L;
1247         final AbstractAsyncSSLConnection connection;
1248 
1249         ALPNException(String msg, AbstractAsyncSSLConnection connection) {
1250             super(msg);
1251             this.connection = connection;
1252         }
1253 
1254         AbstractAsyncSSLConnection getConnection() {
1255             return connection;
1256         }
1257     }
1258 }