1 /*
   2  * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.internal.net.http;
  27 
  28 import java.io.EOFException;
  29 import java.io.IOException;
  30 import java.io.UncheckedIOException;
  31 import java.net.InetSocketAddress;
  32 import java.net.URI;
  33 import java.nio.ByteBuffer;
  34 import java.nio.charset.StandardCharsets;
  35 import java.util.Iterator;
  36 import java.util.LinkedList;
  37 import java.util.List;
  38 import java.util.Locale;
  39 import java.util.Map;
  40 import java.util.Set;
  41 import java.util.concurrent.CompletableFuture;
  42 import java.util.ArrayList;
  43 import java.util.Objects;
  44 import java.util.concurrent.ConcurrentHashMap;
  45 import java.util.concurrent.ConcurrentLinkedQueue;
  46 import java.util.concurrent.Flow;
  47 import java.util.function.Function;
  48 import java.util.function.Supplier;
  49 import javax.net.ssl.SSLEngine;
  50 import javax.net.ssl.SSLException;
  51 import java.net.http.HttpClient;
  52 import java.net.http.HttpHeaders;
  53 import jdk.internal.net.http.HttpConnection.HttpPublisher;
  54 import jdk.internal.net.http.common.FlowTube;
  55 import jdk.internal.net.http.common.FlowTube.TubeSubscriber;
  56 import jdk.internal.net.http.common.HttpHeadersBuilder;
  57 import jdk.internal.net.http.common.Log;
  58 import jdk.internal.net.http.common.Logger;
  59 import jdk.internal.net.http.common.MinimalFuture;
  60 import jdk.internal.net.http.common.SequentialScheduler;
  61 import jdk.internal.net.http.common.Utils;
  62 import jdk.internal.net.http.frame.ContinuationFrame;
  63 import jdk.internal.net.http.frame.DataFrame;
  64 import jdk.internal.net.http.frame.ErrorFrame;
  65 import jdk.internal.net.http.frame.FramesDecoder;
  66 import jdk.internal.net.http.frame.FramesEncoder;
  67 import jdk.internal.net.http.frame.GoAwayFrame;
  68 import jdk.internal.net.http.frame.HeaderFrame;
  69 import jdk.internal.net.http.frame.HeadersFrame;
  70 import jdk.internal.net.http.frame.Http2Frame;
  71 import jdk.internal.net.http.frame.MalformedFrame;
  72 import jdk.internal.net.http.frame.OutgoingHeaders;
  73 import jdk.internal.net.http.frame.PingFrame;
  74 import jdk.internal.net.http.frame.PushPromiseFrame;
  75 import jdk.internal.net.http.frame.ResetFrame;
  76 import jdk.internal.net.http.frame.SettingsFrame;
  77 import jdk.internal.net.http.frame.WindowUpdateFrame;
  78 import jdk.internal.net.http.hpack.Encoder;
  79 import jdk.internal.net.http.hpack.Decoder;
  80 import jdk.internal.net.http.hpack.DecodingCallback;
  81 import static java.nio.charset.StandardCharsets.UTF_8;
  82 import static jdk.internal.net.http.frame.SettingsFrame.*;
  83 
  84 /**
  85  * An Http2Connection. Encapsulates the socket(channel) and any SSLEngine used
  86  * over it. Contains an HttpConnection which hides the SocketChannel SSL stuff.
  87  *
  88  * Http2Connections belong to a Http2ClientImpl, (one of) which belongs
  89  * to a HttpClientImpl.
  90  *
  91  * Creation cases:
  92  * 1) upgraded HTTP/1.1 plain tcp connection
  93  * 2) prior knowledge directly created plain tcp connection
  94  * 3) directly created HTTP/2 SSL connection which uses ALPN.
  95  *
  96  * Sending is done by writing directly to underlying HttpConnection object which
  97  * is operating in async mode. No flow control applies on output at this level
  98  * and all writes are just executed as puts to an output Q belonging to HttpConnection
  99  * Flow control is implemented by HTTP/2 protocol itself.
 100  *
 101  * Hpack header compression
 102  * and outgoing stream creation is also done here, because these operations
 103  * must be synchronized at the socket level. Stream objects send frames simply
 104  * by placing them on the connection's output Queue. sendFrame() is called
 105  * from a higher level (Stream) thread.
 106  *
 107  * asyncReceive(ByteBuffer) is always called from the selector thread. It assembles
 108  * incoming Http2Frames, and directs them to the appropriate Stream.incoming()
 109  * or handles them directly itself. This thread performs hpack decompression
 110  * and incoming stream creation (Server push). Incoming frames destined for a
 111  * stream are provided by calling Stream.incoming().
 112  */
 113 class Http2Connection  {
 114 
 115     final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
 116     final static Logger DEBUG_LOGGER =
 117             Utils.getDebugLogger("Http2Connection"::toString, Utils.DEBUG);
 118     private final Logger debugHpack =
 119             Utils.getHpackLogger(this::dbgString, Utils.DEBUG_HPACK);
 120     static final ByteBuffer EMPTY_TRIGGER = ByteBuffer.allocate(0);
 121 
 122     static private final int MAX_CLIENT_STREAM_ID = Integer.MAX_VALUE; // 2147483647
 123     static private final int MAX_SERVER_STREAM_ID = Integer.MAX_VALUE - 1; // 2147483646
 124 
 125     /**
 126      * Flag set when no more streams to be opened on this connection.
 127      * Two cases where it is used.
 128      *
 129      * 1. Two connections to the same server were opened concurrently, in which
 130      *    case one of them will be put in the cache, and the second will expire
 131      *    when all its opened streams (which usually should be a single client
 132      *    stream + possibly some additional push-promise server streams) complete.
 133      * 2. A cached connection reaches its maximum number of streams (~ 2^31-1)
 134      *    either server / or client allocated, in which case it will be taken
 135      *    out of the cache - allowing a new connection to replace it. It will
 136      *    expire when all its still open streams (which could be many) eventually
 137      *    complete.
 138      */
 139     private boolean finalStream;
 140 
 141     /*
 142      * ByteBuffer pooling strategy for HTTP/2 protocol.
 143      *
 144      * In general there are 4 points where ByteBuffers are used:
 145      *  - incoming/outgoing frames from/to ByteBuffers plus incoming/outgoing
 146      *    encrypted data in case of SSL connection.
 147      *
 148      * 1. Outgoing frames encoded to ByteBuffers.
 149      *
 150      *  Outgoing ByteBuffers are created with required size and frequently
 151      *  small (except DataFrames, etc). At this place no pools at all. All
 152      *  outgoing buffers should eventually be collected by GC.
 153      *
 154      * 2. Incoming ByteBuffers (decoded to frames).
 155      *
 156      *  Here, total elimination of BB pool is not a good idea.
 157      *  We don't know how many bytes we will receive through network.
 158      *
 159      *  A possible future improvement ( currently not implemented ):
 160      *  Allocate buffers of reasonable size. The following life of the BB:
 161      *   - If all frames decoded from the BB are other than DataFrame and
 162      *     HeaderFrame (and HeaderFrame subclasses) BB is returned to pool,
 163      *   - If a DataFrame is decoded from the BB. In that case DataFrame refers
 164      *     to sub-buffer obtained by slice(). Such a BB is never returned to the
 165      *     pool and will eventually be GC'ed.
 166      *   - If a HeadersFrame is decoded from the BB. Then header decoding is
     *     performed inside processFrame method and the buffer could be released
     *     back to pool.
 169      *
 170      * 3. SSL encrypted buffers ( received ).
 171      *
 172      *  The current implementation recycles encrypted buffers read from the
 173      *  channel. The pool of buffers has a maximum size of 3, SocketTube.MAX_BUFFERS,
 174      *  direct buffers which are shared by all connections on a given client.
 175      *  The pool is used by all SSL connections - whether HTTP/1.1 or HTTP/2,
 176      *  but only for SSL encrypted buffers that circulate between the SocketTube
 177      *  Publisher and the SSLFlowDelegate Reader. Limiting the pool to this
 178      *  particular segment allows the use of direct buffers, thus avoiding any
 179      *  additional copy in the NIO socket channel implementation. See
 180      *  HttpClientImpl.SSLDirectBufferSupplier, SocketTube.SSLDirectBufferSource,
 181      *  and SSLTube.recycler.
 182      */
 183 
 184 
 185     // A small class that allows to control frames with respect to the state of
 186     // the connection preface. Any data received before the connection
 187     // preface is sent will be buffered.
 188     private final class FramesController {
 189         volatile boolean prefaceSent;
 190         volatile List<ByteBuffer> pending;
 191 
 192         boolean processReceivedData(FramesDecoder decoder, ByteBuffer buf)
 193                 throws IOException
 194         {
 195             // if preface is not sent, buffers data in the pending list
 196             if (!prefaceSent) {
 197                 if (debug.on())
 198                     debug.log("Preface not sent: buffering %d", buf.remaining());
 199                 synchronized (this) {
 200                     if (!prefaceSent) {
 201                         if (pending == null) pending = new ArrayList<>();
 202                         pending.add(buf);
 203                         if (debug.on())
 204                             debug.log("there are now %d bytes buffered waiting for preface to be sent"
 205                                     + Utils.remaining(pending)
 206                             );
 207                         return false;
 208                     }
 209                 }
 210             }
 211 
 212             // Preface is sent. Checks for pending data and flush it.
 213             // We rely on this method being called from within the Http2TubeSubscriber
 214             // scheduler, so we know that no other thread could execute this method
 215             // concurrently while we're here.
 216             // This ensures that later incoming buffers will not
 217             // be processed before we have flushed the pending queue.
 218             // No additional synchronization is therefore necessary here.
 219             List<ByteBuffer> pending = this.pending;
 220             this.pending = null;
 221             if (pending != null) {
 222                 // flush pending data
 223                 if (debug.on()) debug.log(() -> "Processing buffered data: "
 224                       + Utils.remaining(pending));
 225                 for (ByteBuffer b : pending) {
 226                     decoder.decode(b);
 227                 }
 228             }
 229             // push the received buffer to the frames decoder.
 230             if (buf != EMPTY_TRIGGER) {
 231                 if (debug.on()) debug.log("Processing %d", buf.remaining());
 232                 decoder.decode(buf);
 233             }
 234             return true;
 235         }
 236 
 237         // Mark that the connection preface is sent
 238         void markPrefaceSent() {
 239             assert !prefaceSent;
 240             synchronized (this) {
 241                 prefaceSent = true;
 242             }
 243         }
 244     }
 245 
 246     volatile boolean closed;
 247 
 248     //-------------------------------------
 249     final HttpConnection connection;
 250     private final Http2ClientImpl client2;
 251     private final Map<Integer,Stream<?>> streams = new ConcurrentHashMap<>();
 252     private int nextstreamid;
 253     private int nextPushStream = 2;
 254     // actual stream ids are not allocated until the Headers frame is ready
 255     // to be sent. The following two fields are updated as soon as a stream
 256     // is created and assigned to a connection. They are checked before
 257     // assigning a stream to a connection.
 258     private int lastReservedClientStreamid = 1;
 259     private int lastReservedServerStreamid = 0;
 260     private int numReservedClientStreams = 0; // count of current streams
 261     private int numReservedServerStreams = 0; // count of current streams
 262     private final Encoder hpackOut;
 263     private final Decoder hpackIn;
 264     final SettingsFrame clientSettings;
 265     private volatile SettingsFrame serverSettings;
 266     private final String key; // for HttpClientImpl.connections map
 267     private final FramesDecoder framesDecoder;
 268     private final FramesEncoder framesEncoder = new FramesEncoder();
 269 
 270     /**
 271      * Send Window controller for both connection and stream windows.
 272      * Each of this connection's Streams MUST use this controller.
 273      */
 274     private final WindowController windowController = new WindowController();
 275     private final FramesController framesController = new FramesController();
 276     private final Http2TubeSubscriber subscriber;
 277     final ConnectionWindowUpdateSender windowUpdater;
 278     private volatile Throwable cause;
 279     private volatile Supplier<ByteBuffer> initial;
 280 
 281     static final int DEFAULT_FRAME_SIZE = 16 * 1024;
 282 
 283 
 284     // TODO: need list of control frames from other threads
 285     // that need to be sent
 286 
 287     private Http2Connection(HttpConnection connection,
 288                             Http2ClientImpl client2,
 289                             int nextstreamid,
 290                             String key) {
 291         this.connection = connection;
 292         this.client2 = client2;
 293         this.subscriber = new Http2TubeSubscriber(client2.client());
 294         this.nextstreamid = nextstreamid;
 295         this.key = key;
 296         this.clientSettings = this.client2.getClientSettings();
 297         this.framesDecoder = new FramesDecoder(this::processFrame,
 298                 clientSettings.getParameter(SettingsFrame.MAX_FRAME_SIZE));
 299         // serverSettings will be updated by server
 300         this.serverSettings = SettingsFrame.defaultRFCSettings();
 301         this.hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE));
 302         this.hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE));
 303         if (debugHpack.on()) {
 304             debugHpack.log("For the record:" + super.toString());
 305             debugHpack.log("Decoder created: %s", hpackIn);
 306             debugHpack.log("Encoder created: %s", hpackOut);
 307         }
 308         this.windowUpdater = new ConnectionWindowUpdateSender(this,
 309                 client2.getConnectionWindowSize(clientSettings));
 310     }
 311 
 312     /**
 313      * Case 1) Create from upgraded HTTP/1.1 connection.
 314      * Is ready to use. Can't be SSL. exchange is the Exchange
 315      * that initiated the connection, whose response will be delivered
 316      * on a Stream.
 317      */
 318     private Http2Connection(HttpConnection connection,
 319                     Http2ClientImpl client2,
 320                     Exchange<?> exchange,
 321                     Supplier<ByteBuffer> initial)
 322         throws IOException, InterruptedException
 323     {
 324         this(connection,
 325                 client2,
 326                 3, // stream 1 is registered during the upgrade
 327                 keyFor(connection));
 328         reserveStream(true);
 329         Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());
 330 
 331         Stream<?> initialStream = createStream(exchange);
 332         initialStream.registerStream(1);
 333         windowController.registerStream(1, getInitialSendWindowSize());
 334         initialStream.requestSent();
 335         // Upgrading:
 336         //    set callbacks before sending preface - makes sure anything that
 337         //    might be sent by the server will come our way.
 338         this.initial = initial;
 339         connectFlows(connection);
 340         sendConnectionPreface();
 341     }
 342 
 343     // Used when upgrading an HTTP/1.1 connection to HTTP/2 after receiving
 344     // agreement from the server. Async style but completes immediately, because
 345     // the connection is already connected.
 346     static CompletableFuture<Http2Connection> createAsync(HttpConnection connection,
 347                                                           Http2ClientImpl client2,
 348                                                           Exchange<?> exchange,
 349                                                           Supplier<ByteBuffer> initial)
 350     {
 351         return MinimalFuture.supply(() -> new Http2Connection(connection, client2, exchange, initial));
 352     }
 353 
 354     // Requires TLS handshake. So, is really async
 355     static CompletableFuture<Http2Connection> createAsync(HttpRequestImpl request,
 356                                                           Http2ClientImpl h2client) {
 357         assert request.secure();
 358         AbstractAsyncSSLConnection connection = (AbstractAsyncSSLConnection)
 359         HttpConnection.getConnection(request.getAddress(),
 360                                      h2client.client(),
 361                                      request,
 362                                      HttpClient.Version.HTTP_2);
 363 
 364         return connection.connectAsync()
 365                   .thenCompose(unused -> checkSSLConfig(connection))
 366                   .thenCompose(notused-> {
 367                       CompletableFuture<Http2Connection> cf = new MinimalFuture<>();
 368                       try {
 369                           Http2Connection hc = new Http2Connection(request, h2client, connection);
 370                           cf.complete(hc);
 371                       } catch (IOException e) {
 372                           cf.completeExceptionally(e);
 373                       }
 374                       return cf; } );
 375     }
 376 
 377     /**
 378      * Cases 2) 3)
 379      *
 380      * request is request to be sent.
 381      */
 382     private Http2Connection(HttpRequestImpl request,
 383                             Http2ClientImpl h2client,
 384                             HttpConnection connection)
 385         throws IOException
 386     {
 387         this(connection,
 388              h2client,
 389              1,
 390              keyFor(request.uri(), request.proxy()));
 391 
 392         Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());
 393 
 394         // safe to resume async reading now.
 395         connectFlows(connection);
 396         sendConnectionPreface();
 397     }
 398 
 399     private void connectFlows(HttpConnection connection) {
 400         FlowTube tube =  connection.getConnectionFlow();
 401         // Connect the flow to our Http2TubeSubscriber:
 402         tube.connectFlows(connection.publisher(), subscriber);
 403     }
 404 
 405     final HttpClientImpl client() {
 406         return client2.client();
 407     }
 408 
 409     // call these before assigning a request/stream to a connection
 410     // if false returned then a new Http2Connection is required
 411     // if true, the the stream may be assigned to this connection
 412     // for server push, if false returned, then the stream should be cancelled
 413     synchronized boolean reserveStream(boolean clientInitiated) throws IOException {
 414         if (finalStream) {
 415             return false;
 416         }
 417         if (clientInitiated && (lastReservedClientStreamid + 2) >= MAX_CLIENT_STREAM_ID) {
 418             setFinalStream();
 419             client2.deleteConnection(this);
 420             return false;
 421         } else if (!clientInitiated && (lastReservedServerStreamid + 2) >= MAX_SERVER_STREAM_ID) {
 422             setFinalStream();
 423             client2.deleteConnection(this);
 424             return false;
 425         }
 426         if (clientInitiated)
 427             lastReservedClientStreamid+=2;
 428         else
 429             lastReservedServerStreamid+=2;
 430 
 431         assert numReservedClientStreams >= 0;
 432         assert numReservedServerStreams >= 0;
 433         if (clientInitiated &&numReservedClientStreams >= maxConcurrentClientInitiatedStreams()) {
 434             throw new IOException("too many concurrent streams");
 435         } else if (clientInitiated) {
 436             numReservedClientStreams++;
 437         }
 438         if (!clientInitiated && numReservedServerStreams >= maxConcurrentServerInitiatedStreams()) {
 439             return false;
 440         } else if (!clientInitiated) {
 441             numReservedServerStreams++;
 442         }
 443         return true;
 444     }
 445 
 446     /**
 447      * Throws an IOException if h2 was not negotiated
 448      */
 449     private static CompletableFuture<?> checkSSLConfig(AbstractAsyncSSLConnection aconn) {
 450         assert aconn.isSecure();
 451 
 452         Function<String, CompletableFuture<Void>> checkAlpnCF = (alpn) -> {
 453             CompletableFuture<Void> cf = new MinimalFuture<>();
 454             SSLEngine engine = aconn.getEngine();
 455             assert Objects.equals(alpn, engine.getApplicationProtocol());
 456 
 457             DEBUG_LOGGER.log("checkSSLConfig: alpn: %s", alpn );
 458 
 459             if (alpn == null || !alpn.equals("h2")) {
 460                 String msg;
 461                 if (alpn == null) {
 462                     Log.logSSL("ALPN not supported");
 463                     msg = "ALPN not supported";
 464                 } else {
 465                     switch (alpn) {
 466                         case "":
 467                             Log.logSSL(msg = "No ALPN negotiated");
 468                             break;
 469                         case "http/1.1":
 470                             Log.logSSL( msg = "HTTP/1.1 ALPN returned");
 471                             break;
 472                         default:
 473                             Log.logSSL(msg = "Unexpected ALPN: " + alpn);
 474                             cf.completeExceptionally(new IOException(msg));
 475                     }
 476                 }
 477                 cf.completeExceptionally(new ALPNException(msg, aconn));
 478                 return cf;
 479             }
 480             cf.complete(null);
 481             return cf;
 482         };
 483 
 484         return aconn.getALPN()
 485                 .whenComplete((r,t) -> {
 486                     if (t != null && t instanceof SSLException) {
 487                         // something went wrong during the initial handshake
 488                         // close the connection
 489                         aconn.close();
 490                     }
 491                 })
 492                 .thenCompose(checkAlpnCF);
 493     }
 494 
 495     synchronized boolean finalStream() {
 496         return finalStream;
 497     }
 498 
 499     /**
 500      * Mark this connection so no more streams created on it and it will close when
 501      * all are complete.
 502      */
 503     synchronized void setFinalStream() {
 504         finalStream = true;
 505     }
 506 
 507     static String keyFor(HttpConnection connection) {
 508         boolean isProxy = connection.isProxied(); // tunnel or plain clear connection through proxy
 509         boolean isSecure = connection.isSecure();
 510         InetSocketAddress addr = connection.address();
 511 
 512         return keyString(isSecure, isProxy, addr.getHostString(), addr.getPort());
 513     }
 514 
 515     static String keyFor(URI uri, InetSocketAddress proxy) {
 516         boolean isSecure = uri.getScheme().equalsIgnoreCase("https");
 517         boolean isProxy = proxy != null;
 518 
 519         String host;
 520         int port;
 521 
 522         if (proxy != null && !isSecure) {
 523             // clear connection through proxy: use
 524             // proxy host / proxy port
 525             host = proxy.getHostString();
 526             port = proxy.getPort();
 527         } else {
 528             // either secure tunnel connection through proxy
 529             // or direct connection to host, but in either
 530             // case only that host can be reached through
 531             // the connection: use target host / target port
 532             host = uri.getHost();
 533             port = uri.getPort();
 534         }
 535         return keyString(isSecure, isProxy, host, port);
 536     }
 537 
 538     // {C,S}:{H:P}:host:port
 539     // C indicates clear text connection "http"
 540     // S indicates secure "https"
 541     // H indicates host (direct) connection
 542     // P indicates proxy
 543     // Eg: "S:H:foo.com:80"
 544     static String keyString(boolean secure, boolean proxy, String host, int port) {
 545         if (secure && port == -1)
 546             port = 443;
 547         else if (!secure && port == -1)
 548             port = 80;
 549         return (secure ? "S:" : "C:") + (proxy ? "P:" : "H:") + host + ":" + port;
 550     }
 551 
 552     String key() {
 553         return this.key;
 554     }
 555 
 556     boolean offerConnection() {
 557         return client2.offerConnection(this);
 558     }
 559 
 560     private HttpPublisher publisher() {
 561         return connection.publisher();
 562     }
 563 
 564     private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder)
 565             throws IOException
 566     {
 567         if (debugHpack.on()) debugHpack.log("decodeHeaders(%s)", decoder);
 568 
 569         boolean endOfHeaders = frame.getFlag(HeaderFrame.END_HEADERS);
 570 
 571         List<ByteBuffer> buffers = frame.getHeaderBlock();
 572         int len = buffers.size();
 573         for (int i = 0; i < len; i++) {
 574             ByteBuffer b = buffers.get(i);
 575             hpackIn.decode(b, endOfHeaders && (i == len - 1), decoder);
 576         }
 577     }
 578 
 579     final int getInitialSendWindowSize() {
 580         return serverSettings.getParameter(INITIAL_WINDOW_SIZE);
 581     }
 582 
 583     final int maxConcurrentClientInitiatedStreams() {
 584         return serverSettings.getParameter(MAX_CONCURRENT_STREAMS);
 585     }
 586 
 587     final int maxConcurrentServerInitiatedStreams() {
 588         return clientSettings.getParameter(MAX_CONCURRENT_STREAMS);
 589     }
 590 
 591     void close() {
 592         Log.logTrace("Closing HTTP/2 connection: to {0}", connection.address());
 593         GoAwayFrame f = new GoAwayFrame(0,
 594                                         ErrorFrame.NO_ERROR,
 595                                         "Requested by user".getBytes(UTF_8));
 596         // TODO: set last stream. For now zero ok.
 597         sendFrame(f);
 598     }
 599 
 600     long count;
 601     final void asyncReceive(ByteBuffer buffer) {
 602         // We don't need to read anything and
 603         // we don't want to send anything back to the server
 604         // until the connection preface has been sent.
 605         // Therefore we're going to wait if needed before reading
 606         // (and thus replying) to anything.
 607         // Starting to reply to something (e.g send an ACK to a
 608         // SettingsFrame sent by the server) before the connection
 609         // preface is fully sent might result in the server
 610         // sending a GOAWAY frame with 'invalid_preface'.
 611         //
 612         // Note: asyncReceive is only called from the Http2TubeSubscriber
 613         //       sequential scheduler.
 614         try {
 615             Supplier<ByteBuffer> bs = initial;
 616             // ensure that we always handle the initial buffer first,
 617             // if any.
 618             if (bs != null) {
 619                 initial = null;
 620                 ByteBuffer b = bs.get();
 621                 if (b.hasRemaining()) {
 622                     long c = ++count;
 623                     if (debug.on())
 624                         debug.log(() -> "H2 Receiving Initial(" + c +"): " + b.remaining());
 625                     framesController.processReceivedData(framesDecoder, b);
 626                 }
 627             }
 628             ByteBuffer b = buffer;
 629             // the Http2TubeSubscriber scheduler ensures that the order of incoming
 630             // buffers is preserved.
 631             if (b == EMPTY_TRIGGER) {
 632                 if (debug.on()) debug.log("H2 Received EMPTY_TRIGGER");
 633                 boolean prefaceSent = framesController.prefaceSent;
 634                 assert prefaceSent;
 635                 // call framesController.processReceivedData to potentially
 636                 // trigger the processing of all the data buffered there.
 637                 framesController.processReceivedData(framesDecoder, buffer);
 638                 if (debug.on()) debug.log("H2 processed buffered data");
 639             } else {
 640                 long c = ++count;
 641                 if (debug.on())
 642                     debug.log("H2 Receiving(%d): %d", c, b.remaining());
 643                 framesController.processReceivedData(framesDecoder, buffer);
 644                 if (debug.on()) debug.log("H2 processed(%d)", c);
 645             }
 646         } catch (Throwable e) {
 647             String msg = Utils.stackTrace(e);
 648             Log.logTrace(msg);
 649             shutdown(e);
 650         }
 651     }
 652 
 653     Throwable getRecordedCause() {
 654         return cause;
 655     }
 656 
 657     void shutdown(Throwable t) {
 658         if (debug.on()) debug.log(() -> "Shutting down h2c (closed="+closed+"): " + t);
 659         if (closed == true) return;
 660         synchronized (this) {
 661             if (closed == true) return;
 662             closed = true;
 663         }
 664         if (Log.errors()) {
 665             if (!(t instanceof EOFException) || isActive()) {
 666                 Log.logError(t);
 667             } else if (t != null) {
 668                 Log.logError("Shutting down connection: {0}", t.getMessage());
 669             }
 670         }
 671         Throwable initialCause = this.cause;
 672         if (initialCause == null) this.cause = t;
 673         client2.deleteConnection(this);
 674         List<Stream<?>> c = new LinkedList<>(streams.values());
 675         for (Stream<?> s : c) {
 676             try {
 677                 s.connectionClosing(t);
 678             } catch (Throwable e) {
 679                 Log.logError("Failed to close stream {0}: {1}", s.streamid, e);
 680             }
 681         }
 682         connection.close();
 683     }
 684 
 685     /**
 686      * Streams initiated by a client MUST use odd-numbered stream
 687      * identifiers; those initiated by the server MUST use even-numbered
 688      * stream identifiers.
 689      */
 690     private static final boolean isServerInitiatedStream(int streamid) {
 691         return (streamid & 0x1) == 0;
 692     }
 693 
    /**
     * Handles stream 0 (common) frames that apply to whole connection and passes
     * other stream specific frames to that Stream object.
     *
     * Invokes Stream.incoming() which is expected to process frame without
     * blocking.
     *
     * Summary of the dispatch performed here:
     * - a MalformedFrame on stream 0 closes the frames decoder and raises a
     *   connection-level protocol error; on any other stream only that
     *   stream is reset;
     * - frames on stream 0 go to handleConnectionFrame;
     * - a SETTINGS frame on a non-zero stream is a protocol error;
     * - frames for a stream id with no registered Stream have their headers
     *   decoded (if any), their data dropped (if any), and may cause the
     *   stream to be reset;
     * - everything else is handed to the registered Stream.
     *
     * @param frame the frame that was just received
     * @throws IOException declared by the handlers invoked from here
     */
    void processFrame(Http2Frame frame) throws IOException {
        Log.logFrames(frame, "IN");
        int streamid = frame.streamid();
        if (frame instanceof MalformedFrame) {
            Log.logError(((MalformedFrame) frame).getMessage());
            if (streamid == 0) {
                // connection-level failure: stop decoding and report a
                // protocol error carrying the frame's own error code
                framesDecoder.close("Malformed frame on stream 0");
                protocolError(((MalformedFrame) frame).getErrorCode(),
                        ((MalformedFrame) frame).getMessage());
            } else {
                // stream-level failure: only the offending stream is reset
                if (debug.on())
                    debug.log(() -> "Reset stream: " + ((MalformedFrame) frame).getMessage());
                resetStream(streamid, ((MalformedFrame) frame).getErrorCode());
            }
            return;
        }
        if (streamid == 0) {
            handleConnectionFrame(frame);
        } else {
            if (frame instanceof SettingsFrame) {
                // The stream identifier for a SETTINGS frame MUST be zero
                framesDecoder.close(
                        "The stream identifier for a SETTINGS frame MUST be zero");
                protocolError(GoAwayFrame.PROTOCOL_ERROR);
                return;
            }

            Stream<?> stream = getStream(streamid);
            if (stream == null) {
                // Should never receive a frame with unknown stream id

                if (frame instanceof HeaderFrame) {
                    // always decode the headers as they may affect
                    // connection-level HPACK decoding state
                    DecodingCallback decoder = new ValidatingHeadersConsumer();
                    try {
                        decodeHeaders((HeaderFrame) frame, decoder);
                    } catch (UncheckedIOException e) {
                        // headers rejected by the validating consumer
                        protocolError(ResetFrame.PROTOCOL_ERROR, e.getMessage());
                        return;
                    }
                }

                // a RST_STREAM for an unknown stream needs no answer
                if (!(frame instanceof ResetFrame)) {
                    if (frame instanceof DataFrame) {
                        // nobody will consume this data; dropDataFrame still
                        // accounts for it through ensureWindowUpdated
                        dropDataFrame((DataFrame)frame);
                    }
                    if (isServerInitiatedStream(streamid)) {
                        if (streamid < nextPushStream) {
                            // trailing data on a cancelled push promise stream,
                            // reset will already have been sent, ignore
                            Log.logTrace("Ignoring cancelled push promise frame " + frame);
                        } else {
                            resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
                        }
                    } else if (streamid >= nextstreamid) {
                        // otherwise the stream has already been reset/closed
                        resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
                    }
                }
                return;
            }
            if (frame instanceof PushPromiseFrame) {
                PushPromiseFrame pp = (PushPromiseFrame)frame;
                try {
                    handlePushPromise(stream, pp);
                } catch (UncheckedIOException e) {
                    protocolError(ResetFrame.PROTOCOL_ERROR, e.getMessage());
                    return;
                }
            } else if (frame instanceof HeaderFrame) {
                // decode headers (or continuation)
                try {
                    decodeHeaders((HeaderFrame) frame, stream.rspHeadersConsumer());
                } catch (UncheckedIOException e) {
                    protocolError(ResetFrame.PROTOCOL_ERROR, e.getMessage());
                    return;
                }
                // headers decoded successfully: let the stream see the frame
                stream.incoming(frame);
            } else {
                stream.incoming(frame);
            }
        }
    }
 785 
 786     final void dropDataFrame(DataFrame df) {
 787         if (closed) return;
 788         if (debug.on()) {
 789             debug.log("Dropping data frame for stream %d (%d payload bytes)",
 790                     df.streamid(), df.payloadLength());
 791         }
 792         ensureWindowUpdated(df);
 793     }
 794 
 795     final void ensureWindowUpdated(DataFrame df) {
 796         try {
 797             if (closed) return;
 798             int length = df.payloadLength();
 799             if (length > 0) {
 800                 windowUpdater.update(length);
 801             }
 802         } catch(Throwable t) {
 803             Log.logError("Unexpected exception while updating window: {0}", (Object)t);
 804         }
 805     }
 806 
 807     private <T> void handlePushPromise(Stream<T> parent, PushPromiseFrame pp)
 808         throws IOException
 809     {
 810         // always decode the headers as they may affect connection-level HPACK
 811         // decoding state
 812         HeaderDecoder decoder = new HeaderDecoder();
 813         decodeHeaders(pp, decoder);
 814 
 815         HttpRequestImpl parentReq = parent.request;
 816         int promisedStreamid = pp.getPromisedStream();
 817         if (promisedStreamid != nextPushStream) {
 818             resetStream(promisedStreamid, ResetFrame.PROTOCOL_ERROR);
 819             return;
 820         } else if (!reserveStream(false)) {
 821             resetStream(promisedStreamid, ResetFrame.REFUSED_STREAM);
 822             return;
 823         } else {
 824             nextPushStream += 2;
 825         }
 826 
 827         HttpHeaders headers = decoder.headers();
 828         HttpRequestImpl pushReq = HttpRequestImpl.createPushRequest(parentReq, headers);
 829         Exchange<T> pushExch = new Exchange<>(pushReq, parent.exchange.multi);
 830         Stream.PushedStream<T> pushStream = createPushStream(parent, pushExch);
 831         pushExch.exchImpl = pushStream;
 832         pushStream.registerStream(promisedStreamid);
 833         parent.incoming_pushPromise(pushReq, pushStream);
 834     }
 835 
 836     private void handleConnectionFrame(Http2Frame frame)
 837         throws IOException
 838     {
 839         switch (frame.type()) {
 840           case SettingsFrame.TYPE:
 841               handleSettings((SettingsFrame)frame);
 842               break;
 843           case PingFrame.TYPE:
 844               handlePing((PingFrame)frame);
 845               break;
 846           case GoAwayFrame.TYPE:
 847               handleGoAway((GoAwayFrame)frame);
 848               break;
 849           case WindowUpdateFrame.TYPE:
 850               handleWindowUpdate((WindowUpdateFrame)frame);
 851               break;
 852           default:
 853             protocolError(ErrorFrame.PROTOCOL_ERROR);
 854         }
 855     }
 856 
 857     void resetStream(int streamid, int code) throws IOException {
 858         try {
 859             if (connection.channel().isOpen()) {
 860                 // no need to try & send a reset frame if the
 861                 // connection channel is already closed.
 862                 Log.logError(
 863                         "Resetting stream {0,number,integer} with error code {1,number,integer}",
 864                         streamid, code);
 865                 ResetFrame frame = new ResetFrame(streamid, code);
 866                 sendFrame(frame);
 867             } else if (debug.on()) {
 868                 debug.log("Channel already closed, no need to reset stream %d",
 869                           streamid);
 870             }
 871         } finally {
 872             decrementStreamsCount(streamid);
 873             closeStream(streamid);
 874         }
 875     }
 876 
 877     // reduce count of streams by 1 if stream still exists
 878     synchronized void decrementStreamsCount(int streamid) {
 879         Stream<?> s = streams.get(streamid);
 880         if (s == null || !s.deRegister())
 881             return;
 882         if (streamid % 2 == 1) {
 883             numReservedClientStreams--;
 884             assert numReservedClientStreams >= 0 :
 885                     "negative client stream count for stream=" + streamid;
 886         } else {
 887             numReservedServerStreams--;
 888             assert numReservedServerStreams >= 0 :
 889                     "negative server stream count for stream=" + streamid;
 890         }
 891     }
 892 
 893     void closeStream(int streamid) {
 894         if (debug.on()) debug.log("Closed stream %d", streamid);
 895         boolean isClient = (streamid % 2) == 1;
 896         Stream<?> s = streams.remove(streamid);
 897         if (s != null) {
 898             // decrement the reference count on the HttpClientImpl
 899             // to allow the SelectorManager thread to exit if no
 900             // other operation is pending and the facade is no
 901             // longer referenced.
 902             client().streamUnreference();
 903         }
 904         // ## Remove s != null. It is a hack for delayed cancellation,reset
 905         if (s != null && !(s instanceof Stream.PushedStream)) {
 906             // Since PushStreams have no request body, then they have no
 907             // corresponding entry in the window controller.
 908             windowController.removeStream(streamid);
 909         }
 910         if (finalStream() && streams.isEmpty()) {
 911             // should be only 1 stream, but there might be more if server push
 912             close();
 913         }
 914     }
 915 
 916     /**
 917      * Increments this connection's send Window by the amount in the given frame.
 918      */
 919     private void handleWindowUpdate(WindowUpdateFrame f)
 920         throws IOException
 921     {
 922         int amount = f.getUpdate();
 923         if (amount <= 0) {
 924             // ## temporarily disable to workaround a bug in Jetty where it
 925             // ## sends Window updates with a 0 update value.
 926             //protocolError(ErrorFrame.PROTOCOL_ERROR);
 927         } else {
 928             boolean success = windowController.increaseConnectionWindow(amount);
 929             if (!success) {
 930                 protocolError(ErrorFrame.FLOW_CONTROL_ERROR);  // overflow
 931             }
 932         }
 933     }
 934 
 935     private void protocolError(int errorCode)
 936         throws IOException
 937     {
 938         protocolError(errorCode, null);
 939     }
 940 
 941     private void protocolError(int errorCode, String msg)
 942         throws IOException
 943     {
 944         GoAwayFrame frame = new GoAwayFrame(0, errorCode);
 945         sendFrame(frame);
 946         shutdown(new IOException("protocol error" + (msg == null?"":(": " + msg))));
 947     }
 948 
 949     private void handleSettings(SettingsFrame frame)
 950         throws IOException
 951     {
 952         assert frame.streamid() == 0;
 953         if (!frame.getFlag(SettingsFrame.ACK)) {
 954             int newWindowSize = frame.getParameter(INITIAL_WINDOW_SIZE);
 955             if (newWindowSize != -1) {
 956                 int oldWindowSize = serverSettings.getParameter(INITIAL_WINDOW_SIZE);
 957                 int diff = newWindowSize - oldWindowSize;
 958                 if (diff != 0) {
 959                     windowController.adjustActiveStreams(diff);
 960                 }
 961             }
 962 
 963             serverSettings.update(frame);
 964             sendFrame(new SettingsFrame(SettingsFrame.ACK));
 965         }
 966     }
 967 
 968     private void handlePing(PingFrame frame)
 969         throws IOException
 970     {
 971         frame.setFlag(PingFrame.ACK);
 972         sendUnorderedFrame(frame);
 973     }
 974 
 975     private void handleGoAway(GoAwayFrame frame)
 976         throws IOException
 977     {
 978         shutdown(new IOException(
 979                         String.valueOf(connection.channel().getLocalAddress())
 980                         +": GOAWAY received"));
 981     }
 982 
 983     /**
 984      * Max frame size we are allowed to send
 985      */
 986     public int getMaxSendFrameSize() {
 987         int param = serverSettings.getParameter(MAX_FRAME_SIZE);
 988         if (param == -1) {
 989             param = DEFAULT_FRAME_SIZE;
 990         }
 991         return param;
 992     }
 993 
 994     /**
 995      * Max frame size we will receive
 996      */
 997     public int getMaxReceiveFrameSize() {
 998         return clientSettings.getParameter(MAX_FRAME_SIZE);
 999     }
