1 /*
   2  * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.io.IOException;
  29 import java.net.InetSocketAddress;
  30 import java.net.URI;
  31 import jdk.incubator.http.HttpConnection.Mode;
  32 import java.nio.ByteBuffer;
  33 import java.nio.charset.StandardCharsets;
  34 import java.util.HashMap;
  35 import java.util.Iterator;
  36 import java.util.LinkedList;
  37 import java.util.List;
  38 import java.util.Map;
  39 import java.util.concurrent.CompletableFuture;
  40 import java.util.ArrayList;
  41 import java.util.Collections;
  42 import java.util.Formatter;
  43 import java.util.concurrent.ConcurrentHashMap;
  44 import java.util.concurrent.CountDownLatch;
  45 import java.util.stream.Collectors;
  46 import javax.net.ssl.SSLEngine;
  47 import jdk.incubator.http.internal.common.*;
  48 import jdk.incubator.http.internal.frame.*;
  49 import jdk.incubator.http.internal.hpack.Encoder;
  50 import jdk.incubator.http.internal.hpack.Decoder;
  51 import jdk.incubator.http.internal.hpack.DecodingCallback;
  52 
  53 import static jdk.incubator.http.internal.frame.SettingsFrame.*;
  54 
  55 
  56 /**
  57  * An Http2Connection. Encapsulates the socket(channel) and any SSLEngine used
  58  * over it. Contains an HttpConnection which hides the SocketChannel SSL stuff.
  59  *
  60  * Http2Connections belong to a Http2ClientImpl, (one of) which belongs
  61  * to a HttpClientImpl.
  62  *
  63  * Creation cases:
  64  * 1) upgraded HTTP/1.1 plain tcp connection
  65  * 2) prior knowledge directly created plain tcp connection
  66  * 3) directly created HTTP/2 SSL connection which uses ALPN.
  67  *
  68  * Sending is done by writing directly to underlying HttpConnection object which
  69  * is operating in async mode. No flow control applies on output at this level
  70  * and all writes are just executed as puts to an output Q belonging to HttpConnection
  71  * Flow control is implemented by HTTP/2 protocol itself.
  72  *
  73  * Hpack header compression
  74  * and outgoing stream creation is also done here, because these operations
  75  * must be synchronized at the socket level. Stream objects send frames simply
  76  * by placing them on the connection's output Queue. sendFrame() is called
  77  * from a higher level (Stream) thread.
  78  *
  79  * asyncReceive(ByteBuffer) is always called from the selector thread. It assembles
  80  * incoming Http2Frames, and directs them to the appropriate Stream.incoming()
  81  * or handles them directly itself. This thread performs hpack decompression
  82  * and incoming stream creation (Server push). Incoming frames destined for a
  83  * stream are provided by calling Stream.incoming().
  84  */
  85 class Http2Connection  {
  86     /*
  87      *  ByteBuffer pooling strategy for HTTP/2 protocol:
  88      *
  89      * In general there are 4 points where ByteBuffers are used:
  90      *  - incoming/outgoing frames from/to ByteBufers plus incoming/outgoing encrypted data
  91      *    in case of SSL connection.
  92      *
  93      * 1. Outgoing frames encoded to ByteBuffers.
  94      *    Outgoing ByteBuffers are created with requited size and frequently small (except DataFrames, etc)
  95      *    At this place no pools at all. All outgoing buffers should be collected by GC.
  96      *
  97      * 2. Incoming ByteBuffers (decoded to frames).
  98      *    Here, total elimination of BB pool is not a good idea.
  99      *    We don't know how many bytes we will receive through network.
 100      * So here we allocate buffer of reasonable size. The following life of the BB:
 101      * - If all frames decoded from the BB are other than DataFrame and HeaderFrame (and HeaderFrame subclasses)
 102      *     BB is returned to pool,
 103      * - If we decoded DataFrame from the BB. In that case DataFrame refers to subbuffer obtained by slice() method.
 104      *     Such BB is never returned to pool and will be GCed.
 105      * - If we decoded HeadersFrame from the BB. Then header decoding is performed inside processFrame method and
 106      *     the buffer could be release to pool.
 107      *
 108      * 3. SLL encrypted buffers. Here another pool was introduced and all net buffers are to/from the pool,
 109      *    because of we can't predict size encrypted packets.
 110      *
 111      */
 112 
 113 
 114     // A small class that allows to control frames with respect to the state of
 115     // the connection preface. Any data received before the connection
 116     // preface is sent will be buffered.
 117     private final class FramesController {
 118         volatile boolean prefaceSent;
 119         volatile List<ByteBufferReference> pending;
 120 
 121         boolean processReceivedData(FramesDecoder decoder, ByteBufferReference buf)
 122                 throws IOException
 123         {
 124             // if preface is not sent, buffers data in the pending list
 125             if (!prefaceSent) {
 126                 synchronized (this) {
 127                     if (!prefaceSent) {
 128                         if (pending == null) pending = new ArrayList<>();
 129                         pending.add(buf);
 130                         return false;
 131                     }
 132                 }
 133             }
 134 
 135             // Preface is sent. Checks for pending data and flush it.
 136             // We rely on this method being called from within the readlock,
 137             // so we know that no other thread could execute this method
 138             // concurrently while we're here.
 139             // This ensures that later incoming buffers will not
 140             // be processed before we have flushed the pending queue.
 141             // No additional synchronization is therefore necessary here.
 142             List<ByteBufferReference> pending = this.pending;
 143             this.pending = null;
 144             if (pending != null) {
 145                 // flush pending data
 146                 for (ByteBufferReference b : pending) {
 147                     decoder.decode(b);
 148                 }
 149             }
 150 
 151             // push the received buffer to the frames decoder.
 152             decoder.decode(buf);
 153             return true;
 154         }
 155 
 156         // Mark that the connection preface is sent
 157         void markPrefaceSent() {
 158             assert !prefaceSent;
 159             synchronized (this) {
 160                 prefaceSent = true;
 161             }
 162         }
 163 
 164     }
 165 
 166     volatile boolean closed;
 167 
 168     //-------------------------------------
 169     final HttpConnection connection;
 170     private final HttpClientImpl client;
 171     private final Http2ClientImpl client2;
 172     private final Map<Integer,Stream<?>> streams = new ConcurrentHashMap<>();
 173     private int nextstreamid;
 174     private int nextPushStream = 2;
 175     private final Encoder hpackOut;
 176     private final Decoder hpackIn;
 177     final SettingsFrame clientSettings;
 178     private volatile SettingsFrame serverSettings;
 179     private final String key; // for HttpClientImpl.connections map
 180     private final FramesDecoder framesDecoder;
 181     private final FramesEncoder framesEncoder = new FramesEncoder();
 182 
 183     /**
 184      * Send Window controller for both connection and stream windows.
 185      * Each of this connection's Streams MUST use this controller.
 186      */
 187     private final WindowController windowController = new WindowController();
 188     private final FramesController framesController = new FramesController();
 189     final WindowUpdateSender windowUpdater;
 190 
 191     static final int DEFAULT_FRAME_SIZE = 16 * 1024;
 192 
 193 
 194     // TODO: need list of control frames from other threads
 195     // that need to be sent
 196 
 197     private Http2Connection(HttpConnection connection,
 198                             Http2ClientImpl client2,
 199                             int nextstreamid,
 200                             String key) {
 201         this.connection = connection;
 202         this.client = client2.client();
 203         this.client2 = client2;
 204         this.nextstreamid = nextstreamid;
 205         this.key = key;
 206         this.clientSettings = this.client2.getClientSettings();
 207         this.framesDecoder = new FramesDecoder(this::processFrame, clientSettings.getParameter(SettingsFrame.MAX_FRAME_SIZE));
 208         // serverSettings will be updated by server
 209         this.serverSettings = SettingsFrame.getDefaultSettings();
 210         this.hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE));
 211         this.hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE));
 212         this.windowUpdater = new ConnectionWindowUpdateSender(this, client.getReceiveBufferSize());
 213     }
 214 
 215     /**
 216      * Case 1) Create from upgraded HTTP/1.1 connection.
 217      * Is ready to use. Will not be SSL. exchange is the Exchange
 218      * that initiated the connection, whose response will be delivered
 219      * on a Stream.
 220      */
 221     Http2Connection(HttpConnection connection,
 222                     Http2ClientImpl client2,
 223                     Exchange<?> exchange,
 224                     ByteBuffer initial)
 225         throws IOException, InterruptedException
 226     {
 227         this(connection,
 228                 client2,
 229                 3, // stream 1 is registered during the upgrade
 230                 keyFor(connection));
 231         assert !(connection instanceof SSLConnection);
 232         Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());
 233 
 234         Stream<?> initialStream = createStream(exchange);
 235         initialStream.registerStream(1);
 236         windowController.registerStream(1, getInitialSendWindowSize());
 237         initialStream.requestSent();
 238         sendConnectionPreface();
 239         // start reading and writing
 240         // start reading
 241         AsyncConnection asyncConn = (AsyncConnection)connection;
 242         asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown, this::getReadBuffer);
 243         connection.configureMode(Mode.ASYNC); // set mode only AFTER setAsyncCallbacks to provide visibility.
 244         asyncReceive(ByteBufferReference.of(initial));
 245         asyncConn.startReading();
 246     }
 247 
 248     // async style but completes immediately
 249     static CompletableFuture<Http2Connection> createAsync(HttpConnection connection,
 250                                                           Http2ClientImpl client2,
 251                                                           Exchange<?> exchange,
 252                                                           ByteBuffer initial) {
 253         return MinimalFuture.supply(() -> new Http2Connection(connection, client2, exchange, initial));
 254     }
 255 
 256     /**
 257      * Cases 2) 3)
 258      *
 259      * request is request to be sent.
 260      */
 261     Http2Connection(HttpRequestImpl request, Http2ClientImpl h2client)
 262         throws IOException, InterruptedException
 263     {
 264         this(HttpConnection.getConnection(request.getAddress(h2client.client()), h2client.client(), request, true),
 265                 h2client,
 266                 1,
 267                 keyFor(request.uri(), request.proxy(h2client.client())));
 268         Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());
 269 
 270         // start reading
 271         AsyncConnection asyncConn = (AsyncConnection)connection;
 272         asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown, this::getReadBuffer);
 273         connection.connect();
 274         checkSSLConfig();
 275         // safe to resume async reading now.
 276         asyncConn.enableCallback();
 277         sendConnectionPreface();
 278     }
 279 
 280     /**
 281      * Throws an IOException if h2 was not negotiated
 282      */
 283     private void checkSSLConfig() throws IOException {
 284         AbstractAsyncSSLConnection aconn = (AbstractAsyncSSLConnection)connection;
 285         SSLEngine engine = aconn.getEngine();
 286         String alpn = engine.getApplicationProtocol();
 287         if (alpn == null || !alpn.equals("h2")) {
 288             String msg;
 289             if (alpn == null) {
 290                 Log.logSSL("ALPN not supported");
 291                 msg = "ALPN not supported";
 292             } else switch (alpn) {
 293               case "":
 294                 Log.logSSL("No ALPN returned");
 295                 msg = "No ALPN negotiated";
 296                 break;
 297               case "http/1.1":
 298                 Log.logSSL("HTTP/1.1 ALPN returned");
 299                 msg = "HTTP/1.1 ALPN returned";
 300                 break;
 301               default:
 302                 Log.logSSL("unknown ALPN returned");
 303                 msg = "Unexpected ALPN: " + alpn;
 304                 throw new IOException(msg);
 305             }
 306             throw new ALPNException(msg, aconn);
 307         }
 308     }
 309 
 310     static String keyFor(HttpConnection connection) {
 311         boolean isProxy = connection.isProxied();
 312         boolean isSecure = connection.isSecure();
 313         InetSocketAddress addr = connection.address();
 314 
 315         return keyString(isSecure, isProxy, addr.getHostString(), addr.getPort());
 316     }
 317 
 318     static String keyFor(URI uri, InetSocketAddress proxy) {
 319         boolean isSecure = uri.getScheme().equalsIgnoreCase("https");
 320         boolean isProxy = proxy != null;
 321 
 322         String host;
 323         int port;
 324 
 325         if (isProxy) {
 326             host = proxy.getHostString();
 327             port = proxy.getPort();
 328         } else {
 329             host = uri.getHost();
 330             port = uri.getPort();
 331         }
 332         return keyString(isSecure, isProxy, host, port);
 333     }
 334 
 335     // {C,S}:{H:P}:host:port
 336     // C indicates clear text connection "http"
 337     // S indicates secure "https"
 338     // H indicates host (direct) connection
 339     // P indicates proxy
 340     // Eg: "S:H:foo.com:80"
 341     static String keyString(boolean secure, boolean proxy, String host, int port) {
 342         return (secure ? "S:" : "C:") + (proxy ? "P:" : "H:") + host + ":" + port;
 343     }
 344 
 345     String key() {
 346         return this.key;
 347     }
 348 
 349     void putConnection() {
 350         client2.putConnection(this);
 351     }
 352 
 353     private static String toHexdump1(ByteBuffer bb) {
 354         bb.mark();
 355         StringBuilder sb = new StringBuilder(512);
 356         Formatter f = new Formatter(sb);
 357 
 358         while (bb.hasRemaining()) {
 359             int i =  Byte.toUnsignedInt(bb.get());
 360             f.format("%02x:", i);
 361         }
 362         sb.deleteCharAt(sb.length()-1);
 363         bb.reset();
 364         return sb.toString();
 365     }
 366 
 367     private static String toHexdump(ByteBuffer bb) {
 368         List<String> words = new ArrayList<>();
 369         int i = 0;
 370         bb.mark();
 371         while (bb.hasRemaining()) {
 372             if (i % 2 == 0) {
 373                 words.add("");
 374             }
 375             byte b = bb.get();
 376             String hex = Integer.toHexString(256 + Byte.toUnsignedInt(b)).substring(1);
 377             words.set(i / 2, words.get(i / 2) + hex);
 378             i++;
 379         }
 380         bb.reset();
 381         return words.stream().collect(Collectors.joining(" "));
 382     }
 383 
 384     private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder) {
 385         boolean endOfHeaders = frame.getFlag(HeaderFrame.END_HEADERS);
 386 
 387         ByteBufferReference[] buffers = frame.getHeaderBlock();
 388         for (int i = 0; i < buffers.length; i++) {
 389             hpackIn.decode(buffers[i].get(), endOfHeaders && (i == buffers.length - 1), decoder);
 390         }
 391     }
 392 
 393     int getInitialSendWindowSize() {
 394         return serverSettings.getParameter(INITIAL_WINDOW_SIZE);
 395     }
 396 
 397     void close() {
 398         GoAwayFrame f = new GoAwayFrame(0, ErrorFrame.NO_ERROR, "Requested by user".getBytes());
 399         // TODO: set last stream. For now zero ok.
 400         sendFrame(f);
 401     }
 402 
 403     private ByteBufferPool readBufferPool = new ByteBufferPool();
 404 
 405     // provides buffer to read data (default size)
 406     public ByteBufferReference getReadBuffer() {
 407         return readBufferPool.get(getMaxReceiveFrameSize() + Http2Frame.FRAME_HEADER_SIZE);
 408     }
 409 
 410     private final Object readlock = new Object();
 411 
 412     public void asyncReceive(ByteBufferReference buffer) {
 413         // We don't need to read anything and
 414         // we don't want to send anything back to the server
 415         // until the connection preface has been sent.
 416         // Therefore we're going to wait if needed before reading
 417         // (and thus replying) to anything.
 418         // Starting to reply to something (e.g send an ACK to a
 419         // SettingsFrame sent by the server) before the connection
 420         // preface is fully sent might result in the server
 421         // sending a GOAWAY frame with 'invalid_preface'.
 422         synchronized (readlock) {
 423             try {
 424                 // the readlock ensures that the order of incoming buffers
 425                 // is preserved.
 426                 framesController.processReceivedData(framesDecoder, buffer);
 427             } catch (Throwable e) {
 428                 String msg = Utils.stackTrace(e);
 429                 Log.logTrace(msg);
 430                 shutdown(e);
 431             }
 432         }
 433     }
 434 
 435 
 436     void shutdown(Throwable t) {
 437         Log.logError(t);
 438         closed = true;
 439         client2.deleteConnection(this);
 440         List<Stream<?>> c = new LinkedList<>(streams.values());
 441         for (Stream<?> s : c) {
 442             s.cancelImpl(t);
 443         }
 444         connection.close();
 445     }
 446 
 447     /**
 448      * Handles stream 0 (common) frames that apply to whole connection and passes
 449      * other stream specific frames to that Stream object.
 450      *
 451      * Invokes Stream.incoming() which is expected to process frame without
 452      * blocking.
 453      */
 454     void processFrame(Http2Frame frame) throws IOException {
 455         Log.logFrames(frame, "IN");
 456         int streamid = frame.streamid();
 457         if (frame instanceof MalformedFrame) {
 458             Log.logError(((MalformedFrame) frame).getMessage());
 459             if (streamid == 0) {
 460                 protocolError(((MalformedFrame) frame).getErrorCode());
 461             } else {
 462                 resetStream(streamid, ((MalformedFrame) frame).getErrorCode());
 463             }
 464             return;
 465         }
 466         if (streamid == 0) {
 467             handleConnectionFrame(frame);
 468         } else {
 469             if (frame instanceof SettingsFrame) {
 470                 // The stream identifier for a SETTINGS frame MUST be zero
 471                 protocolError(GoAwayFrame.PROTOCOL_ERROR);
 472                 return;
 473             }
 474 
 475             Stream<?> stream = getStream(streamid);
 476             if (stream == null) {
 477                 // Should never receive a frame with unknown stream id
 478 
 479                 // To avoid looping, an endpoint MUST NOT send a RST_STREAM in
 480                 // response to a RST_STREAM frame.
 481                 if (!(frame instanceof ResetFrame)) {
 482                     resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
 483                 }
 484                 return;
 485             }
 486             if (frame instanceof PushPromiseFrame) {
 487                 PushPromiseFrame pp = (PushPromiseFrame)frame;
 488                 handlePushPromise(stream, pp);
 489             } else if (frame instanceof HeaderFrame) {
 490                 // decode headers (or continuation)
 491                 decodeHeaders((HeaderFrame) frame, stream.rspHeadersConsumer());
 492                 stream.incoming(frame);
 493             } else {
 494                 stream.incoming(frame);
 495             }
 496         }
 497     }
 498 
 499     private <T> void handlePushPromise(Stream<T> parent, PushPromiseFrame pp)
 500         throws IOException
 501     {
 502         HttpRequestImpl parentReq = parent.request;
 503         int promisedStreamid = pp.getPromisedStream();
 504         if (promisedStreamid != nextPushStream) {
 505             resetStream(promisedStreamid, ResetFrame.PROTOCOL_ERROR);
 506             return;
 507         } else {
 508             nextPushStream += 2;
 509         }
 510         HeaderDecoder decoder = new HeaderDecoder();
 511         decodeHeaders(pp, decoder);
 512         HttpHeadersImpl headers = decoder.headers();
 513         HttpRequestImpl pushReq = HttpRequestImpl.createPushRequest(parentReq, headers);
 514         Exchange<T> pushExch = new Exchange<>(pushReq, parent.exchange.multi);
 515         Stream.PushedStream<?,T> pushStream = createPushStream(parent, pushExch);
 516         pushExch.exchImpl = pushStream;
 517         pushStream.registerStream(promisedStreamid);
 518         parent.incoming_pushPromise(pushReq, pushStream);
 519     }
 520 
 521     private void handleConnectionFrame(Http2Frame frame)
 522         throws IOException
 523     {
 524         switch (frame.type()) {
 525           case SettingsFrame.TYPE:
 526               handleSettings((SettingsFrame)frame);
 527               break;
 528           case PingFrame.TYPE:
 529               handlePing((PingFrame)frame);
 530               break;
 531           case GoAwayFrame.TYPE:
 532               handleGoAway((GoAwayFrame)frame);
 533               break;
 534           case WindowUpdateFrame.TYPE:
 535               handleWindowUpdate((WindowUpdateFrame)frame);
 536               break;
 537           default:
 538             protocolError(ErrorFrame.PROTOCOL_ERROR);
 539         }
 540     }
 541 
 542     void resetStream(int streamid, int code) throws IOException {
 543         Log.logError(
 544             "Resetting stream {0,number,integer} with error code {1,number,integer}",
 545             streamid, code);
 546         ResetFrame frame = new ResetFrame(streamid, code);
 547         sendFrame(frame);
 548         closeStream(streamid);
 549     }
 550 
 551     void closeStream(int streamid) {
 552         Stream<?> s = streams.remove(streamid);
 553         // ## Remove s != null. It is a hack for delayed cancellation,reset
 554         if (s != null && !(s instanceof Stream.PushedStream)) {
 555             // Since PushStreams have no request body, then they have no
 556             // corresponding entry in the window controller.
 557             windowController.removeStream(streamid);
 558         }
 559     }
 560     /**
 561      * Increments this connection's send Window by the amount in the given frame.
 562      */
 563     private void handleWindowUpdate(WindowUpdateFrame f)
 564         throws IOException
 565     {
 566         int amount = f.getUpdate();
 567         if (amount <= 0) {
 568             // ## temporarily disable to workaround a bug in Jetty where it
 569             // ## sends Window updates with a 0 update value.
 570             //protocolError(ErrorFrame.PROTOCOL_ERROR);
 571         } else {
 572             boolean success = windowController.increaseConnectionWindow(amount);
 573             if (!success) {
 574                 protocolError(ErrorFrame.FLOW_CONTROL_ERROR);  // overflow
 575             }
 576         }
 577     }
 578 
 579     private void protocolError(int errorCode)
 580         throws IOException
 581     {
 582         GoAwayFrame frame = new GoAwayFrame(0, errorCode);
 583         sendFrame(frame);
 584         shutdown(new IOException("protocol error"));
 585     }
 586 
 587     private void handleSettings(SettingsFrame frame)
 588         throws IOException
 589     {
 590         assert frame.streamid() == 0;
 591         if (!frame.getFlag(SettingsFrame.ACK)) {
 592             int oldWindowSize = serverSettings.getParameter(INITIAL_WINDOW_SIZE);
 593             int newWindowSize = frame.getParameter(INITIAL_WINDOW_SIZE);
 594             int diff = newWindowSize - oldWindowSize;
 595             if (diff != 0) {
 596                 windowController.adjustActiveStreams(diff);
 597             }
 598             serverSettings = frame;
 599             sendFrame(new SettingsFrame(SettingsFrame.ACK));
 600         }
 601     }
 602 
 603     private void handlePing(PingFrame frame)
 604         throws IOException
 605     {
 606         frame.setFlag(PingFrame.ACK);
 607         sendUnorderedFrame(frame);
 608     }
 609 
 610     private void handleGoAway(GoAwayFrame frame)
 611         throws IOException
 612     {
 613         shutdown(new IOException(
 614                         String.valueOf(connection.channel().getLocalAddress())
 615                         +": GOAWAY received"));
 616     }
 617 
 618     /**
 619      * Max frame size we are allowed to send
 620      */
 621     public int getMaxSendFrameSize() {
 622         int param = serverSettings.getParameter(MAX_FRAME_SIZE);
 623         if (param == -1) {
 624             param = DEFAULT_FRAME_SIZE;
 625         }
 626         return param;
 627     }
 628 
 629     /**
 630      * Max frame size we will receive
 631      */
 632     public int getMaxReceiveFrameSize() {
 633         return clientSettings.getParameter(MAX_FRAME_SIZE);
 634     }
 635 
 636     // Not sure how useful this is.
 637     public int getMaxHeadersSize() {
 638         return serverSettings.getParameter(MAX_HEADER_LIST_SIZE);
 639     }
 640 
 641     private static final String CLIENT_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
 642 
 643     private static final byte[] PREFACE_BYTES =
 644         CLIENT_PREFACE.getBytes(StandardCharsets.ISO_8859_1);
 645 
 646     /**
 647      * Sends Connection preface and Settings frame with current preferred
 648      * values
 649      */
 650     private void sendConnectionPreface() throws IOException {
 651         Log.logTrace("{0}: start sending connection preface to {1}",
 652                      connection.channel().getLocalAddress(),
 653                      connection.address());
 654         SettingsFrame sf = client2.getClientSettings();
 655         ByteBufferReference ref = framesEncoder.encodeConnectionPreface(PREFACE_BYTES, sf);
 656         Log.logFrames(sf, "OUT");
 657         // send preface bytes and SettingsFrame together
 658         connection.write(ref.get());
 659         // mark preface sent.
 660         framesController.markPrefaceSent();
 661         Log.logTrace("PREFACE_BYTES sent");
 662         Log.logTrace("Settings Frame sent");
 663 
 664         // send a Window update for the receive buffer we are using
 665         // minus the initial 64 K specified in protocol
 666         final int len = client2.client().getReceiveBufferSize() - (64 * 1024 - 1);
 667         windowUpdater.sendWindowUpdate(len);
 668         // there will be an ACK to the windows update - which should
 669         // cause any pending data stored before the preface was sent to be
 670         // flushed (see PrefaceController).
 671         Log.logTrace("finished sending connection preface");
 672     }
 673 
 674     /**
 675      * Returns an existing Stream with given id, or null if doesn't exist
 676      */
 677     @SuppressWarnings("unchecked")
 678     <T> Stream<T> getStream(int streamid) {
 679         return (Stream<T>)streams.get(streamid);
 680     }
 681 
 682     /**
 683      * Creates Stream with given id.
 684      */
 685     <T> Stream<T> createStream(Exchange<T> exchange) {
 686         Stream<T> stream = new Stream<>(client, this, exchange, windowController);
 687         return stream;
 688     }
 689 
 690     <T> Stream.PushedStream<?,T> createPushStream(Stream<T> parent, Exchange<T> pushEx) {
 691         PushGroup<?,T> pg = parent.exchange.getPushGroup();
 692         return new Stream.PushedStream<>(pg, client, this, parent, pushEx);
 693     }
 694 
 695     <T> void putStream(Stream<T> stream, int streamid) {
 696         streams.put(streamid, stream);
 697     }
 698 
 699     void deleteStream(int streamid) {
 700         streams.remove(streamid);
 701         windowController.removeStream(streamid);
 702     }
 703 
 704     /**
 705      * Encode the headers into a List<ByteBuffer> and then create HEADERS
 706      * and CONTINUATION frames from the list and return the List<Http2Frame>.
 707      */
 708     private List<HeaderFrame> encodeHeaders(OutgoingHeaders<Stream<?>> frame) {
 709         List<ByteBufferReference> buffers = encodeHeadersImpl(
 710                 getMaxSendFrameSize(),
 711                 frame.getAttachment().getRequestPseudoHeaders(),
 712                 frame.getUserHeaders(),
 713                 frame.getSystemHeaders());
 714 
 715         List<HeaderFrame> frames = new ArrayList<>(buffers.size());
 716         Iterator<ByteBufferReference> bufIterator = buffers.iterator();
 717         HeaderFrame oframe = new HeadersFrame(frame.streamid(), frame.getFlags(), bufIterator.next());
 718         frames.add(oframe);
 719         while(bufIterator.hasNext()) {
 720             oframe = new ContinuationFrame(frame.streamid(), bufIterator.next());
 721             frames.add(oframe);
 722         }
 723         oframe.setFlag(HeaderFrame.END_HEADERS);
 724         return frames;
 725     }
 726 
 727     // Dedicated cache for headers encoding ByteBuffer.
 728     // There can be no concurrent access to this  buffer as all access to this buffer
 729     // and its content happen within a single critical code block section protected
 730     // by the sendLock. / (see sendFrame())
 731     private ByteBufferPool headerEncodingPool = new ByteBufferPool();
 732 
 733     private ByteBufferReference getHeaderBuffer(int maxFrameSize) {
 734         ByteBufferReference ref = headerEncodingPool.get(maxFrameSize);
 735         ref.get().limit(maxFrameSize);
 736         return ref;
 737     }
 738 
 739     /*
 740      * Encodes all the headers from the given HttpHeaders into the given List
 741      * of buffers.
 742      *
 743      * From https://tools.ietf.org/html/rfc7540#section-8.1.2 :
 744      *
 745      *     ...Just as in HTTP/1.x, header field names are strings of ASCII
 746      *     characters that are compared in a case-insensitive fashion.  However,
 747      *     header field names MUST be converted to lowercase prior to their
 748      *     encoding in HTTP/2...
 749      */
 750     private List<ByteBufferReference> encodeHeadersImpl(int maxFrameSize, HttpHeaders... headers) {
 751         ByteBufferReference buffer = getHeaderBuffer(maxFrameSize);
 752         List<ByteBufferReference> buffers = new ArrayList<>();
 753         for(HttpHeaders header : headers) {
 754             for (Map.Entry<String, List<String>> e : header.map().entrySet()) {
 755                 String lKey = e.getKey().toLowerCase();
 756                 List<String> values = e.getValue();
 757                 for (String value : values) {
 758                     hpackOut.header(lKey, value);
 759                     while (!hpackOut.encode(buffer.get())) {
 760                         buffer.get().flip();
 761                         buffers.add(buffer);
 762                         buffer =  getHeaderBuffer(maxFrameSize);
 763                     }
 764                 }
 765             }
 766         }
 767         buffer.get().flip();
 768         buffers.add(buffer);
 769         return buffers;
 770     }
 771 
 772     private ByteBufferReference[] encodeHeaders(OutgoingHeaders<Stream<?>> oh, Stream<?> stream) {
 773         oh.streamid(stream.streamid);
 774         if (Log.headers()) {
 775             StringBuilder sb = new StringBuilder("HEADERS FRAME (stream=");
 776             sb.append(stream.streamid).append(")\n");
 777             Log.dumpHeaders(sb, "    ", oh.getAttachment().getRequestPseudoHeaders());
 778             Log.dumpHeaders(sb, "    ", oh.getSystemHeaders());
 779             Log.dumpHeaders(sb, "    ", oh.getUserHeaders());
 780             Log.logHeaders(sb.toString());
 781         }
 782         List<HeaderFrame> frames = encodeHeaders(oh);
 783         return encodeFrames(frames);
 784     }
 785 
 786     private ByteBufferReference[] encodeFrames(List<HeaderFrame> frames) {
 787         if (Log.frames()) {
 788             frames.forEach(f -> Log.logFrames(f, "OUT"));
 789         }
 790         return framesEncoder.encodeFrames(frames);
 791     }
 792 
 793     static Throwable getExceptionFrom(CompletableFuture<?> cf) {
 794         try {
 795             cf.get();
 796             return null;
 797         } catch (Throwable e) {
 798             if (e.getCause() != null) {
 799                 return e.getCause();
 800             } else {
 801                 return e;
 802             }
 803         }
 804     }
 805 
 806     private Stream<?> registerNewStream(OutgoingHeaders<Stream<?>> oh) {
 807         Stream<?> stream = oh.getAttachment();
 808         int streamid = nextstreamid;
 809         nextstreamid += 2;
 810         stream.registerStream(streamid);
 811         // set outgoing window here. This allows thread sending
 812         // body to proceed.
 813         windowController.registerStream(streamid, getInitialSendWindowSize());
 814         return stream;
 815     }
 816 
 817     private final Object sendlock = new Object();
 818 
 819     void sendFrame(Http2Frame frame) {
 820         try {
 821             synchronized (sendlock) {
 822                 if (frame instanceof OutgoingHeaders) {
 823                     @SuppressWarnings("unchecked")
 824                     OutgoingHeaders<Stream<?>> oh = (OutgoingHeaders<Stream<?>>) frame;
 825                     Stream<?> stream = registerNewStream(oh);
 826                     // provide protection from inserting unordered frames between Headers and Continuation
 827                     connection.writeAsync(encodeHeaders(oh, stream));
 828                 } else {
 829                     connection.writeAsync(encodeFrame(frame));
 830                 }
 831             }
 832             connection.flushAsync();
 833         } catch (IOException e) {
 834             if (!closed) {
 835                 Log.logError(e);
 836                 shutdown(e);
 837             }
 838         }
 839     }
 840 
 841     private ByteBufferReference[] encodeFrame(Http2Frame frame) {
 842         Log.logFrames(frame, "OUT");
 843         return framesEncoder.encodeFrame(frame);
 844     }
 845 
 846     void sendDataFrame(DataFrame frame) {
 847         try {
 848             connection.writeAsync(encodeFrame(frame));
 849             connection.flushAsync();
 850         } catch (IOException e) {
 851             if (!closed) {
 852                 Log.logError(e);
 853                 shutdown(e);
 854             }
 855         }
 856     }
 857 
 858     /*
 859      * Direct call of the method bypasses synchronization on "sendlock" and
 860      * allowed only of control frames: WindowUpdateFrame, PingFrame and etc.
 861      * prohibited for such frames as DataFrame, HeadersFrame, ContinuationFrame.
 862      */
 863     void sendUnorderedFrame(Http2Frame frame) {
 864         try {
 865             connection.writeAsyncUnordered(encodeFrame(frame));
 866             connection.flushAsync();
 867         } catch (IOException e) {
 868             if (!closed) {
 869                 Log.logError(e);
 870                 shutdown(e);
 871             }
 872         }
 873     }
 874 
 875     static class HeaderDecoder implements DecodingCallback {
 876         HttpHeadersImpl headers;
 877 
 878         HeaderDecoder() {
 879             this.headers = new HttpHeadersImpl();
 880         }
 881 
 882         @Override
 883         public void onDecoded(CharSequence name, CharSequence value) {
 884             headers.addHeader(name.toString(), value.toString());
 885         }
 886 
 887         HttpHeadersImpl headers() {
 888             return headers;
 889         }
 890     }
 891 
 892     static final class ConnectionWindowUpdateSender extends WindowUpdateSender {
 893 
 894         public ConnectionWindowUpdateSender(Http2Connection connection,
 895                                             int initialWindowSize) {
 896             super(connection, initialWindowSize);
 897         }
 898 
 899         @Override
 900         int getStreamId() {
 901             return 0;
 902         }
 903     }
 904 
 905     /**
 906      * Thrown when https handshake negotiates http/1.1 alpn instead of h2
 907      */
 908     static final class ALPNException extends IOException {
 909         private static final long serialVersionUID = 23138275393635783L;
 910         final AbstractAsyncSSLConnection connection;
 911 
 912         ALPNException(String msg, AbstractAsyncSSLConnection connection) {
 913             super(msg);
 914             this.connection = connection;
 915         }
 916 
 917         AbstractAsyncSSLConnection getConnection() {
 918             return connection;
 919         }
 920     }
 921 }