1000 
    // Text of the client connection preface; passed (as bytes) to the frames
    // encoder by sendConnectionPreface().
    private static final String CLIENT_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";

    // The preface encoded once as ISO-8859-1 bytes.
    private static final byte[] PREFACE_BYTES =
        CLIENT_PREFACE.getBytes(StandardCharsets.ISO_8859_1);
1005 
    /**
     * Sends Connection preface and Settings frame with current preferred
     * values.
     *
     * The preface bytes and a SETTINGS frame built from {@code clientSettings}
     * are encoded into a single buffer and enqueued as unordered output.
     * The frames controller is then told that the preface was sent, a
     * connection-level window update is sent if the configured initial
     * window differs from {@code DEFAULT_INITIAL_WINDOW_SIZE}, and finally
     * {@code EMPTY_TRIGGER} is pushed to the subscriber to trigger
     * processing of buffered data.
     *
     * @throws IOException declared by the operations invoked from here
     */
    private void sendConnectionPreface() throws IOException {
        Log.logTrace("{0}: start sending connection preface to {1}",
                     connection.channel().getLocalAddress(),
                     connection.address());
        SettingsFrame sf = new SettingsFrame(clientSettings);
        ByteBuffer buf = framesEncoder.encodeConnectionPreface(PREFACE_BYTES, sf);
        Log.logFrames(sf, "OUT");
        // send preface bytes and SettingsFrame together
        HttpPublisher publisher = publisher();
        publisher.enqueueUnordered(List.of(buf));
        publisher.signalEnqueued();
        // mark preface sent.
        framesController.markPrefaceSent();
        Log.logTrace("PREFACE_BYTES sent");
        Log.logTrace("Settings Frame sent");

        // send a Window update for the receive buffer we are using
        // minus the initial 64 K -1 specified in protocol:
        // RFC 7540, Section 6.9.2:
        // "[...] the connection flow-control window is set to the default
        // initial window size until a WINDOW_UPDATE frame is received."
        //
        // Note that the default initial window size, not to be confused
        // with the initial window size, is defined by RFC 7540 as
        // 64K -1.
        final int len = windowUpdater.initialWindowSize - DEFAULT_INITIAL_WINDOW_SIZE;
        if (len != 0) {
            if (Log.channel()) {
                Log.logChannel("Sending initial connection window update frame: {0} ({1} - {2})",
                        len, windowUpdater.initialWindowSize, DEFAULT_INITIAL_WINDOW_SIZE);
            }
            windowUpdater.sendWindowUpdate(len);
        }
        // there will be an ACK to the windows update - which should
        // cause any pending data stored before the preface was sent to be
        // flushed (see PrefaceController).
        // NOTE(review): HTTP/2 WINDOW_UPDATE frames are not acknowledged;
        // the ACK mentioned above is presumably the peer's SETTINGS ACK -
        // confirm. In any case the explicit trigger below does not wait
        // for it.
        Log.logTrace("finished sending connection preface");
        if (debug.on())
            debug.log("Triggering processing of buffered data"
                      + " after sending connection preface");
        // an empty marker buffer, used only to wake up the receiving side
        subscriber.onNext(List.of(EMPTY_TRIGGER));
    }
1052 
1053     /**
1054      * Returns an existing Stream with given id, or null if doesn't exist
1055      */
1056     @SuppressWarnings("unchecked")
1057     <T> Stream<T> getStream(int streamid) {
1058         return (Stream<T>)streams.get(streamid);
1059     }
1060 
1061     /**
1062      * Creates Stream with given id.
1063      */
1064     final <T> Stream<T> createStream(Exchange<T> exchange) {
1065         Stream<T> stream = new Stream<>(this, exchange, windowController);
1066         return stream;
1067     }
1068 
1069     <T> Stream.PushedStream<T> createPushStream(Stream<T> parent, Exchange<T> pushEx) {
1070         PushGroup<T> pg = parent.exchange.getPushGroup();
1071         return new Stream.PushedStream<>(pg, this, pushEx);
1072     }
1073 
1074     <T> void putStream(Stream<T> stream, int streamid) {
1075         // increment the reference count on the HttpClientImpl
1076         // to prevent the SelectorManager thread from exiting until
1077         // the stream is closed.
1078         client().streamReference();
1079         streams.put(streamid, stream);
1080     }
1081 
1082     /**
1083      * Encode the headers into a List<ByteBuffer> and then create HEADERS
1084      * and CONTINUATION frames from the list and return the List<Http2Frame>.
1085      */
1086     private List<HeaderFrame> encodeHeaders(OutgoingHeaders<Stream<?>> frame) {
1087         List<ByteBuffer> buffers = encodeHeadersImpl(
1088                 getMaxSendFrameSize(),
1089                 frame.getAttachment().getRequestPseudoHeaders(),
1090                 frame.getUserHeaders(),
1091                 frame.getSystemHeaders());
1092 
1093         List<HeaderFrame> frames = new ArrayList<>(buffers.size());
1094         Iterator<ByteBuffer> bufIterator = buffers.iterator();
1095         HeaderFrame oframe = new HeadersFrame(frame.streamid(), frame.getFlags(), bufIterator.next());
1096         frames.add(oframe);
1097         while(bufIterator.hasNext()) {
1098             oframe = new ContinuationFrame(frame.streamid(), bufIterator.next());
1099             frames.add(oframe);
1100         }
1101         oframe.setFlag(HeaderFrame.END_HEADERS);
1102         return frames;
1103     }
1104 
1105     // Dedicated cache for headers encoding ByteBuffer.
1106     // There can be no concurrent access to this  buffer as all access to this buffer
1107     // and its content happen within a single critical code block section protected
1108     // by the sendLock. / (see sendFrame())
1109     // private final ByteBufferPool headerEncodingPool = new ByteBufferPool();
1110 
1111     private ByteBuffer getHeaderBuffer(int maxFrameSize) {
1112         ByteBuffer buf = ByteBuffer.allocate(maxFrameSize);
1113         buf.limit(maxFrameSize);
1114         return buf;
1115     }
1116 
1117     /*
1118      * Encodes all the headers from the given HttpHeaders into the given List
1119      * of buffers.
1120      *
1121      * From https://tools.ietf.org/html/rfc7540#section-8.1.2 :
1122      *
1123      *     ...Just as in HTTP/1.x, header field names are strings of ASCII
1124      *     characters that are compared in a case-insensitive fashion.  However,
1125      *     header field names MUST be converted to lowercase prior to their
1126      *     encoding in HTTP/2...
1127      */
1128     private List<ByteBuffer> encodeHeadersImpl(int maxFrameSize, HttpHeaders... headers) {
1129         ByteBuffer buffer = getHeaderBuffer(maxFrameSize);
1130         List<ByteBuffer> buffers = new ArrayList<>();
1131         for(HttpHeaders header : headers) {
1132             for (Map.Entry<String, List<String>> e : header.map().entrySet()) {
1133                 String lKey = e.getKey().toLowerCase(Locale.US);
1134                 List<String> values = e.getValue();
1135                 for (String value : values) {
1136                     hpackOut.header(lKey, value);
1137                     while (!hpackOut.encode(buffer)) {
1138                         buffer.flip();
1139                         buffers.add(buffer);
1140                         buffer =  getHeaderBuffer(maxFrameSize);
1141                     }
1142                 }
1143             }
1144         }
1145         buffer.flip();
1146         buffers.add(buffer);
1147         return buffers;
1148     }
1149 
1150     private List<ByteBuffer> encodeHeaders(OutgoingHeaders<Stream<?>> oh, Stream<?> stream) {
1151         oh.streamid(stream.streamid);
1152         if (Log.headers()) {
1153             StringBuilder sb = new StringBuilder("HEADERS FRAME (stream=");
1154             sb.append(stream.streamid).append(")\n");
1155             Log.dumpHeaders(sb, "    ", oh.getAttachment().getRequestPseudoHeaders());
1156             Log.dumpHeaders(sb, "    ", oh.getSystemHeaders());
1157             Log.dumpHeaders(sb, "    ", oh.getUserHeaders());
1158             Log.logHeaders(sb.toString());
1159         }
1160         List<HeaderFrame> frames = encodeHeaders(oh);
1161         return encodeFrames(frames);
1162     }
1163 
1164     private List<ByteBuffer> encodeFrames(List<HeaderFrame> frames) {
1165         if (Log.frames()) {
1166             frames.forEach(f -> Log.logFrames(f, "OUT"));
1167         }
1168         return framesEncoder.encodeFrames(frames);
1169     }
1170 
1171     private Stream<?> registerNewStream(OutgoingHeaders<Stream<?>> oh) {
1172         Stream<?> stream = oh.getAttachment();
1173         assert stream.streamid == 0;
1174         int streamid = nextstreamid;
1175         nextstreamid += 2;
1176         stream.registerStream(streamid);
1177         // set outgoing window here. This allows thread sending
1178         // body to proceed.
1179         windowController.registerStream(streamid, getInitialSendWindowSize());
1180         return stream;
1181     }
1182 
    // Lock held by sendFrame() while a frame is encoded and enqueued, so that
    // a new stream's registration and the enqueueing of its headers happen
    // as one step.
    private final Object sendlock = new Object();
1184 
1185     void sendFrame(Http2Frame frame) {
1186         try {
1187             HttpPublisher publisher = publisher();
1188             synchronized (sendlock) {
1189                 if (frame instanceof OutgoingHeaders) {
1190                     @SuppressWarnings("unchecked")
1191                     OutgoingHeaders<Stream<?>> oh = (OutgoingHeaders<Stream<?>>) frame;
1192                     Stream<?> stream = registerNewStream(oh);
1193                     // provide protection from inserting unordered frames between Headers and Continuation
1194                     publisher.enqueue(encodeHeaders(oh, stream));
1195                 } else {
1196                     publisher.enqueue(encodeFrame(frame));
1197                 }
1198             }
1199             publisher.signalEnqueued();
1200         } catch (IOException e) {
1201             if (!closed) {
1202                 Log.logError(e);
1203                 shutdown(e);
1204             }
1205         }
1206     }
1207 
1208     private List<ByteBuffer> encodeFrame(Http2Frame frame) {
1209         Log.logFrames(frame, "OUT");
1210         return framesEncoder.encodeFrame(frame);
1211     }
1212 
1213     void sendDataFrame(DataFrame frame) {
1214         try {
1215             HttpPublisher publisher = publisher();
1216             publisher.enqueue(encodeFrame(frame));
1217             publisher.signalEnqueued();
1218         } catch (IOException e) {
1219             if (!closed) {
1220                 Log.logError(e);
1221                 shutdown(e);
1222             }
1223         }
1224     }
1225 
1226     /*
1227      * Direct call of the method bypasses synchronization on "sendlock" and
1228      * allowed only of control frames: WindowUpdateFrame, PingFrame and etc.
1229      * prohibited for such frames as DataFrame, HeadersFrame, ContinuationFrame.
1230      */
1231     void sendUnorderedFrame(Http2Frame frame) {
1232         try {
1233             HttpPublisher publisher = publisher();
1234             publisher.enqueueUnordered(encodeFrame(frame));
1235             publisher.signalEnqueued();
1236         } catch (IOException e) {
1237             if (!closed) {
1238                 Log.logError(e);
1239                 shutdown(e);
1240             }
1241         }
1242     }
1243 
    /**
     * A simple tube subscriber for reading from the connection flow.
     *
     * Buffers received through onNext are appended to a queue; a sequential
     * scheduler then runs processQueue, which hands the buffers one by one
     * to Http2Connection.asyncReceive. Errors and completion are recorded in
     * the 'error' field and cause the enclosing connection to be shut down
     * the next time processQueue runs.
     */
    final class Http2TubeSubscriber implements TubeSubscriber {
        // most recent subscription passed to onSubscribe
        private volatile Flow.Subscription subscription;
        // set by onError and onComplete
        private volatile boolean completed;
        // set by dropSubscription, cleared by onSubscribe
        private volatile boolean dropped;
        // first failure seen, or the EOFException created by onComplete
        private volatile Throwable error;
        // buffers received but not yet passed to asyncReceive
        private final ConcurrentLinkedQueue<ByteBuffer> queue
                = new ConcurrentLinkedQueue<>();
        // runs processQueue; see runOrSchedule() for how it is triggered
        private final SequentialScheduler scheduler =
                SequentialScheduler.synchronizedScheduler(this::processQueue);
        private final HttpClientImpl client;

        Http2TubeSubscriber(HttpClientImpl client) {
            this.client = Objects.requireNonNull(client);
        }

        /**
         * Drains the queue into asyncReceive until the queue is empty or the
         * scheduler has been stopped. A throwable raised while draining is
         * recorded unless an error was recorded already. In all cases, if an
         * error is recorded when this method finishes, the scheduler is
         * stopped and the connection is shut down with that error.
         */
        final void processQueue() {
            try {
                while (!queue.isEmpty() && !scheduler.isStopped()) {
                    ByteBuffer buffer = queue.poll();
                    if (debug.on())
                        debug.log("sending %d to Http2Connection.asyncReceive",
                                  buffer.remaining());
                    asyncReceive(buffer);
                }
            } catch (Throwable t) {
                // keep the first recorded error
                Throwable x = error;
                if (x == null) error = t;
            } finally {
                // also reached on normal exit: this is where an error set by
                // onError/onComplete gets acted upon
                Throwable x = error;
                if (x != null) {
                    if (debug.on()) debug.log("Stopping scheduler", x);
                    scheduler.stop();
                    Http2Connection.this.shutdown(x);
                }
            }
        }

        /**
         * Triggers processQueue through the scheduler. When called on the
         * selector thread the work is handed to the client's executor;
         * otherwise the scheduler's no-argument runOrSchedule() is used
         * (presumably running the task in the calling thread - confirm
         * against SequentialScheduler).
         */
        private final void runOrSchedule() {
            if (client.isSelectorThread()) {
                scheduler.runOrSchedule(client.theExecutor());
            } else scheduler.runOrSchedule();
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            // supports being called multiple times.
            // doesn't cancel the previous subscription, since that is
            // most probably the same as the new subscription.
            assert this.subscription == null || dropped == false;
            this.subscription = subscription;
            dropped = false;
            // TODO FIXME: request(1) should be done by the delegate.
            if (!completed) {
                if (debug.on())
                    debug.log("onSubscribe: requesting Long.MAX_VALUE for reading");
                subscription.request(Long.MAX_VALUE);
            } else {
                if (debug.on()) debug.log("onSubscribe: already completed");
            }
        }

        @Override
        public void onNext(List<ByteBuffer> item) {
            if (debug.on()) debug.log(() -> "onNext: got " + Utils.remaining(item)
                    + " bytes in " + item.size() + " buffers");
            // enqueue first, then make sure processQueue gets to run
            queue.addAll(item);
            runOrSchedule();
        }

        @Override
        public void onError(Throwable throwable) {
            if (debug.on()) debug.log(() -> "onError: " + throwable);
            // record the failure; processQueue performs the actual shutdown
            error = throwable;
            completed = true;
            runOrSchedule();
        }

        @Override
        public void onComplete() {
            // completion is turned into an EOFException whose message
            // depends on whether the connection still has reserved streams
            String msg = isActive()
                    ? "EOF reached while reading"
                    : "Idle connection closed by HTTP/2 peer";
            if (debug.on()) debug.log(msg);
            error = new EOFException(msg);
            completed = true;
            runOrSchedule();
        }

        @Override
        public void dropSubscription() {
            if (debug.on()) debug.log("dropSubscription");
            // we could probably set subscription to null here...
            // then we might not need the 'dropped' boolean?
            dropped = true;
        }
    }
1343 
1344     synchronized boolean isActive() {
1345         return numReservedClientStreams > 0 || numReservedServerStreams > 0;
1346     }
1347 
1348     @Override
1349     public final String toString() {
1350         return dbgString();
1351     }
1352 
1353     final String dbgString() {
1354         return "Http2Connection("
1355                     + connection.getConnectionFlow() + ")";
1356     }
1357 
1358     static class HeaderDecoder extends ValidatingHeadersConsumer {
1359 
1360         HttpHeadersBuilder headersBuilder;
1361 
1362         HeaderDecoder() {
1363             this.headersBuilder = new HttpHeadersBuilder();
1364         }
1365 
1366         @Override
1367         public void onDecoded(CharSequence name, CharSequence value) {
1368             String n = name.toString();
1369             String v = value.toString();
1370             super.onDecoded(n, v);
1371             headersBuilder.addHeader(n, v);
1372         }
1373 
1374         HttpHeaders headers() {
1375             return headersBuilder.build();
1376         }
1377     }
1378 
1379     /*
1380      * Checks RFC 7540 rules (relaxed) compliance regarding pseudo-headers.
1381      */
1382     static class ValidatingHeadersConsumer implements DecodingCallback {
1383 
1384         private static final Set<String> PSEUDO_HEADERS =
1385                 Set.of(":authority", ":method", ":path", ":scheme", ":status");
1386 
1387         /** Used to check that if there are pseudo-headers, they go first */
1388         private boolean pseudoHeadersEnded;
1389 
1390         /**
1391          * Called when END_HEADERS was received. This consumer may be invoked
1392          * again after reset() is called, but for a whole new set of headers.
1393          */
1394         void reset() {
1395             pseudoHeadersEnded = false;
1396         }
1397 
1398         @Override
1399         public void onDecoded(CharSequence name, CharSequence value)
1400                 throws UncheckedIOException
1401         {
1402             String n = name.toString();
1403             if (n.startsWith(":")) {
1404                 if (pseudoHeadersEnded) {
1405                     throw newException("Unexpected pseudo-header '%s'", n);
1406                 } else if (!PSEUDO_HEADERS.contains(n)) {
1407                     throw newException("Unknown pseudo-header '%s'", n);
1408                 }
1409             } else {
1410                 pseudoHeadersEnded = true;
1411                 if (!Utils.isValidName(n)) {
1412                     throw newException("Bad header name '%s'", n);
1413                 }
1414             }
1415             String v = value.toString();
1416             if (!Utils.isValidValue(v)) {
1417                 throw newException("Bad header value '%s'", v);
1418             }
1419         }
1420 
1421         private UncheckedIOException newException(String message, String header)
1422         {
1423             return new UncheckedIOException(
1424                     new IOException(String.format(message, header)));
1425         }
1426     }
1427 
1428     static final class ConnectionWindowUpdateSender extends WindowUpdateSender {
1429 
1430         final int initialWindowSize;
1431         public ConnectionWindowUpdateSender(Http2Connection connection,
1432                                             int initialWindowSize) {
1433             super(connection, initialWindowSize);
1434             this.initialWindowSize = initialWindowSize;
1435         }
1436 
1437         @Override
1438         int getStreamId() {
1439             return 0;
1440         }
1441     }
1442 
1443     /**
1444      * Thrown when https handshake negotiates http/1.1 alpn instead of h2
1445      */
1446     static final class ALPNException extends IOException {
1447         private static final long serialVersionUID = 0L;
1448         final transient AbstractAsyncSSLConnection connection;
1449 
1450         ALPNException(String msg, AbstractAsyncSSLConnection connection) {
1451             super(msg);
1452             this.connection = connection;
1453         }
1454 
1455         AbstractAsyncSSLConnection getConnection() {
1456             return connection;
1457         }
1458     }
1459 }